# Skip to main content
# Loading...

# More Python Posts

#Sets
# U is the universe that set2bits/union/intersection/compliment index against;
# P, Q and R are the subsets of U used by the problem functions below.
U = {0,1,2,3,4,5,6,7,8,9}
P = {1,2,3,4}
Q = {4,5,6}
R = {3,4,6,8,9}

def set2bits(xs, us):
    """Encode membership of ``xs`` as a list of 0/1 flags over ``us``.

    The result has one entry per element of ``us``, in the iteration order of
    ``us``: 1 when that element is in ``xs``, otherwise 0.
    """
    flags = [int(candidate in xs) for candidate in us]
    # Sanity check: exactly one flag per universe element.
    assert len(us) == len(flags)
    return flags

def union(set1, set2):
    """Return the union of ``set1`` and ``set2`` within the universe ``U``.

    Both operands are encoded as bit lists over ``U`` with ``set2bits`` and
    OR-ed position by position. Elements outside ``U`` are ignored because
    they have no position in the bit list.
    """
    # Freeze one iteration order so both bit lists line up with the elements.
    universe = list(U)
    bits1 = set2bits(set1, universe)
    bits2 = set2bits(set2, universe)

    # Collect the universe *element* at each selected position. Adding the
    # position index only works while U is exactly {0..n-1} in sorted order.
    return {
        member
        for member, bit1, bit2 in zip(universe, bits1, bits2)
        if bit1 or bit2
    }

def intersection(set1, set2):
    """Return the intersection of ``set1`` and ``set2`` within the universe ``U``.

    Both operands are encoded as bit lists over ``U`` with ``set2bits`` and
    AND-ed position by position. Elements outside ``U`` are ignored because
    they have no position in the bit list.
    """
    # Freeze one iteration order so both bit lists line up with the elements.
    universe = list(U)
    bits1 = set2bits(set1, universe)
    bits2 = set2bits(set2, universe)

    # Collect the universe *element* at each selected position. Adding the
    # position index only works while U is exactly {0..n-1} in sorted order.
    return {
        member
        for member, bit1, bit2 in zip(universe, bits1, bits2)
        if bit1 and bit2
    }

def compliment(set1):
    """Return the complement of ``set1`` relative to the universe ``U``.

    ``set1`` is encoded as a bit list over ``U`` with ``set2bits``; every
    universe element whose bit is 0 is part of the result.
    """
    # Freeze one iteration order so the bit list lines up with the elements.
    universe = list(U)
    bits = set2bits(set1, universe)

    # Collect the universe *element* at each cleared position. Adding the
    # position index only works while U is exactly {0..n-1} in sorted order.
    return {member for member, bit in zip(universe, bits) if not bit}

def implication(a, b):
    """Return the set form of ``a => b``: the complement of ``a`` united with ``b``."""
    not_a = compliment(a)
    return union(not_a, b)

###########################################################################################
######################         Problems 1-6         #######################################
###########################################################################################

#p \/ (q /\ r) = (p \/ q) /\ (p \/ r)
def prob1():
    """Check that union distributes over intersection for P, Q and R."""
    left_side = union(P, intersection(Q, R))
    right_side = intersection(union(P, Q), union(P, R))
    return left_side == right_side

#p /\ (q \/ r) = (p /\ q) \/ (p /\ r)
def prob2():
    """Check that intersection distributes over union for P, Q and R."""
    left_side = intersection(P, union(Q, R))
    right_side = union(intersection(P, Q), intersection(P, R))
    return left_side == right_side

#~(p /\ q) = ~p \/ ~q
def prob3():
    """Check De Morgan's law for intersection, using P and R."""
    left_side = compliment(intersection(P, R))
    right_side = union(compliment(P), compliment(R))
    return left_side == right_side

#~(p \/ q) = ~p /\ ~q
def prob4():
    """Check De Morgan's law for union, using P and Q."""
    left_side = compliment(union(P, Q))
    right_side = intersection(compliment(P), compliment(Q))
    return left_side == right_side

