Skip to main content
Loading...

More Python Posts

import subprocess
import logging
import re
from typing import Dict, Any, List, Optional


class BGPRouter:
    """BGP Router class for parsing and managing BGP route information."""
    
    def __init__(self, local_asn: str = '65412'):
        """Initialize BGP router with local ASN.
        
        Args:
            local_asn: Local BGP ASN number
        """
        self.local_asn = local_asn
        
    def _normalize_network_cidr(self, network: str) -> str:
        """Normalize network address by adding appropriate CIDR notation.
        
        Args:
            network: Network address (e.g., "172.31.0.0" or "172.16.0.1/32")
            
        Returns:
            Network address with appropriate CIDR notation
        """
        if '/' in network:
            return network
            
        try:
            octets = network.split('.')
            if len(octets) != 4:
                return network  # Invalid IP format or IPv6, return as-is

            # Determine CIDR based on trailing zero pattern

            # Check for default route
            if network == '0.0.0.0':
                return '0.0.0.0/0'

            if octets[1:] == ['0', '0', '0']:
                return f"{network}/8"

            if octets[2:] == ['0', '0']:
                return f"{network}/16"

            if octets[3] == '0':
                return f"{network}/24"
 
        except (ValueError, IndexError):
            return network
        

    def _parse_as_path(self, path_info: str) -> str:
        """Extract AS path from BGP path information.
        
        Args:
            path_info: Raw path information from BGP output
            
        Returns:
            Cleaned AS path string
        """
        path_info = path_info.strip()
        
        # Handle internal routes
        if path_info == 'i':
            return self.local_asn
            
        # Extract AS numbers using list comprehension
        path_parts = path_info.split()
        as_numbers = [part for part in path_parts if part.isdigit()]
        
        return ' '.join(as_numbers)

    def _parse_route_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a single BGP route line.
        
        Args:
            line: BGP route line from show command output
            
        Returns:
            Route dictionary or None if parsing fails
        """
        pattern = r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$'
        match = re.match(pattern, line)
        if not match:
            return None
            
        network, next_hop, metric_str, weight_str, path_info = match.groups()
        
        try:
            return {
                "network": self._normalize_network_cidr(network),
                "nextHopIp": next_hop,
                "med": int(metric_str),
                "localPref": 100, # Always 100 for learned routes on PEs
                "weight": int(weight_str),
                "asPath": self._parse_as_path(path_info)
            }
        except ValueError as e:
            logging.warning(f"Failed to parse route line '{line}': {e}")
            return None

    def _find_route_start_index(self, lines: List[str]) -> Optional[int]:
        """Find the index where BGP routes start in the output.
        
        Args:
            lines: List of output lines
            
        Returns:
            Index of first route line or None if not found
        """
        for i, line in enumerate(lines):
            if 'Network' in line and 'Next Hop' in line:
                return i + 1
        return None

    def _get_all_bgp_routes(self) -> Dict[str, List[Dict[str, Any]]]:
        """Parse BGP route table output and return structured data.
        
        Returns:
            Dictionary containing list of BGP routes
        """
        try:
            output = """BGP table version is 0, local router ID is 169.254.112.97
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
            r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete

Network          Next Hop            Metric LocPrf Weight Path
*> 0.0.0.0          169.254.112.98        0             0 65000 i
*> 172.16.0.1/32    169.254.112.98        0             0 65000 i
*> 172.16.0.2/32    169.254.112.98        0             0 65000 65114 49449 i
*> 172.16.0.3/32    169.254.112.98        0             0 65000 i
*> 172.16.0.4/32    169.254.112.98        0             0 65000 i
*> 172.16.0.5/32    169.254.112.98        0             0 65000 i
*> 172.16.0.112/32  169.254.112.98        0             0 65000 i
*> 172.16.0.113/32  169.254.112.98        0             0 65000 i
*> 172.16.0.114/32  169.254.112.98        0             0 65000 i
*> 172.16.0.115/32  169.254.112.98        0             0 65000 i
*> 172.16.0.116/32  169.254.112.98        0             0 65000 i
*> 172.16.0.117/32  169.254.112.98        0             0 65000 i
*> 172.16.0.118/32  169.254.112.98        0             0 65000 i
*> 172.16.0.119/32  169.254.112.98        0             0 65000 i
*> 172.16.0.120/32  169.254.112.98        0             0 65000 i
*> 172.16.0.121/32  169.254.112.98        0             0 65000 i
*> 172.31.0.0       169.254.112.97        100           32768 i
*> 172.0.0.0        169.254.112.97        100           32768 i
*> 172.1.1.0        169.254.112.97        100           32768 i
*> 172.31.0.1/32    169.254.112.97        100           32768 i
*> 172.31.0.2/32    169.254.112.97        100           32768 i
*> 172.31.0.3/32    169.254.112.97        100           32768 i
*> 172.31.0.4/32    169.254.112.97        100           32768 i
*> 2001:db8:2::/64  fe80::2               100           32768 i
*> 2001:db8:2::1/128 fe80::2              100           32768 i
*> 2001:db8:2::2/128 fe80::2              100           32768 i
*> 2001:db8:2::3/128 fe80::2              100           32768 i
*> 2001:db8:2::4/128 fe80::2              100           32768 i

