import subprocess
import logging
import re
from typing import Dict, Any
class BGPRouter():
    """Parse a textual BGP route table into a list of route dictionaries.

    The table text is currently the embedded ``_SAMPLE_BGP_OUTPUT``; no
    command is executed by this class.
    NOTE(review): the layout looks like Quagga/FRR ``show ip bgp`` output --
    confirm against the real data source before relying on the column rules.
    """

    # Best-path row: "*> <network> <next hop> <number> <number> <path ...>".
    # The two numbers are read as Metric and Weight. A row that prints a
    # different set of numeric columns (e.g. an explicit LocPrf, or a blank
    # Metric) cannot be told apart after whitespace splitting and would be
    # mis-assigned -- presumably such rows do not occur in the expected input.
    _ROUTE_PATTERN = re.compile(r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$')

    # LocPrf is not read from the table; every route reports this value.
    _DEFAULT_LOCAL_PREF = 100

    # A path column holding only one of these origin codes has no AS numbers.
    _ORIGIN_CODES = ('i', 'e', '?')

    # Sample table parsed by _get_all_bgp_attributes().
    _SAMPLE_BGP_OUTPUT = """BGP table version is 0, local router ID is 169.254.112.97
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete
Network Next Hop Metric LocPrf Weight Path
*> 172.16.0.1/32 169.254.112.98 0 0 65000 i
*> 172.16.0.2/32 169.254.112.98 0 0 65000 65114 49449 i
*> 172.16.0.3/32 169.254.112.98 0 0 65000 i
*> 172.16.0.4/32 169.254.112.98 0 0 65000 i
*> 172.16.0.5/32 169.254.112.98 0 0 65000 i
*> 172.16.0.112/32 169.254.112.98 0 0 65000 i
*> 172.16.0.113/32 169.254.112.98 0 0 65000 i
*> 172.16.0.114/32 169.254.112.98 0 0 65000 i
*> 172.16.0.115/32 169.254.112.98 0 0 65000 i
*> 172.16.0.116/32 169.254.112.98 0 0 65000 i
*> 172.16.0.117/32 169.254.112.98 0 0 65000 i
*> 172.16.0.118/32 169.254.112.98 0 0 65000 i
*> 172.16.0.119/32 169.254.112.98 0 0 65000 i
*> 172.16.0.120/32 169.254.112.98 0 0 65000 i
*> 172.16.0.121/32 169.254.112.98 0 0 65000 i
*> 172.31.0.0 169.254.112.97 100 32768 i
*> 172.31.0.1/32 169.254.112.97 100 32768 i
*> 172.31.0.2/32 169.254.112.97 100 32768 i
*> 172.31.0.3/32 169.254.112.97 100 32768 i
*> 172.31.0.4/32 169.254.112.97 100 32768 i
*> 2001:db8:2::/64 fe80::2 100 32768 i
*> 2001:db8:2::1/128 fe80::2 100 32768 i
*> 2001:db8:2::2/128 fe80::2 100 32768 i
*> 2001:db8:2::3/128 fe80::2 100 32768 i
*> 2001:db8:2::4/128 fe80::2 100 32768 i
Total number of prefixes 3233"""

    def __init__(self, local_bgp_asn: str = '65412'):
        """Create a router helper.

        Args:
            local_bgp_asn: ASN reported as the AS path of routes whose path
                column holds only an origin code.
        """
        # Minimal stand-in config object exposing only ``local_bgp_asn``.
        self.headend_config = type(
            'obj', (object,), {'local_bgp_asn': local_bgp_asn}
        )

    def _normalize_network_cidr(self, network: str) -> str:
        """Append a prefix length to an IPv4 network that has none.

        Args:
            network: Network address (e.g., "172.31.0.0" or "172.16.0.1/32").

        Returns:
            ``network`` unchanged when it already contains "/", is not four
            dot-separated octets, or has an octet outside 0-255; otherwise
            ``network`` followed by "/<prefix length>".
        """
        if '/' in network:
            return network

        octets = network.split('.')
        if len(octets) != 4 or not all(octet.isdigit() for octet in octets):
            # Not a dotted-quad (e.g. an IPv6 address): leave untouched.
            return network

        values = [int(octet) for octet in octets]
        if any(value > 255 for value in values):
            return network

        # The all-zero address is the default route, not a /8.
        if values == [0, 0, 0, 0]:
            return f"{network}/0"

        # Shortest prefix that still covers every non-zero octet.
        if values[1:] == [0, 0, 0]:
            covering_len = 8
        elif values[2:] == [0, 0]:
            covering_len = 16
        elif values[3] == 0:
            covering_len = 24
        else:
            covering_len = 32

        # NOTE(review): Quagga/FRR-style tables omit the mask only when it
        # equals the natural classful mask, so a mask-less class C network
        # such as 192.168.0.0 is a /24 even though two octets are zero.
        first_octet = values[0]
        if first_octet < 128:
            natural_len = 8
        elif first_octet < 192:
            natural_len = 16
        elif first_octet < 224:
            natural_len = 24
        else:
            natural_len = 0  # class D/E: no natural mask to enforce

        return f"{network}/{max(covering_len, natural_len)}"

    def _get_all_bgp_attributes(self) -> Dict[str, Any]:
        """Parse the embedded sample BGP table.

        Returns:
            dict: ``{"routes": [...]}`` with one dictionary per best-path row.
        """
        return self._parse_bgp_table(self._SAMPLE_BGP_OUTPUT)

    def _parse_bgp_table(self, output: str) -> Dict[str, Any]:
        """Parse BGP route table text and return structured data.

        Rows are read after the header line containing both "Network" and
        "Next Hop"; only rows starting with "*>" are kept.

        Args:
            output: Full table text.

        Returns:
            dict: ``{"routes": [...]}``. The list is empty when the text is
            empty or has no header, and holds the routes parsed so far if an
            unexpected error interrupts parsing.
        """
        routes = []
        try:
            if not output or not output.strip():
                logging.warning("BGP command returned empty output")
                return {"routes": routes}

            lines = output.strip().split('\n')
            route_start_idx = next(
                (
                    idx + 1
                    for idx, line in enumerate(lines)
                    if 'Network' in line and 'Next Hop' in line
                ),
                None,
            )
            if route_start_idx is None:
                logging.warning("BGP output does not contain expected header format")
                return {"routes": routes}

            for line in lines[route_start_idx:]:
                route = self._parse_route_line(line)
                if route:
                    routes.append(route)
            return {"routes": routes}
        except Exception as e:
            # Best-effort boundary: report the failure, keep what was parsed.
            logging.exception("Failed to get all BGP prefixes: %s", e)
            return {"routes": routes}

    def _parse_route_line(self, line: str) -> Dict[str, Any]:
        """Convert one table row into a route dictionary.

        Args:
            line: A single line of the table body.

        Returns:
            The route dictionary, or ``{}`` when the line is not a "*>" row
            in the expected shape (blank lines, the "Total number" footer,
            and other status codes all fall in this case).
        """
        match = self._ROUTE_PATTERN.match(line.strip())
        if not match:
            return {}

        network, next_hop, metric, weight, path_info = match.groups()
        path_info = path_info.strip()

        if path_info in self._ORIGIN_CODES:
            # Only an origin code, no AS numbers: report the local ASN.
            as_path = str(self.headend_config.local_bgp_asn)
        else:
            # Keep the numeric AS hops; drop the origin code and any
            # non-numeric tokens.
            as_path = ' '.join(
                part for part in path_info.split() if part.isdigit()
            )

        return {
            "network": self._normalize_network_cidr(network),
            "nextHopIp": next_hop,
            "med": int(metric),
            "localPref": self._DEFAULT_LOCAL_PREF,
            "weight": int(weight),
            "asPath": as_path,
        }

    def _get_specific_network(self, bgp_properties, prefix):
        """Return the route whose network matches ``prefix``.

        Args:
            bgp_properties: Dictionary with a "routes" list, as returned by
                ``_get_all_bgp_attributes``.
            prefix: Network to look for. A string without a prefix length is
                also tried in its normalized form, so "172.31.0.0" finds
                "172.31.0.0/16".

        Returns:
            The first matching route dictionary, or ``{}`` when
            ``bgp_properties`` is empty, not a dict, lacks "routes", or holds
            no matching route.
        """
        if not bgp_properties or not isinstance(bgp_properties, dict):
            logging.warning(
                "BGP properties is None or invalid type: %s",
                type(bgp_properties),
            )
            return {}
        if "routes" not in bgp_properties:
            logging.warning(
                "BGP properties missing 'routes' key: %s", bgp_properties
            )
            return {}

        routes = bgp_properties["routes"] or []
        logging.debug(
            "Searching for prefix '%s' in %s BGP routes", prefix, len(routes)
        )

        # Accept the prefix as given and, for strings, its normalized form.
        wanted = [prefix]
        if isinstance(prefix, str):
            wanted.append(self._normalize_network_cidr(prefix))

        for route in routes:
            if isinstance(route, dict) and route.get("network") in wanted:
                logging.debug(
                    "Found matching route for prefix '%s': %s", prefix, route
                )
                return route
        return {}
def main():
    """Parse the BGP table with a default ``BGPRouter`` and print the result."""
    print(BGPRouter()._get_all_bgp_attributes())


if __name__ == "__main__":
    # Run the demo only when executed as a script, not on import.
    main()
"""
{
'routes': [{
'network': '172.16.0.1/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.2/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000 65114 49449'
}, {
'network': '172.16.0.3/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.4/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.5/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.112/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.113/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.114/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.115/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.116/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.117/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.118/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.119/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.120/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.16.0.121/32',
'nextHopIp': '169.254.112.98',
'med': 0,
'localPref': 100,
'weight': 0,
'asPath': '65000'
}, {
'network': '172.31.0.0/16',
'nextHopIp': '169.254.112.97',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '172.31.0.1/32',
'nextHopIp': '169.254.112.97',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '172.31.0.2/32',
'nextHopIp': '169.254.112.97',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '172.31.0.3/32',
'nextHopIp': '169.254.112.97',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '172.31.0.4/32',
'nextHopIp': '169.254.112.97',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '2001:db8:2::/64',
'nextHopIp': 'fe80::2',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '2001:db8:2::1/128',
'nextHopIp': 'fe80::2',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '2001:db8:2::2/128',
'nextHopIp': 'fe80::2',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '2001:db8:2::3/128',
'nextHopIp': 'fe80::2',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}, {
'network': '2001:db8:2::4/128',
'nextHopIp': 'fe80::2',
'med': 100,
'localPref': 100,
'weight': 32768,
'asPath': '65412'
}]
}
"""