#(p=>q) = (~q => ~p)
def prob5():
    """Check that an implication equals its contrapositive, using P and Q."""
    direct = implication(P, Q)
    contrapositive = implication(compliment(Q), compliment(P))
    return direct == contrapositive

#(p => q) /\ (q => r)  =>  (p => r)
def prob6():
    """Evaluate the transitivity of implication for P, Q and R.

    Unlike prob1-prob5 this returns the resulting set, not a bool.
    """
    premises = intersection(implication(P, Q), implication(Q, R))
    conclusion = implication(P, R)
    return implication(premises, conclusion)


# Run every problem in order and print its result on its own line.
for number, problem in enumerate((prob1, prob2, prob3, prob4, prob5, prob6), start=1):
    print(f"Problem {number}: ", problem())

'''
Problem 1:  True
Problem 2:  True
Problem 3:  True
Problem 4:  True
Problem 5:  True
Problem 6:  {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
'''
import re

def _map_ikev2_vendor_capabilities(message_type, input_string):
    # List of acceptable message types
    valid_message_types = ['IKE_SA_INIT', 'IKE_AUTH']
    
    # Validate message type
    if message_type not in valid_message_types:
        raise ValueError(f"Invalid message type: {message_type}. Must be one of {valid_message_types}")
    
    # Mapping dictionary for IKE values with RFC references
    value_map = {
        # IKE_SA_INIT capabilities
        'FRAG_SUP': 'IKE Fragmentation',  # RFC 7383, Section 3
        'REDIR_SUP': 'IKE Redirection',  # RFC 5685, Section 3
        'HASH_ALG': 'Hash Algorithms',  # RFC 7296, Section 3.3.2
        'NATD_S_IP': 'NAT Detection (Source IP)',  # RFC 7296, Section 2.23
        'NATD_D_IP': 'NAT Detection (Destination IP)',  # RFC 7296, Section 2.23
        'SIGN_HASH_ALGS': 'Signature Hash Algorithms',  # RFC 7296, Section 2.15
        'NON_FIRST_FRAGMENTS': 'Non-First IKE Fragments',  # RFC 7383, Section 3
        'CHILDLESS_IKEV2_SUP': 'Childless IKEv2',  # RFC 6023, Section 3
        'INTERMEDIATE': 'Intermediate Exchange',  # RFC 9242, Section 3
        'COOKIE': 'Cookie-Based DoS Protection',  # RFC 7296, Section 2.6
        # IKE_AUTH capabilities
        'ESP_TFC_PAD_N': 'ESPv3 TFC Padding Disabled',  # RFC 7296, Section 3.3.1
        'MOBIKE_SUP': 'MOBIKE',  # RFC 4555, Section 3
        'MULT_AUTH': 'Multiple Auth',  # RFC 4739, Section 3
        'EAP_ONLY': 'EAP-Only Auth',  # RFC 5998, Section 3
        'MSG_ID_SYN_SUP': 'Message ID Synchronization',  # RFC 6311, Section 3
        'IPCOMP_SUPPORTED': 'IP Payload Compression Support',  # RFC 7296, Section 3.3.2
        'ADD_4_ADDR': 'Additional IPv4 Addresses',  # RFC 4555, Section 3.2
        'ADD_6_ADDR': 'Additional IPv6 Addresses',  # RFC 4555, Section 3.2
        'INIT_CONTACT': 'Initial Contact',  # RFC 7296, Section 3.16
        'HTTP_CERT_LOOKUP_SUP': 'HTTP Certificate Lookup',  # RFC 7296, Section 3.7
    }
        
    # Regex to capture N(...) patterns
    pattern = r'N\([^)]+\)'
    matches = re.findall(pattern, input_string)
    
    # Parse matches and create result list
    result = []
    for match in matches:
        key = match[2:-1]  # Extract content inside N(...)
        # Only include keys valid for the message type
        if key in value_map:
            result.append(value_map[key])
    
    return ', '.join(result)