Total number of prefixes 3233"""
            
            # Check if command output is valid
            if not output or not output.strip():
                logging.warning("BGP command returned empty output")
                return {"routes": []}
            
            lines = output.strip().split('\n')
            route_start_idx = self._find_route_start_index(lines)
            
            if route_start_idx is None:
                logging.warning("BGP output does not contain expected header format")
                return {"routes": []}
            
            # Parse routes using list comprehension and filter
            route_lines = [
                line.strip() for line in lines[route_start_idx:]
                if line.strip() and line.strip().startswith('*>') 
                and not line.strip().startswith('Total number')
            ]
            
            # Parse routes and convert to dictionaries
            routes = []
            for line in route_lines:
                route = self._parse_route_line(line)
                if route is not None:
                    routes.append(route)
            
            return {"routes": routes}
            
        except Exception as e:
            logging.error(f"Failed to get BGP routes: {e}")
            return {"routes": []}
    
    def get_specific_network(self, bgp_properties: Dict[str, Any], prefix: str) -> Dict[str, Any]:
        """Find specific network in BGP properties.
        
        Args:
            bgp_properties: BGP properties dictionary
            prefix: Network prefix to search for
            
        Returns:
            Route information dictionary or empty dict if not found
        """
        if not bgp_properties or not isinstance(bgp_properties, dict):
            logging.warning(f"Invalid BGP properties: {type(bgp_properties)}")
            return {}
            
        routes = bgp_properties.get("routes", [])
        logging.debug(f"Searching for prefix '{prefix}' in {len(routes)} routes")
        
        # Use next() with generator expression for efficient search
        try:
            route = next(
                route for route in routes 
                if route.get("network") == prefix
            )
            logging.debug(f"Found matching route for prefix '{prefix}': {route}")
            return route
        except StopIteration:
            logging.debug(f"No route found for prefix '{prefix}'")
            return {}


def main():
    """Main function to demonstrate BGP router functionality."""
    router = BGPRouter()
    
    # Get all BGP routes
    bgp_routes = router._get_all_bgp_routes()
    print(f"Parsed {len(bgp_routes['routes'])} BGP routes")
    print(bgp_routes)
    
    # # Display first few routes
    # for i, route in enumerate(bgp_routes['routes'][:3]):
    #     print(f"Route {i+1}: {route}")
    
    # # Search for specific prefix
    # specific_prefix = "172.16.0.1/32"
    # route_info = router.get_specific_network(bgp_routes, specific_prefix)
    
    # if route_info:
    #     print(f"Found route for {specific_prefix}: {route_info}")
    # else:
    #     print(f"No route found for {specific_prefix}")


if __name__ == "__main__":
    main()
import os, json, boto3, requests
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from random import shuffle

app = Flask(__name__)
cors = CORS(app)

dynamodb = boto3.resource("dynamodb", region_name="us-east-1")

app.url_map.strict_slashes = False
SECRET_KEY = os.environ.get("SECRET_KEY")


@app.route("/teks")
def teks_request():
    teks_file = open("teks.json", "r")
    data = json.load(teks_file)
    return jsonify(data)


@app.route("/teks/find/113.41.<int:teks_id>.<string:section_id>")
def teks_find_request(teks_id, section_id):
    teks_file = open("teks.json", "r")
    data = json.load(teks_file)
    for item in data:
        if item["id"] == teks_id:
            for child in item["children"]:
                if child["id"] == section_id:
                    return {"tek": item, "content": child["content"]}
    return jsonify(
        [
            f"Something went wrong. TEKS section id of {section_id} cannot be found within TEKS section {teks_id}."
        ]
    )

@app.route("/lessonplan/read/<id>")
def read_lesson_plan(id):
    lesson_table = dynamodb.Table("Lesson_Plans")
    items = lesson_table.scan()['Items']
    for lesson in items:
        if (lesson["uuid"] == id):
            return jsonify(lesson)
    return {"error": "id does not exist", "section": id}

@app.route("/teks/<int:teks_id>")
def teks_id_request(teks_id):
    teks_file = open("teks.json", "r")
    data = json.load(teks_file)
    for item in data:
        if item["id"] == teks_id:
            return jsonify(item)
    return jsonify([f"Something went wrong. TEKS id of {teks_id} cannot be found."])


@app.route("/assessment/write", methods=["GET", "POST"])
def assessment_write():
    assessment_json = request.json
    assessment_data = dict(assessment_json)
    assessment_table = dynamodb.Table("Assessments")
    assessment_table.put_item(Item=assessment_data)

    if assessment_data == get_assessment(assessment_data["id"]):
        return "Success"
    else:
        return "Failure"

@app.route("/students/read/<id>")
def students_read(id):
    return jsonify(get_students(id))

@app.route("/students/read")
def all_students_read():
    student_table = dynamodb.Table("Students") 
    items = student_table.scan()['Items']
    return jsonify(items)

@app.route("/assessment/read/<id>")
def assessment_read(id):
    return jsonify(get_assessment(id))


@app.route("/assessment/submit/<id>", methods=["POST"])
def submit_assessment(id):
    assessments_table = dynamodb.Table("Assessments")
    assessment = assessments_table.get_item(Key={"id": id})

    if not assessment.get("Item"):
        return {"error": "id does not exist", "section": id}

    responses = {
        question["id"]: question["response"]
        for question in request.json.get("questions")
    }

    correct_answers = 0

    for response in responses:
        # print(
        #     (
        #         responses[response],
        #         find_question(assessment.get("Item").get("questions"), response).get(
        #             "correctAnswer"
        #         ),
        #     )
        # )
        if responses[response] == find_question(
            assessment.get("Item").get("questions"), response
        ).get("correctAnswer"):

            correct_answers += 1

    score = correct_answers / len(request.json.get("questions"))

    users_table = dynamodb.Table("Students")

    users_table.update_item(
        Key={"uuid": request.json.get("student_id")},
        UpdateExpression="SET completedAssessments = list_append(completedAssessments, :i)",
        ExpressionAttributeValues={
            ":i": [
                {
                    "id": id,
                    "score": round(score * 100),
                }
            ]
        },
    )

    message = None
    if round(score * 100) > 70:
        message = f"Congratulations! You passed your assessment with a {round(score * 100)}%."
    else:
        message = f"You failed your assessment with a {round(score * 100)}%."

    sns = boto3.client("sns", region_name="us-east-1")
    number = "+15125967383"
    sns.publish(PhoneNumber=number, Message=message)

    return {"score": score, "message": message}


def find_question(all, id):
    #print("id to find: ", id)
    for question in all:
        if question["id"] == id:
            #print(question)
            return question


def get_assessment(id):
    assessment_table = dynamodb.Table("Assessments")
    results = assessment_table.get_item(Key={"id": id})

    if results.get("Item") is None:
        return {"error": "id does not exist", "section": id}
    else:
        quiz = results.get("Item")
        return {
            "title": quiz.get("title"),
            "id": quiz.get("id"),
            "questions": [
                {
                    "id": question.get("id"),
                    "title": question.get("title"),
                    "options": random_answers(
                        question.get("incorrectAnswers")
                        + [question.get("correctAnswer")]
                    ),
                }
                for question in quiz.get("questions")
            ],
        }

def get_students(id):
    students_table = dynamodb.Table('Students')
    results = students_table.get_item(Key = {
        "uuid": id
    })

    if(results.get("Item") is None):
        return {'error': 'id does not exist', 'section': id}
    else:
        student = results.get("Item")
        return student

def lesson_plans_read():
    student_table = dynamodb.Table("Lesson_Plans")
    items = student_table.scan()['Items']
    return jsonify(items)

def random_answers(answers):
    shuffle(answers)
    return answers

@app.route("/recommendations/<uuid>")
def get_recommendation(uuid):
    student_info_table = dynamodb.Table('Students')
    lesson_plans_table = dynamodb.Table('Lesson_Plans')
    student = get_students(uuid)
    tek = student.get("struggleTeks")[0]

    lesson_plans = lesson_plans_table.scan( Select='ALL_ATTRIBUTES', FilterExpression='tek = :s', ExpressionAttributeValues={ ":s": tek })

    #print(lesson_plans) 

    return jsonify({"student": student, "lesson_plans": lesson_plans.get("Items")})

if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True)