Skip to main content
Loading...

More Python Posts

import logging
import os
import re
import subprocess
import time
import json
from typing import Optional, Tuple, List, Dict, Any
from glob import glob
from datetime import datetime


class BGPRouteParser():
    """Parse BGP route table text into a list of route dictionaries.

    Only best-path rows (those starting with ``*>``) that appear after the
    ``Network ... Next Hop`` header line are considered. IPv4 routes are read
    from single-line rows; IPv6 routes are read from the wrapped three-line
    layout (network / next hop / attributes), with a fallback to the
    single-line layout.

    NOTE: rows are split on whitespace, so the two integers that follow the
    next hop are taken as Metric and Weight. A row that also prints a LocPrf
    value would be misread; ``localPref`` is reported as a fixed 100.

    Attributes:
        bgp_route_pattern: Regex matching a single-line best-path row and
            capturing network, next hop, metric, weight and path.
        headend_config: Optional object exposing ``local_bgp_asn``; used as
            the AS path of routes whose path column is only the origin ``i``.
    """

    bgp_route_pattern = r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$'

    # Embedded sample tables used when no command output is supplied.
    _SAMPLE_IPV4_OUTPUT = """BGP table version is 0, local router ID is 169.254.148.249
    Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
                r RIB-failure, S Stale, R Removed
    Origin codes: i - IGP, e - EGP, ? - incomplete

    Network          Next Hop            Metric LocPrf Weight Path
    *> 172.16.0.1/32   169.254.50.85         0             0 65000 i


    Total number of prefixes 2"""

    _SAMPLE_IPV6_OUTPUT = """BGP table version is 0, local router ID is 169.254.148.249
    Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
                r RIB-failure, S Stale, R Removed
    Origin codes: i - IGP, e - EGP, ? - incomplete

    Network          Next Hop            Metric LocPrf Weight Path
    *> 2600:1702:5fc0:8792::/64
                        fd1c:a349:ac38:7ea1:792e:e716:baa1:52b6
                                                0             0 65000 i
    *> 2600:4700:4700::1111/128
                        fd1c:a349:ac38:7ea1:792e:e716:baa1:52b5
                                            100         32768 i

    Total number of prefixes 2"""

    def __init__(self, headend_config: Any = None) -> None:
        """Create a parser.

        Args:
            headend_config: Optional object with a ``local_bgp_asn`` attribute.
                When omitted, routes whose path is only ``i`` get an empty
                AS path.
        """
        self.headend_config = headend_config

    def _normalize_network_cidr(self, network: str) -> str:
        """Normalize network address by adding appropriate CIDR notation.

        Args:
            network: Network address (e.g., "172.31.0.0" or "172.16.0.1/32")

        Returns:
            Network address with appropriate CIDR notation. Values that already
            carry a prefix length, or that are not dotted-quad IPv4 (IPv6,
            malformed text), are returned unchanged.
        """
        if '/' in network:
            return network

        octets = network.split('.')
        if len(octets) != 4 or not all(octet.isdigit() for octet in octets):
            return network  # Invalid IP format or IPv6, return as-is

        # Check for default route
        if network == '0.0.0.0':
            return '0.0.0.0/0'

        # Determine CIDR based on trailing zero pattern
        if octets[1:] == ['0', '0', '0']:
            return f"{network}/8"

        if octets[2:] == ['0', '0']:
            return f"{network}/16"

        if octets[3] == '0':
            return f"{network}/24"

        # No trailing zero octet: treat as a host route instead of falling
        # through and returning None.
        return f"{network}/32"

    def _parse_as_path(self, path_info: str) -> str:
        """Extract AS path from BGP path information.

        Args:
            path_info: Raw path information from BGP output

        Returns:
            Space-separated AS numbers. When the path is only the origin code
            ``i``, the configured local ASN is returned, or an empty string
            if no ``headend_config`` with ``local_bgp_asn`` is available.
        """
        path_info = path_info.strip()

        # Handle internal routes
        if path_info == 'i':
            # getattr keeps this safe when no config was supplied (or a
            # subclass skipped __init__) instead of raising AttributeError.
            config = getattr(self, 'headend_config', None)
            local_asn = getattr(config, 'local_bgp_asn', None)
            if local_asn is None:
                logging.debug("No local BGP ASN configured; using empty AS path")
                return ''
            return str(local_asn)

        # Keep only the numeric tokens (drops the trailing origin code)
        return ' '.join(part for part in path_info.split() if part.isdigit())

    def _build_route(self, network: str, next_hop: str, metric_str: str,
                     weight_str: str, path_info: str) -> Dict[str, Any]:
        """Assemble a route dictionary; raises ValueError on non-integer metric/weight."""
        return {
            "network": self._normalize_network_cidr(network),
            "nextHopIp": next_hop,
            "med": int(metric_str),
            "localPref": 100,  # Always 100 for learned routes on PEs
            "weight": int(weight_str),
            "asPath": self._parse_as_path(path_info)
        }

    def _parse_route_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a single BGP route line (IPv4 format).

        Args:
            line: BGP route line from show command output

        Returns:
            Route dictionary or None if parsing fails
        """
        match = re.match(self.bgp_route_pattern, line)
        if not match:
            return None

        try:
            return self._build_route(*match.groups())
        except ValueError as e:
            logging.warning("Failed to parse route line '%s': %s", line, e)
            return None

    def _parse_ipv6_route_block(self, lines: List[str], start_idx: int) -> Tuple[Optional[Dict[str, Any]], int]:
        """Parse a multi-line IPv6 BGP route block.

        The expected layout is three lines: ``*> <network>``, the next hop,
        then ``<metric> <weight> <path...>``. If the first line carries more
        than the network, it is tried as a single-line route instead.

        Args:
            lines: List of all output lines
            start_idx: Index of the first line of the route block

        Returns:
            Tuple of (route dictionary or None, next line index to process)
        """
        if start_idx >= len(lines):
            return None, start_idx + 1

        # First line: *> network
        first_line = lines[start_idx].strip()
        if not first_line.startswith('*>'):
            return None, start_idx + 1

        network_match = re.match(r'^\*>\s+(\S+)$', first_line)
        if not network_match:
            # Not a bare network: short prefixes may fit on a single line.
            return self._parse_route_line(first_line), start_idx + 1

        network = network_match.group(1)

        # Second line: next hop (indented)
        if start_idx + 1 >= len(lines):
            return None, start_idx + 1

        next_hop = lines[start_idx + 1].strip()
        if not next_hop or next_hop.startswith('*>'):
            # Malformed block; let the caller re-examine the following line
            return None, start_idx + 1

        # Third line: metric, weight, path (indented)
        if start_idx + 2 >= len(lines):
            return None, start_idx + 2

        attribute_line = lines[start_idx + 2].strip()
        if not attribute_line or attribute_line.startswith('*>'):
            # Malformed block; the line at start_idx + 2 may start a new route
            return None, start_idx + 2

        # Read from the front, like the single-line pattern does: the first
        # two tokens are metric and weight, everything after is the path.
        # (Reading weight/path from the end mistook the AS number for the
        # weight and rejected three-token rows such as "100 32768 i".)
        fields = attribute_line.split(None, 2)
        if len(fields) < 3:
            return None, start_idx + 3

        metric_str, weight_str, path_info = fields
        try:
            route = self._build_route(network, next_hop, metric_str, weight_str, path_info)
        except ValueError as e:
            logging.warning("Failed to parse IPv6 route block starting at line %s: %s", start_idx, e)
            return None, start_idx + 3

        return route, start_idx + 3

    def _find_route_start_index(self, lines: List[str]) -> Optional[int]:
        """Find the index where BGP routes start in the output.

        Args:
            lines: List of output lines

        Returns:
            Index of the line after the ``Network ... Next Hop`` header, or
            None if no header line is found
        """
        for i, line in enumerate(lines):
            if 'Network' in line and 'Next Hop' in line:
                return i + 1
        return None

    def _get_ipv4_bgp_routes(self, output: Optional[str] = None) -> List[Dict[str, Any]]:
        """Parse IPv4 BGP route table output and return structured data.

        Args:
            output: Route table text to parse. Defaults to the embedded
                sample table.

        Returns:
            List of IPv4 BGP routes (empty on missing header, empty input, or
            any parsing error)
        """
        if output is None:
            output = self._SAMPLE_IPV4_OUTPUT

        try:
            # Check if command output is valid
            if not output or not output.strip():
                logging.warning("IPv4 BGP command returned empty output")
                return []

            lines = output.strip().splitlines()
            route_start_idx = self._find_route_start_index(lines)

            if route_start_idx is None:
                logging.warning("IPv4 BGP output does not contain expected header format")
                return []

            routes = []
            for raw_line in lines[route_start_idx:]:
                line = raw_line.strip()
                if not line.startswith('*>'):
                    continue
                route = self._parse_route_line(line)
                if route is not None:
                    routes.append(route)

            return routes

        except Exception as e:
            # Best-effort boundary: a bad table must not break the caller.
            logging.error("Failed to get IPv4 BGP routes: %s", e)
            return []

    def _get_ipv6_bgp_routes(self, output: Optional[str] = None) -> List[Dict[str, Any]]:
        """Parse IPv6 BGP route table output and return structured data.

        Args:
            output: Route table text to parse. Defaults to the embedded
                sample table.

        Returns:
            List of IPv6 BGP routes (empty on missing header, empty input, or
            any parsing error)
        """
        if output is None:
            output = self._SAMPLE_IPV6_OUTPUT

        try:
            # Check if command output is valid
            if not output or not output.strip():
                logging.warning("IPv6 BGP command returned empty output")
                return []

            lines = output.strip().splitlines()
            route_start_idx = self._find_route_start_index(lines)

            if route_start_idx is None:
                logging.warning("IPv6 BGP output does not contain expected header format")
                return []

            # Parse IPv6 routes (multi-line format)
            routes = []
            i = route_start_idx

            while i < len(lines):
                line = lines[i].strip()

                # Only lines starting with *> open a route block; blank lines,
                # the total-count line and stray continuation lines are skipped.
                if not line.startswith('*>'):
                    i += 1
                    continue

                route, i = self._parse_ipv6_route_block(lines, i)
                if route is not None:
                    routes.append(route)

            return routes

        except Exception as e:
            # Best-effort boundary: a bad table must not break the caller.
            logging.error("Failed to get IPv6 BGP routes: %s", e)
            return []

    def _get_all_bgp_routes(self) -> Dict[str, List[Dict[str, Any]]]:
        """Parse both IPv4 and IPv6 BGP route table outputs and return structured data.

        Returns:
            Dictionary with a single ``"routes"`` key holding the IPv4 routes
            followed by the IPv6 routes
        """
        try:
            ipv4_routes = self._get_ipv4_bgp_routes()
            ipv6_routes = self._get_ipv6_bgp_routes()

            logging.debug(
                "Retrieved %d IPv4 routes and %d IPv6 routes",
                len(ipv4_routes), len(ipv6_routes)
            )

            return {"routes": ipv4_routes + ipv6_routes}

        except Exception as e:
            logging.error("Failed to get all BGP routes: %s", e)
            return {"routes": []}

    def _get_specific_network(self, bgp_properties: Dict[str, Any], prefix: str) -> Dict[str, Any]:
        """Find specific network in BGP properties.

        Args:
            bgp_properties: BGP properties dictionary
            prefix: Network prefix to search for

        Returns:
            The first route whose ``"network"`` equals ``prefix``, or an empty
            dict if none matches or ``bgp_properties`` is not a non-empty dict
        """
        if not bgp_properties or not isinstance(bgp_properties, dict):
            logging.warning("Invalid BGP properties: %s", type(bgp_properties))
            return {}

        routes = bgp_properties.get("routes", [])
        logging.debug("Searching for prefix '%s' in %d routes", prefix, len(routes))

        match = next(
            (route for route in routes if route.get("network") == prefix),
            None
        )
        if match is None:
            logging.debug("No route found for prefix '%s'", prefix)
            return {}

        logging.debug("Found matching route for prefix '%s': %s", prefix, match)
        return match


def main() -> Dict[str, List[Dict[str, Any]]]:
    """Demonstrate BGP route parsing and print the routes found.

    Configures INFO-level logging, parses the routes through a
    ``BGPRouteParser`` instance and prints one numbered line per route.

    Returns:
        The dictionary returned by ``BGPRouteParser._get_all_bgp_routes``
        (a ``"routes"`` key holding the list of route dictionaries).
    """
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Create parser instance
    parser = BGPRouteParser()

    # Get BGP routes
    print("Parsing BGP routes...")
    routes_data = parser._get_all_bgp_routes()
    routes = routes_data['routes']

    # Display results
    print(f"Found {len(routes)} BGP routes:")
    print("-" * 50)

    # Number each route; the index was previously computed but never shown,
    # so every line read "Route {...}:" with no route number.
    for i, route in enumerate(routes, 1):
        print(f"Route {i}: {route}")

    return routes_data


# Run the BGP route parsing demo only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
class ProposalParser:
    """Parse IKE and ESP proposal strings into human-readable summaries.

    A proposal is a ``/``-separated list of transform names, optionally
    prefixed with ``IKE:`` or ``ESP:``, e.g.
    ``IKE:AES_CBC_256/HMAC_SHA2_256_128/PRF_HMAC_SHA2_256/MODP_2048``.
    Each component is placed in exactly one category (encryption, hash, PRF,
    DH group) using the mapping tables below.

    Attributes:
        dh_mapping (dict): Diffie-Hellman group name -> group number (as str).
        enc_mapping (dict): Encryption transform name -> display name.
        hash_mapping (dict): Integrity/hash transform name -> display name.
        prf_mapping (dict): PRF transform name -> display name.

    Methods:
        parse_ike_proposal(proposal): Parse one proposal into a dict with
            'encryption', 'hash', 'prf' and 'dh_group' lists.
        process_proposals(proposal_list): Parse a comma-separated list of
            proposals and return a one-line summary (with PFS status for
            non-IKE proposals).
    """

    # Components that carry no cryptographic algorithm (sequence-number flags).
    _IGNORED_COMPONENTS = frozenset({'NO_EXT_SEQ', 'EXT_SEQ'})

    # Substrings identifying the family of a component that is missing from
    # the mapping tables, so it is not mistaken for a different category.
    _HASH_HINTS = ('HMAC_', 'AES_GMAC', 'XCBC', 'CMAC')
    _ENC_HINTS = ('AES_CBC', 'AES_GCM', 'AES_CCM', 'AES_CTR', 'CHACHA20',
                  'BLOWFISH', 'CAST5', 'DES', 'CAMELLIA')
    _DH_HINTS = ('MODP', 'ECP', 'FFDHE', 'CURVE')

    def __init__(self):
        """Initialize the parser with mappings for DH groups, encryption, hash, and PRF."""
        self.dh_mapping = {
            'MODP_768': '1',
            'MODP_1024': '2',
            'MODP_1536': '5',
            'MODP_2048': '14',
            'MODP_3072': '15',
            'MODP_4096': '16',
            'MODP_6144': '17',
            'MODP_8192': '18',
            'ECP_256': '19',
            'ECP_384': '20',
            'ECP_521': '21',
            'ECP_192': '25',
            'ECP_224': '26',
            'MODP_1024_160': '22',
            'MODP_2048_224': '23',
            'MODP_2048_256': '24',
            'FFDHE_2048': '256',
            'FFDHE_3072': '257',
            'FFDHE_4096': '258',
            'FFDHE_6144': '259',
            'FFDHE_8192': '260',
            'ECP_224_BP': '27',
            'ECP_256_BP': '28',
            'ECP_384_BP': '29',
            'ECP_512_BP': '30',
            'CURVE_25519': '31',
            'CURVE_448': '32',
        }

        self.enc_mapping = {
            'AES_CBC_128': 'AES128',
            'AES_CBC_192': 'AES192',
            'AES_CBC_256': 'AES256',
            'AES_GCM_16_128': 'AES128-GCM-16',
            'AES_GCM_16_192': 'AES192-GCM-16',
            'AES_GCM_16_256': 'AES256-GCM-16',
            'AES_GCM_8_128': 'AES128-GCM-8',
            'AES_GCM_8_256': 'AES256-GCM-8',
            'AES_GCM_12_128': 'AES128-GCM-12',
            'AES_GCM_12_256': 'AES256-GCM-12',
            'AES_CCM_16_128': 'AES128-CCM-16',
            'AES_CCM_16_256': 'AES256-CCM-16',
            'AES_CTR_128': 'AES128-CTR',
            'AES_CTR_192': 'AES192-CTR',
            'AES_CTR_256': 'AES256-CTR',
            '3DES_CBC': '3DES',
            'DES_CBC': 'DES',
            'CAMELLIA_CBC_128': 'CAMELLIA128',
            'CAMELLIA_CBC_256': 'CAMELLIA256',
            'CHACHA20_POLY1305': 'CHACHA20-POLY1305',
            'BLOWFISH_CBC': 'BLOWFISH',
            'CAST5_CBC': 'CAST5'
        }

        self.hash_mapping = {
            'HMAC_MD5': 'MD5',
            'HMAC_MD5_96': 'MD5',
            'HMAC_SHA1': 'SHA1',
            'HMAC_SHA1_96': 'SHA1',
            'HMAC_SHA2_256': 'SHA2-256',
            'HMAC_SHA2_256_128': 'SHA2-256',
            'HMAC_SHA2_384': 'SHA2-384',
            'HMAC_SHA2_384_192': 'SHA2-384',
            'HMAC_SHA2_512': 'SHA2-512',
            'HMAC_SHA2_512_256': 'SHA2-512',
            'HMAC_SHA3_224': 'SHA3-224',
            'HMAC_SHA3_256': 'SHA3-256',
            'HMAC_SHA3_384': 'SHA3-384',
            'HMAC_SHA3_512': 'SHA3-512',
            'AES_GMAC_128': 'GMAC-128',
            'AES_GMAC_192': 'GMAC-192',
            'AES_GMAC_256': 'GMAC-256',
            'POLY1305': 'POLY1305'
        }

        self.prf_mapping = {
            'PRF_HMAC_MD5': 'MD5',
            'PRF_HMAC_SHA1': 'SHA1',
            'PRF_HMAC_SHA2_256': 'SHA2-256',
            'PRF_HMAC_SHA2_384': 'SHA2-384',
            'PRF_HMAC_SHA2_512': 'SHA2-512',
            'PRF_AES128_CMAC': 'AES128-CMAC',
            'PRF_AES128_XCBC': 'AES128-XCBC',
            'PRF_HMAC_SHA3_224': 'SHA3-224',
            'PRF_HMAC_SHA3_256': 'SHA3-256',
            'PRF_HMAC_SHA3_384': 'SHA3-384',
            'PRF_HMAC_SHA3_512': 'SHA3-512'
        }

    @staticmethod
    def _add_unique(bucket, value):
        """Append value to bucket unless it is already present (keeps order)."""
        if value not in bucket:
            bucket.append(value)

    def parse_ike_proposal(self, proposal):
        """
        Parse an IKE or ESP proposal string into a structured format.

        Args:
            proposal (str): The proposal string, e.g., "IKE:AES_CBC_256/HMAC_SHA2_256/PRF_HMAC_SHA2_256/MODP_2048"

        Returns:
            dict: A dictionary with keys 'encryption', 'hash', 'prf', and 'dh_group',
            each holding a list of display names without duplicates:

            - 'encryption': mapped names, or ['Unknown'] if none was recognized.
            - 'hash': mapped names; 'Unknown' is added when a hash-like or
              entirely unrecognized component could be the hash; ['None'] when
              the proposal carries no hash component at all.
            - 'prf': mapped names ('Unknown' for an unmapped PRF_ component) for
              IKE proposals; ['None'] for ESP proposals.
            - 'dh_group': mapped group numbers as strings (unmapped groups are
              skipped, since they have no number to report).
        """
        proposal = proposal.strip()
        is_ike = proposal.startswith('IKE:')
        is_esp = proposal.startswith('ESP:')

        # Drop the 4-character "IKE:" / "ESP:" prefix before splitting.
        body = proposal[4:] if (is_ike or is_esp) else proposal
        components = [part.strip() for part in body.split('/')]

        encryption = []
        hashes = []
        prfs = []
        dh_groups = []
        unrecognized = []        # everything not found in a mapping table
        hash_unknown = False     # saw a hash-like component with no mapping
        unplaced = False         # saw a component matching no family at all

        for component in components:
            if not component or component in self._IGNORED_COMPONENTS:
                continue

            # Exact table hits first; each component lands in one category
            # only, so non-hash components never leak into the hash list.
            if component in self.enc_mapping:
                self._add_unique(encryption, self.enc_mapping[component])
            elif component in self.hash_mapping:
                self._add_unique(hashes, self.hash_mapping[component])
            elif component in self.dh_mapping:
                self._add_unique(dh_groups, self.dh_mapping[component])
            elif component.startswith('PRF_'):
                if is_ike:
                    self._add_unique(prfs, self.prf_mapping.get(component, 'Unknown'))
                else:
                    # PRFs are only reported for IKE proposals.
                    unrecognized.append(component)
            elif any(hint in component for hint in self._HASH_HINTS):
                hash_unknown = True
                unrecognized.append(component)
            elif (any(hint in component for hint in self._ENC_HINTS)
                    or any(hint in component for hint in self._DH_HINTS)):
                # Known family, unknown variant: reported but not mapped.
                unrecognized.append(component)
            else:
                unplaced = True
                unrecognized.append(component)

        if unrecognized:
            logging.warning("Unrecognized components: %s", unrecognized)

        # A component of no known family may be the hash, so "None" cannot be
        # claimed when no hash was recognized.
        if hash_unknown or (unplaced and not hashes):
            self._add_unique(hashes, 'Unknown')
        if not hashes:
            hashes = ['None']

        return {
            'encryption': encryption or ['Unknown'],
            'hash': hashes,
            # Handle ESP case (no PRF for ESP proposals)
            'prf': ['None'] if is_esp else prfs,
            'dh_group': dh_groups,
        }

    def process_proposals(self, proposal_list):
        """
        Process a list of IKE or ESP proposals, concatenating encryption, hash, PRF (for IKE only),
        and DH group values, and indicate whether PFS is enabled for ESP proposals only.

        Args:
            proposal_list (str): Comma-separated string of IKE or ESP proposals

        Returns:
            str: Formatted string with concatenated encryption, hash, PRF (for IKE), DH groups, and PFS status (for ESP).
            A category with no value in any proposal is shown as "None".
        """
        # Split on commas regardless of surrounding spaces; skip empty entries.
        proposals = [entry.strip() for entry in proposal_list.split(',') if entry.strip()]

        # Collect unique encryption, hash, PRF, and DH groups
        enc_set = set()
        hash_set = set()
        prf_set = set()
        dh_set = set()

        for proposal in proposals:
            parsed = self.parse_ike_proposal(proposal)
            enc_set.update(parsed['encryption'])
            # 'None' is a per-proposal placeholder, not a value to aggregate.
            hash_set.update(name for name in parsed['hash'] if name != 'None')
            prf_set.update(name for name in parsed['prf'] if name != 'None')
            dh_set.update(parsed['dh_group'])

        enc_list = sorted(enc_set)
        hash_list = sorted(hash_set)
        prf_list = sorted(prf_set)
        dh_list = sorted(dh_set, key=int)  # numeric order: 2 before 14

        is_ike = any(proposal.startswith('IKE:') for proposal in proposals)

        # Format output as a single concatenated string
        enc_part = f"Encryption: {', '.join(enc_list)}" if enc_list else "Encryption: None"
        hash_part = f"Hash: {', '.join(hash_list)}" if hash_list else "Hash: None"
        dh_part = f"DH Group(s): {', '.join(dh_list)}" if dh_list else "DH Group(s): None"
        prf_part = f"PRF: {', '.join(prf_list)}" if prf_list else "PRF: None"

        # Return formatted string based on whether it's an IKE or ESP proposal
        if is_ike:
            return f"{enc_part} {hash_part} {prf_part} {dh_part}"

        # PFS applies to non-IKE proposals: enabled when any DH group is present
        pfs_status = "PFS: Enabled" if dh_set else "PFS: None"
        return f"{enc_part} {hash_part} {dh_part} {pfs_status}"
            
# Example usage
if __name__ == "__main__":
    parser = ProposalParser()

    # Contains INVALID_HASH, a component that is not a known hash algorithm.
    unknown_hash = "IKE:AES_CBC_256/INVALID_HASH/PRF_HMAC_SHA2_256/MODP_2048"
    # Contains no hash component at all.
    none_hash = "IKE:AES_CBC_256/PRF_HMAC_SHA2_256/MODP_2048"

    # Author's notes on the printed summaries:
    #   unknown_hash -> "Encryption: AES256 Hash: Unknown PRF: SHA2-256 DH Group(s): 14"
    #                   is the correct result.
    #   none_hash    -> that same "Hash: Unknown" summary is NOT correct here,
    #                   because this proposal has no hash component.
    for sample_proposal in (unknown_hash, none_hash):
        print(parser.process_proposals(sample_proposal))