# Example usage: two sample log lines, one per supported message type.
# The IKE_SA_INIT sample also carries N(MULT_AUTH), which the mapping lists
# under the IKE_AUTH capabilities.
ike_sa_init_string = '2025-07-18 20:16:22.839 15[ENC] <4> parsed IKE_SA_INIT request 0 [ SA KE No N(NATD_S_IP) N(NATD_D_IP) N(FRAG_SUP) N(HASH_ALG) N(MULT_AUTH) ]'
# The IKE_AUTH sample has two adjacent tokens with no space between them
# ("N(ADD_4_ADDR)N(MULT_AUTH)").
ike_auth_string = '2025-07-18 20:16:22.898 07[ENC] <4> parsed IKE_AUTH request 1 [ IDi N(INIT_CONTACT) IDr AUTH N(ESP_TFC_PAD_N) SA TSi TSr N(MOBIKE_SUP) N(ADD_4_ADDR)N(MULT_AUTH) N(EAP_ONLY) N(MSG_ID_SYN_SUP) ]'

# Test with IKE_SA_INIT (runs at import time and prints to stdout)
print("IKE_SA_INIT Results:")
print(_map_ikev2_vendor_capabilities("IKE_SA_INIT", ike_sa_init_string))

# Test with IKE_AUTH (runs at import time and prints to stdout)
print("\nIKE_AUTH Results:")
print(_map_ikev2_vendor_capabilities("IKE_AUTH", ike_auth_string))
import os, json, boto3, requests
from flask import Flask, request, jsonify
from flask_cors import CORS, cross_origin
from random import shuffle

# Flask application object; the route handlers below register on it.
app = Flask(__name__)
# Wrap the app with the flask_cors extension using its default settings.
cors = CORS(app)

# Shared DynamoDB resource handle (us-east-1) used by the handlers below.
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")

# Match routes with or without a trailing slash.
app.url_map.strict_slashes = False
# Read from the environment; not referenced again in the code visible here.
SECRET_KEY = os.environ.get("SECRET_KEY")


@app.route("/teks")
def teks_request():
    """Return the full contents of ``teks.json`` as a JSON response."""
    # The context manager closes the handle even if json.load raises; the
    # previous bare open() left one descriptor open per request.
    with open("teks.json", "r") as teks_file:
        data = json.load(teks_file)
    return jsonify(data)


@app.route("/teks/find/113.41.<int:teks_id>.<string:section_id>")
def teks_find_request(teks_id, section_id):
    """Look up one child section of a TEKS entry in ``teks.json``.

    Args:
        teks_id: Integer id of the top-level TEKS entry.
        section_id: String id of the child section inside that entry.

    Returns:
        ``{"tek": <entry>, "content": <child content>}`` for the first match,
        otherwise a JSON list holding a single error message.
    """
    # The context manager closes the handle even if json.load raises; the
    # previous bare open() left one descriptor open per request.
    with open("teks.json", "r") as teks_file:
        data = json.load(teks_file)

    for item in data:
        if item["id"] != teks_id:
            continue
        for child in item["children"]:
            if child["id"] == section_id:
                return {"tek": item, "content": child["content"]}
    return jsonify(
        [
            f"Something went wrong. TEKS section id of {section_id} cannot be found within TEKS section {teks_id}."
        ]
    )

@app.route("/lessonplan/read/<id>")
def read_lesson_plan(id):
    """Return the lesson plan whose ``uuid`` equals ``id``.

    Scans the ``Lesson_Plans`` table and returns the first matching item as
    JSON, or ``{"error": "id does not exist", "section": id}`` when no item
    matches.
    """
    lesson_table = dynamodb.Table("Lesson_Plans")
    scan_kwargs = {}
    while True:
        page = lesson_table.scan(**scan_kwargs)
        for lesson in page["Items"]:
            if lesson["uuid"] == id:
                return jsonify(lesson)
        # A single Scan call returns at most one page; keep following
        # LastEvaluatedKey so items beyond the first page are searched too.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return {"error": "id does not exist", "section": id}
        scan_kwargs["ExclusiveStartKey"] = last_key

