import subprocess
import logging
import re
from typing import Dict, Any, List, Optional
class BGPRouter:
"""BGP Router class for parsing and managing BGP route information."""
def __init__(self, local_asn: str = '65412'):
"""Initialize BGP router with local ASN.
Args:
local_asn: Local BGP ASN number
"""
self.local_asn = local_asn
def _normalize_network_cidr(self, network: str) -> str:
"""Normalize network address by adding appropriate CIDR notation.
Args:
network: Network address (e.g., "172.31.0.0" or "172.16.0.1/32")
Returns:
Network address with appropriate CIDR notation
"""
if '/' in network:
return network
try:
octets = network.split('.')
if len(octets) != 4:
return network
if network == '0.0.0.0':
return '0.0.0.0/0'
if octets[1:] == ['0', '0', '0']:
return f"{network}/8"
if octets[2:] == ['0', '0']:
return f"{network}/16"
if octets[3] == '0':
return f"{network}/24"
except (ValueError, IndexError):
return network
def _parse_as_path(self, path_info: str) -> str:
"""Extract AS path from BGP path information.
Args:
path_info: Raw path information from BGP output
Returns:
Cleaned AS path string
"""
path_info = path_info.strip()
if path_info == 'i':
return self.local_asn
path_parts = path_info.split()
as_numbers = [part for part in path_parts if part.isdigit()]
return ' '.join(as_numbers)
def _parse_route_line(self, line: str) -> Optional[Dict[str, Any]]:
"""Parse a single BGP route line.
Args:
line: BGP route line from show command output
Returns:
Route dictionary or None if parsing fails
"""
pattern = r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$'
match = re.match(pattern, line)
if not match:
return None
network, next_hop, metric_str, weight_str, path_info = match.groups()
try:
return {
"network": self._normalize_network_cidr(network),
"nextHopIp": next_hop,
"med": int(metric_str),
"localPref": 100,
"weight": int(weight_str),
"asPath": self._parse_as_path(path_info)
}
except ValueError as e:
logging.warning(f"Failed to parse route line '{line}': {e}")
return None
def _find_route_start_index(self, lines: List[str]) -> Optional[int]:
"""Find the index where BGP routes start in the output.
Args:
lines: List of output lines
Returns:
Index of first route line or None if not found
"""
for i, line in enumerate(lines):
if 'Network' in line and 'Next Hop' in line:
return i + 1
return None
def _get_all_bgp_routes(self) -> Dict[str, List[Dict[str, Any]]]:
"""Parse BGP route table output and return structured data.
Returns:
Dictionary containing list of BGP routes
"""
try:
output = """BGP table version is 0, local router ID is 169.254.112.97
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete
Network Next Hop Metric LocPrf Weight Path
*> 0.0.0.0 169.254.112.98 0 0 65000 i
*> 172.16.0.1/32 169.254.112.98 0 0 65000 i
*> 172.16.0.2/32 169.254.112.98 0 0 65000 65114 49449 i
*> 172.16.0.3/32 169.254.112.98 0 0 65000 i
*> 172.16.0.4/32 169.254.112.98 0 0 65000 i
*> 172.16.0.5/32 169.254.112.98 0 0 65000 i
*> 172.16.0.112/32 169.254.112.98 0 0 65000 i
*> 172.16.0.113/32 169.254.112.98 0 0 65000 i
*> 172.16.0.114/32 169.254.112.98 0 0 65000 i
*> 172.16.0.115/32 169.254.112.98 0 0 65000 i
*> 172.16.0.116/32 169.254.112.98 0 0 65000 i
*> 172.16.0.117/32 169.254.112.98 0 0 65000 i
*> 172.16.0.118/32 169.254.112.98 0 0 65000 i
*> 172.16.0.119/32 169.254.112.98 0 0 65000 i
*> 172.16.0.120/32 169.254.112.98 0 0 65000 i
*> 172.16.0.121/32 169.254.112.98 0 0 65000 i
*> 172.31.0.0 169.254.112.97 100 32768 i
*> 172.0.0.0 169.254.112.97 100 32768 i
*> 172.1.1.0 169.254.112.97 100 32768 i
*> 172.31.0.1/32 169.254.112.97 100 32768 i
*> 172.31.0.2/32 169.254.112.97 100 32768 i
*> 172.31.0.3/32 169.254.112.97 100 32768 i
*> 172.31.0.4/32 169.254.112.97 100 32768 i
*> 2001:db8:2::/64 fe80::2 100 32768 i
*> 2001:db8:2::1/128 fe80::2 100 32768 i
*> 2001:db8:2::2/128 fe80::2 100 32768 i
*> 2001:db8:2::3/128 fe80::2 100 32768 i
*> 2001:db8:2::4/128 fe80::2 100 32768 i
Total number of prefixes 3233"""
if not output or not output.strip():
logging.warning("BGP command returned empty output")
return {"routes": []}
lines = output.strip().split('\n')
route_start_idx = self._find_route_start_index(lines)
if route_start_idx is None:
logging.warning("BGP output does not contain expected header format")
return {"routes": []}
route_lines = [
line.strip() for line in lines[route_start_idx:]
if line.strip() and line.strip().startswith('*>')
and not line.strip().startswith('Total number')
]
routes = []
for line in route_lines:
route = self._parse_route_line(line)
if route is not None:
routes.append(route)
return {"routes": routes}
except Exception as e:
logging.error(f"Failed to get BGP routes: {e}")
return {"routes": []}
def get_specific_network(self, bgp_properties: Dict[str, Any], prefix: str) -> Dict[str, Any]:
"""Find specific network in BGP properties.
Args:
bgp_properties: BGP properties dictionary
prefix: Network prefix to search for
Returns:
Route information dictionary or empty dict if not found
"""
if not bgp_properties or not isinstance(bgp_properties, dict):
logging.warning(f"Invalid BGP properties: {type(bgp_properties)}")
return {}
routes = bgp_properties.get("routes", [])
logging.debug(f"Searching for prefix '{prefix}' in {len(routes)} routes")
try:
route = next(
route for route in routes
if route.get("network") == prefix
)
logging.debug(f"Found matching route for prefix '{prefix}': {route}")
return route
except StopIteration:
logging.debug(f"No route found for prefix '{prefix}'")
return {}
def main():
"""Main function to demonstrate BGP router functionality."""
router = BGPRouter()
bgp_routes = router._get_all_bgp_routes()
print(f"Parsed {len(bgp_routes['routes'])} BGP routes")
print(bgp_routes)
if __name__ == "__main__":
main()