import logging
import os
import re
import subprocess
import time
import json
from typing import Optional, Tuple, List, Dict, Any
from glob import glob
from datetime import datetime
class BGPRouteParser():
    """Parse textual BGP route tables into lists of route dictionaries.

    The parser looks for a header line containing ``Network`` and
    ``Next Hop`` and then reads the best-path rows (those starting with
    ``*>``) that follow it.  IPv4 rows are expected on a single line; IPv6
    rows are expected as three-line blocks (network / next hop / attributes).

    Every parsed route is a dict with the keys ``network``, ``nextHopIp``,
    ``med``, ``localPref``, ``weight`` and ``asPath``.

    NOTE(review): columns are split purely on whitespace, so the first two
    integers after the next hop are taken as Metric and Weight.  A row with a
    populated LocPrf column would therefore be misread — confirm whether the
    real command output can contain one.
    """

    # "*> <network> <next hop> <metric> <weight> <path ... origin>"
    bgp_route_pattern = r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$'

    # Hard-coded IPv4 table text parsed when the caller supplies no output.
    _SAMPLE_IPV4_OUTPUT = """BGP table version is 0, local router ID is 169.254.148.249
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete
Network Next Hop Metric LocPrf Weight Path
*> 172.16.0.1/32 169.254.50.85 0 0 65000 i
Total number of prefixes 2"""

    # Hard-coded IPv6 table text parsed when the caller supplies no output.
    _SAMPLE_IPV6_OUTPUT = """BGP table version is 0, local router ID is 169.254.148.249
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete
Network Next Hop Metric LocPrf Weight Path
*> 2600:1702:5fc0:8792::/64
fd1c:a349:ac38:7ea1:792e:e716:baa1:52b6
0 0 65000 i
*> 2600:4700:4700::1111/128
fd1c:a349:ac38:7ea1:792e:e716:baa1:52b5
100 32768 i
Total number of prefixes 2"""

    def _normalize_network_cidr(self, network: str) -> str:
        """Normalize network address by adding appropriate CIDR notation.

        Addresses that already carry a prefix length, and anything that is
        not a dotted quad, are returned untouched.  For a bare dotted quad
        the prefix length is guessed from its trailing zero octets.

        Args:
            network: Network address (e.g., "172.31.0.0" or "172.16.0.1/32")

        Returns:
            Network address with a prefix length appended when one can be
            inferred, otherwise the input unchanged.
        """
        if '/' in network:
            return network

        octets = network.split('.')
        if len(octets) != 4:
            return network

        if network == '0.0.0.0':
            return '0.0.0.0/0'
        if octets[1:] == ['0', '0', '0']:
            return f"{network}/8"
        if octets[2:] == ['0', '0']:
            return f"{network}/16"
        if octets[3] == '0':
            return f"{network}/24"

        # No trailing zero octet: no prefix length can be inferred.  Return
        # the address as-is instead of falling off the end and yielding None.
        return network

    def _parse_as_path(self, path_info: str) -> str:
        """Extract AS path from BGP path information.

        Args:
            path_info: Raw path information from BGP output (AS numbers
                followed by an origin code such as ``i``)

        Returns:
            Space-separated AS numbers.  When the path consists only of the
            origin code ``i``, the local ASN from
            ``self.headend_config.local_bgp_asn`` is returned if that
            attribute is available, otherwise an empty string.
        """
        path_info = path_info.strip()
        if path_info == 'i':
            # headend_config is not set by this class; it may be attached by
            # the caller.  Look it up defensively so a missing config yields
            # an empty path rather than an AttributeError.
            config = getattr(self, 'headend_config', None)
            local_asn = getattr(config, 'local_bgp_asn', None)
            return '' if local_asn is None else str(local_asn)

        # Keep only numeric tokens; this drops the trailing origin code.
        return ' '.join(part for part in path_info.split() if part.isdigit())

    def _build_route(self, network: str, next_hop: str, metric_str: str,
                     weight_str: str, path_info: str) -> Dict[str, Any]:
        """Assemble a route dictionary from already-split column values.

        Raises:
            ValueError: If ``metric_str`` or ``weight_str`` is not an integer.
        """
        return {
            "network": self._normalize_network_cidr(network),
            "nextHopIp": next_hop,
            "med": int(metric_str),
            "localPref": 100,
            "weight": int(weight_str),
            "asPath": self._parse_as_path(path_info)
        }

    def _parse_route_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a single BGP route line (IPv4 format).

        Args:
            line: BGP route line from show command output

        Returns:
            Route dictionary or None if parsing fails
        """
        match = re.match(self.bgp_route_pattern, line)
        if not match:
            return None

        network, next_hop, metric_str, weight_str, path_info = match.groups()
        try:
            return self._build_route(network, next_hop, metric_str, weight_str, path_info)
        except ValueError as e:
            logging.warning(f"Failed to parse route line '{line}': {e}")
            return None

    def _parse_ipv6_route_block(self, lines: List[str], start_idx: int) -> Tuple[Optional[Dict[str, Any]], int]:
        """Parse a multi-line IPv6 BGP route block.

        A block is three consecutive lines: ``*> <network>``, the next hop,
        and an attribute line of the form ``<metric> <weight> <path...>``
        (the same column order the IPv4 pattern uses).

        Args:
            lines: List of all output lines
            start_idx: Index of the first line of the route block

        Returns:
            Tuple of (route dictionary or None, next line index to process)
        """
        if start_idx >= len(lines):
            return None, start_idx + 1

        # The first line must hold the best-route marker and the network only.
        network_match = re.match(r'^\*>\s+(\S+)$', lines[start_idx].strip())
        if not network_match:
            return None, start_idx + 1
        network = network_match.group(1)

        if start_idx + 1 >= len(lines):
            return None, start_idx + 1
        next_hop = lines[start_idx + 1].strip()
        if not next_hop or next_hop.startswith('*>'):
            # Truncated block: resume at this line so a new route is not lost.
            return None, start_idx + 1

        if start_idx + 2 >= len(lines):
            return None, start_idx + 2
        attributes = lines[start_idx + 2].strip()
        if not attributes or attributes.startswith('*>'):
            return None, start_idx + 2

        fields = attributes.split()
        # Need at least metric, weight and one path/origin token.
        if len(fields) < 3:
            return None, start_idx + 3

        metric_str, weight_str = fields[0], fields[1]
        path_info = ' '.join(fields[2:])
        try:
            route = self._build_route(network, next_hop, metric_str, weight_str, path_info)
        except ValueError as e:
            logging.warning(f"Failed to parse IPv6 route block starting at line {start_idx}: {e}")
            return None, start_idx + 3
        return route, start_idx + 3

    def _find_route_start_index(self, lines: List[str]) -> Optional[int]:
        """Find the index where BGP routes start in the output.

        Args:
            lines: List of output lines

        Returns:
            Index of the line after the column header, or None if no header
            line (one containing both "Network" and "Next Hop") is found
        """
        return next(
            (idx + 1 for idx, line in enumerate(lines)
             if 'Network' in line and 'Next Hop' in line),
            None,
        )

    def _get_ipv4_bgp_routes(self, output: Optional[str] = None) -> List[Dict[str, Any]]:
        """Parse IPv4 BGP route table output and return structured data.

        Args:
            output: Route table text to parse.  When omitted, the built-in
                hard-coded table is parsed.

        Returns:
            List of IPv4 BGP routes (empty on any failure, which is logged)
        """
        if output is None:
            output = self._SAMPLE_IPV4_OUTPUT
        try:
            if not output or not output.strip():
                logging.warning("IPv4 BGP command returned empty output")
                return []

            lines = output.strip().split('\n')
            route_start_idx = self._find_route_start_index(lines)
            if route_start_idx is None:
                logging.warning("IPv4 BGP output does not contain expected header format")
                return []

            routes = []
            for raw_line in lines[route_start_idx:]:
                line = raw_line.strip()
                # Only best-path rows are parsed; this also skips blank
                # lines and the trailing "Total number" summary.
                if not line.startswith('*>'):
                    continue
                route = self._parse_route_line(line)
                if route is not None:
                    routes.append(route)
            return routes
        except Exception as e:
            # Best-effort boundary: callers always get a list back.
            logging.error(f"Failed to get IPv4 BGP routes: {e}")
            return []

    def _get_ipv6_bgp_routes(self, output: Optional[str] = None) -> List[Dict[str, Any]]:
        """Parse IPv6 BGP route table output and return structured data.

        Args:
            output: Route table text to parse.  When omitted, the built-in
                hard-coded table is parsed.

        Returns:
            List of IPv6 BGP routes (empty on any failure, which is logged)
        """
        if output is None:
            output = self._SAMPLE_IPV6_OUTPUT
        try:
            if not output or not output.strip():
                logging.warning("IPv6 BGP command returned empty output")
                return []

            lines = output.strip().split('\n')
            route_start_idx = self._find_route_start_index(lines)
            if route_start_idx is None:
                logging.warning("IPv6 BGP output does not contain expected header format")
                return []

            routes = []
            i = route_start_idx
            while i < len(lines):
                line = lines[i].strip()
                if line.startswith('*>'):
                    # The block parser reports where scanning should resume.
                    route, i = self._parse_ipv6_route_block(lines, i)
                    if route is not None:
                        routes.append(route)
                else:
                    # Blank lines, the summary line and anything unexpected.
                    i += 1
            return routes
        except Exception as e:
            # Best-effort boundary: callers always get a list back.
            logging.error(f"Failed to get IPv6 BGP routes: {e}")
            return []

    def _get_all_bgp_routes(self) -> Dict[str, List[Dict[str, Any]]]:
        """Parse both IPv4 and IPv6 BGP route table outputs and return structured data.

        Returns:
            Dictionary with a single "routes" key holding the IPv4 routes
            followed by the IPv6 routes
        """
        try:
            ipv4_routes = self._get_ipv4_bgp_routes()
            ipv6_routes = self._get_ipv6_bgp_routes()
            logging.debug(f"Retrieved {len(ipv4_routes)} IPv4 routes and {len(ipv6_routes)} IPv6 routes")
            return {"routes": ipv4_routes + ipv6_routes}
        except Exception as e:
            logging.error(f"Failed to get all BGP routes: {e}")
            return {"routes": []}

    def _get_specific_network(self, bgp_properties: Dict[str, Any], prefix: str) -> Dict[str, Any]:
        """Find specific network in BGP properties.

        Args:
            bgp_properties: BGP properties dictionary (as returned by
                ``_get_all_bgp_routes``)
            prefix: Network prefix to search for; compared as an exact string

        Returns:
            First route whose "network" equals ``prefix``, or an empty dict
            if the properties are invalid or no route matches
        """
        if not bgp_properties or not isinstance(bgp_properties, dict):
            logging.warning(f"Invalid BGP properties: {type(bgp_properties)}")
            return {}

        # Treat a missing or null "routes" entry as an empty table.
        routes = bgp_properties.get("routes") or []
        logging.debug(f"Searching for prefix '{prefix}' in {len(routes)} routes")

        route = next(
            (candidate for candidate in routes
             if isinstance(candidate, dict) and candidate.get("network") == prefix),
            None,
        )
        if route is None:
            logging.debug(f"No route found for prefix '{prefix}'")
            return {}

        logging.debug(f"Found matching route for prefix '{prefix}': {route}")
        return route
def main():
    """Main function to demonstrate BGP route parsing.

    Configures INFO-level logging, parses the routes exposed by
    ``BGPRouteParser._get_all_bgp_routes`` and prints one numbered line per
    route.

    Returns:
        The dictionary returned by ``_get_all_bgp_routes``.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    parser = BGPRouteParser()
    print("Parsing BGP routes...")
    routes_data = parser._get_all_bgp_routes()
    routes = routes_data['routes']
    print(f"Found {len(routes)} BGP routes:")
    print("-" * 50)
    # Label each route with its 1-based position; the index was previously
    # computed but never printed.
    for i, route in enumerate(routes, 1):
        print(f"Route {i}: {route}")
    return routes_data


# Run the demonstration only when executed as a script, not on import.
if __name__ == "__main__":
    main()