@app.route("/teks/<int:teks_id>")
def teks_id_request(teks_id):
    """Return the TEKS entry from ``teks.json`` whose ``id`` equals ``teks_id``.

    Returns the first matching entry as JSON, otherwise a JSON list holding a
    single error message.
    """
    # The context manager closes the handle even if json.load raises; the
    # previous bare open() left one descriptor open per request.
    with open("teks.json", "r") as teks_file:
        data = json.load(teks_file)

    for item in data:
        if item["id"] == teks_id:
            return jsonify(item)
    return jsonify([f"Something went wrong. TEKS id of {teks_id} cannot be found."])


@app.route("/assessment/write", methods=["GET", "POST"])
def assessment_write():
    """Store the posted assessment and report whether it was persisted.

    Expects a JSON object that contains at least an ``id`` key.

    Returns:
        ``"Success"`` when the item read back from the ``Assessments`` table
        equals the submitted payload, ``"Failure"`` otherwise. A missing or
        non-object JSON body, or one without ``id``, yields ``"Failure"``
        with HTTP status 400.
    """
    # silent=True yields None instead of raising when no JSON body was sent
    # (for example a plain GET), so the request can be rejected explicitly.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or "id" not in payload:
        return "Failure", 400

    assessment_data = dict(payload)
    assessment_table = dynamodb.Table("Assessments")
    assessment_table.put_item(Item=assessment_data)

    # Verify against the raw stored item. get_assessment() returns a reshaped
    # view (shuffled "options", no answer keys), which does not equal the
    # payload. A consistent read avoids a stale miss right after the write.
    stored = assessment_table.get_item(
        Key={"id": assessment_data["id"]}, ConsistentRead=True
    ).get("Item")

    if stored == assessment_data:
        return "Success"
    return "Failure"

@app.route("/students/read/<id>")
def students_read(id):
    """Return the result of ``get_students(id)`` as a JSON response."""
    student = get_students(id)
    return jsonify(student)

@app.route("/students/read")
def all_students_read():
    """Return every item of the ``Students`` table as a JSON list."""
    student_table = dynamodb.Table("Students")
    items = []
    scan_kwargs = {}
    while True:
        page = student_table.scan(**scan_kwargs)
        items.extend(page["Items"])
        # A single Scan call returns at most one page; follow
        # LastEvaluatedKey until the table is exhausted.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_key
    return jsonify(items)

@app.route("/assessment/read/<id>")
def assessment_read(id):
    """Return the result of ``get_assessment(id)`` as a JSON response."""
    assessment = get_assessment(id)
    return jsonify(assessment)


@app.route("/assessment/submit/<id>", methods=["POST"])
def submit_assessment(id):
    """Grade a submitted assessment, record the score and send an SMS.

    The JSON body is expected to carry ``questions`` (a list of objects with
    ``id`` and ``response``) and ``student_id``.

    Returns:
        ``{"score": <fraction correct>, "message": <result text>}`` on
        success, or an ``{"error": ..., "section": id}`` dict when the
        assessment does not exist or no questions were submitted.
    """
    assessments_table = dynamodb.Table("Assessments")
    assessment = assessments_table.get_item(Key={"id": id}).get("Item")

    if not assessment:
        return {"error": "id does not exist", "section": id}

    payload = request.json
    submitted = payload.get("questions") or []
    if not submitted:
        # Nothing to grade; the score below would otherwise divide by zero.
        return {"error": "no questions submitted", "section": id}

    responses = {question["id"]: question["response"] for question in submitted}
    answer_key = assessment.get("questions")

    correct_answers = 0
    for question_id, response in responses.items():
        question = find_question(answer_key, question_id)
        # A response to an id that is not part of the assessment counts as
        # wrong instead of crashing on the missing question.
        if question is not None and response == question.get("correctAnswer"):
            correct_answers += 1

    score = correct_answers / len(submitted)
    percent = round(score * 100)

    users_table = dynamodb.Table("Students")

    users_table.update_item(
        Key={"uuid": payload.get("student_id")},
        # if_not_exists lets the first submission create the list;
        # list_append alone fails when the attribute is absent.
        UpdateExpression=(
            "SET completedAssessments = "
            "list_append(if_not_exists(completedAssessments, :empty), :i)"
        ),
        ExpressionAttributeValues={
            ":empty": [],
            ":i": [
                {
                    "id": id,
                    "score": percent,
                }
            ],
        },
    )

    if percent > 70:
        message = f"Congratulations! You passed your assessment with a {percent}%."
    else:
        message = f"You failed your assessment with a {percent}%."

    sns = boto3.client("sns", region_name="us-east-1")
    # NOTE(review): the recipient is hard-coded, so every result is texted to
    # this one number regardless of which student submitted.
    number = "+15125967383"
    sns.publish(PhoneNumber=number, Message=message)

    return {"score": score, "message": message}


def find_question(all, id):
    """Return the first question in ``all`` whose ``"id"`` equals ``id``.

    Returns ``None`` when no question matches.
    """
    return next((candidate for candidate in all if candidate["id"] == id), None)


def get_assessment(id):
    """Fetch an assessment and return the view that is safe to show a student.

    Returns ``{"error": "id does not exist", "section": id}`` when the
    ``Assessments`` table has no item for ``id``. Otherwise returns the title,
    id and questions, where each question exposes only its id, title and a
    shuffled ``options`` list mixing the incorrect answers with the correct one.
    """
    quiz = dynamodb.Table("Assessments").get_item(Key={"id": id}).get("Item")
    if quiz is None:
        return {"error": "id does not exist", "section": id}

    visible_questions = []
    for question in quiz.get("questions"):
        # Mix the correct answer in with the distractors before shuffling.
        choices = question.get("incorrectAnswers") + [question.get("correctAnswer")]
        visible_questions.append(
            {
                "id": question.get("id"),
                "title": question.get("title"),
                "options": random_answers(choices),
            }
        )

    return {
        "title": quiz.get("title"),
        "id": quiz.get("id"),
        "questions": visible_questions,
    }

def get_students(id):
    """Return the ``Students`` item keyed by ``uuid == id``.

    Returns ``{'error': 'id does not exist', 'section': id}`` when the table
    has no such item.
    """
    lookup = dynamodb.Table('Students').get_item(Key={"uuid": id})
    student = lookup.get("Item")
    if student is None:
        return {'error': 'id does not exist', 'section': id}
    return student

def lesson_plans_read():
    """Return every item of the ``Lesson_Plans`` table as a JSON list."""
    lesson_table = dynamodb.Table("Lesson_Plans")
    items = []
    scan_kwargs = {}
    while True:
        page = lesson_table.scan(**scan_kwargs)
        items.extend(page["Items"])
        # A single Scan call returns at most one page; follow
        # LastEvaluatedKey until the table is exhausted.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_key
    return jsonify(items)

def random_answers(answers):
    """Shuffle ``answers`` in place and return that same list object."""
    shuffle(answers)
    return answers

@app.route("/recommendations/<uuid>")
def get_recommendation(uuid):
    """Return a student together with lesson plans for their first struggle TEK.

    Returns:
        JSON ``{"student": ..., "lesson_plans": [...]}``. ``lesson_plans`` is
        empty when the student has no ``struggleTeks``. That includes an
        unknown ``uuid``, in which case ``student`` is the error dict
        produced by ``get_students``.
    """
    student = get_students(uuid)

    struggle_teks = student.get("struggleTeks")
    if not struggle_teks:
        # Missing/empty list, or the error dict for an unknown student:
        # indexing [0] here would raise instead of answering the request.
        return jsonify({"student": student, "lesson_plans": []})
    tek = struggle_teks[0]

    lesson_plans_table = dynamodb.Table('Lesson_Plans')
    scan_kwargs = {
        "Select": 'ALL_ATTRIBUTES',
        "FilterExpression": 'tek = :s',
        "ExpressionAttributeValues": {":s": tek},
    }
    lesson_plans = []
    while True:
        page = lesson_plans_table.scan(**scan_kwargs)
        lesson_plans.extend(page.get("Items", []))
        # The filter is applied per page, so later pages can still hold
        # matches; follow LastEvaluatedKey until the scan is complete.
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            break
        scan_kwargs["ExclusiveStartKey"] = last_key

    return jsonify({"student": student, "lesson_plans": lesson_plans})

# Start the Flask development server when this file is executed directly.
# NOTE(review): host="0.0.0.0" listens on all interfaces and debug=True turns
# on Flask's debug mode; keep this combination to local development only.
if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True)
import subprocess
import json
import re


def parse_bgp_routes(bgp_output):
    """
    Turn the text of a BGP route table into a list of route dicts.

    Only lines that follow the "Network ... Next Hop" header and start with
    the "*>" status marker are parsed; everything else is ignored.

    Args:
        bgp_output (str): Raw BGP table output

    Returns:
        dict: {"routes": [...]}, one dict per parsed line with the keys
        network, nextHop, metric, locPrf (always 0), weight and path
        (the numeric AS numbers joined by spaces, origin code dropped).
    """
    rows = bgp_output.strip().split('\n')

    # Routes start on the line after the column header.
    header_at = next(
        (idx for idx, row in enumerate(rows) if 'Network' in row and 'Next Hop' in row),
        None,
    )
    if header_at is None:
        return {"routes": []}

    # *> network next_hop metric weight path  (LocPrf column assumed empty)
    best_route = re.compile(r'^\*>\s+(\S+)\s+(\S+)\s+(\d+)\s+(\d+)\s+(.+)$')

    parsed = []
    for raw in rows[header_at + 1:]:
        entry = raw.strip()

        # Blank, summary and section-title lines never start with "*>".
        if not entry.startswith('*>'):
            continue

        found = best_route.match(entry)
        if found is None:
            continue

        prefix, gateway, metric_text, weight_text, tail = found.groups()

        # Keep only the numeric tokens: this drops the trailing origin code.
        hops = [token for token in tail.split() if token.isdigit()]

        parsed.append({
            "network": prefix,
            "nextHop": gateway,
            "metric": int(metric_text),
            "locPrf": 0,  # LocPrf is reported as 0 for every record
            "weight": int(weight_text),
            "path": ' '.join(hops),
        })

    return {"routes": parsed}


def get_bgp_routes_json():
    """
    Parse the embedded sample BGP table and return it as JSON.

    The input is the hard-coded ``output_text`` fixture below; nothing is read
    from a device or a subprocess.

    Returns:
        str: JSON string (2-space indent) of the dict produced by
        parse_bgp_routes
    """
    # Sample table: an IPv4 section with routes, then an empty IPv6 section.
    output_text = """IPv4
BGP table version is 0, local router ID is 169.254.0.185
Status codes: s suppressed, d damped, h history, * valid, > best, i - internal,
            r RIB-failure, S Stale, R Removed
Origin codes: i - IGP, e - EGP, ? - incomplete

Network          Next Hop            Metric LocPrf Weight Path
*> 10.0.0.0         169.254.0.186          100             0 65100 ?
*> 10.2.0.0/16      169.254.0.185          100         32768 i
*> 10.5.0.0/16      169.254.0.185          100         32768 i
*> 10.6.0.0/16      169.254.0.185          100         32768 i
*> 10.10.0.0/16     169.254.0.185          100         32768 i
*> 10.42.0.0/16     169.254.0.185          100         32768 i
*> 10.56.0.0/16     169.254.0.185          100         32768 i
*> 10.133.0.0/21    169.254.0.185          100         32768 i
*> 10.192.2.0/23    169.254.0.186          100             0 65100 ?
*> 10.192.6.0/23    169.254.0.186          100             0 65100 ?
*> 10.208.0.0/18    169.254.0.185          100         32768 i
*> 10.210.0.0/20    169.254.0.186          100             0 65100 ?
*> 10.210.64.0/18   169.254.0.186          100             0 65100 ?
*> 10.210.128.0/20  169.254.0.186          100             0 65100 ?
*> 10.211.0.0/17    169.254.0.186          100             0 65100 ?
*> 10.211.128.0/17  169.254.0.186          100             0 65100 ?
*> 10.216.0.0/16    169.254.0.186          100             0 65100 ?
*> 10.224.64.0/19   169.254.0.185          100         32768 i
*> 10.227.128.0/18  169.254.0.185          100         32768 i
*> 10.227.192.0/18  169.254.0.185          100         32768 i
*> 10.228.64.0/18   169.254.0.185          100         32768 i
*> 10.228.128.0/18  169.254.0.185          100         32768 i
*> 10.228.192.0/18  169.254.0.185          100         32768 i
*> 10.229.64.0/18   169.254.0.185          100         32768 i
*> 10.229.128.0/18  169.254.0.185          100         32768 i
*> 10.229.192.0/18  169.254.0.185          100         32768 i
*> 10.230.0.0/18    169.254.0.185          100         32768 i
*> 10.230.64.0/18   169.254.0.185          100         32768 i
*> 10.230.128.0/18  169.254.0.185          100         32768 i
*> 10.230.192.0/18  169.254.0.185          100         32768 i
*> 10.246.48.0/20   169.254.0.185          100         32768 i
*> 10.254.192.0/18  169.254.0.185          100         32768 i
*> 10.255.0.0/18    169.254.0.185          100         32768 i
*> 10.255.64.0/18   169.254.0.185          100         32768 i
*> 10.255.128.0/18  169.254.0.185          100         32768 i
*> 10.255.192.0/19  169.254.0.185          100         32768 i
*> 10.255.224.0/19  169.254.0.185          100         32768 i
*> 100.64.0.64/27   169.254.0.185          100         32768 i
*> 100.64.0.96/27   169.254.0.185          100         32768 i
*> 100.64.0.128/27  169.254.0.185          100         32768 i
*> 100.64.0.160/27  169.254.0.185          100         32768 i
*> 172.16.0.0/12    169.254.0.186          100             0 65100 69999 21222 ?
*> 240.0.0.0/4      169.254.0.186          100             0 65100 69999 21222 ?
*> 253.128.0.0/14   169.254.0.186          100             0 65100 ?
*> 253.132.0.0/20   169.254.0.186          100             0 65100 ?

Total number of prefixes 45

IPv6
No BGP network exists"""
    
    # Parse the fixture and serialize the resulting dict as indented JSON
    parsed_data = parse_bgp_routes(output_text)
    return json.dumps(parsed_data, indent=2)

def get_specific_network(prefix):
    """Return the first route whose ``network`` equals ``prefix``, else ``None``.

    Routes come from decoding the JSON produced by ``get_bgp_routes_json``.
    """
    routes = json.loads(get_bgp_routes_json())["routes"]
    return next((entry for entry in routes if entry["network"] == prefix), None)

def main():
    """Print the full route table as JSON, then the entry for 10.42.0.0/16."""
    routes_json = get_bgp_routes_json()
    print(routes_json)

    wanted = get_specific_network("10.42.0.0/16")
    print(wanted)


# Run the BGP parser demo only when this file is executed as a script.
if __name__ == "__main__":
    main()