From fdcaf77ba7ebac4d45e54ab6a56802ceadfa7df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Verlouw?= <daniel.verlouw@geant.org> Date: Fri, 21 Mar 2025 14:34:04 +0100 Subject: [PATCH] Initial commit --- CRON-EXAMPLE.txt | 12 + capacity-planner/KentikTrafficMatrix.py | 147 ++++++ capacity-planner/NetworkTopology.py | 118 +++++ capacity-planner/__init__.py | 0 capacity-planner/capacityreport.py | 310 +++++++++++++ capacity-planner/whatifscenarios.py | 587 ++++++++++++++++++++++++ requirements.txt | 14 + 7 files changed, 1188 insertions(+) create mode 100644 CRON-EXAMPLE.txt create mode 100644 capacity-planner/KentikTrafficMatrix.py create mode 100644 capacity-planner/NetworkTopology.py create mode 100644 capacity-planner/__init__.py create mode 100644 capacity-planner/capacityreport.py create mode 100644 capacity-planner/whatifscenarios.py create mode 100644 requirements.txt diff --git a/CRON-EXAMPLE.txt b/CRON-EXAMPLE.txt new file mode 100644 index 0000000..d197985 --- /dev/null +++ b/CRON-EXAMPLE.txt @@ -0,0 +1,12 @@ +KENTIK_API_URL=https://api.kentik.eu/api/next/v5/query/topxdata +KENTIK_API_EMAIL=<email> +KENTIK_API_TOKEN=<api key> + +# run what-if analysis hourly +0 * * * * source <path>/bin/activate && python3 <path>/bin/whatifscenarios.py >> <path>/whatifscenario.log 2>&1 + +# daily consolidated report +45 0 * * * source <path>/bin/activate && python3 <path>/bin/capacityreport.py --daily >> <path>/whatifscenario.log 2>&1 + +# monthly consolidated report +50 0 1 * * source <path>/bin/activate && python3 <path>/bin/capacityreport.py --monthly >> <path>/whatifscenario.log 2>&1 \ No newline at end of file diff --git a/capacity-planner/KentikTrafficMatrix.py b/capacity-planner/KentikTrafficMatrix.py new file mode 100644 index 0000000..0ddb637 --- /dev/null +++ b/capacity-planner/KentikTrafficMatrix.py @@ -0,0 +1,147 @@ +import os +import requests +import json + +# Extract Kentik API settings from environment variables +API_URL = os.environ["KENTIK_API_URL"] +API_EMAIL= os.environ["KENTIK_API_EMAIL"] +API_TOKEN= os.environ["KENTIK_API_TOKEN"] + +# Kentik report period (in minutes) +KENTIK_REPORTING_PERIOD=60 +# Kentik flow aggregation window (in minutes -- lower values improve accuracy at the expense of processing time) +KENTIK_FLOW_AGGR_WINDOW=5 + +# aggregate ingress (source) and egress (destination) on the Kentik 'Site'-level +INGRESS_DIMENSION="i_device_site_name" +EGRESS_DIMENSION ="i_ult_exit_site" + +############################################################################### +# Fetch traffic matrix from the Kentik API +############################################################################### + +def kentik_api_query(payload): + # Headers for authentication + headers = { + "Content-Type": "application/json", + "X-CH-Auth-Email": API_EMAIL, + "X-CH-Auth-API-Token": API_TOKEN + } + response = requests.post(API_URL, headers=headers, json=payload) + + if response.status_code == 200: + return response.json() + else: + print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}") + return None + +# Example in Kentik dashboard: https://portal.kentik.eu/v4/core/explorer/5787b62a7e8ae1ef7d399e08cdea2800 +def fetch_kentik_trafic_matrix(): + # JSON query payload + payload = { + "version": 4, + "queries": [ + { + "bucket": "Table", + "isOverlay": False, + "query": { + "all_devices": True, + "aggregateTypes": [ + "max_in_bits_per_sec" + ], + "depth": 350, + "topx": 350, # 350=max supported by Kentik + "device_name": [], + "fastData": "Auto", + 
"lookback_seconds": 60 * KENTIK_REPORTING_PERIOD, +# "starting_time": null, +# "ending_time": null, + "matrixBy": [], + "metric": [ + "in_bytes" + ], + "minsPolling": KENTIK_FLOW_AGGR_WINDOW, + "forceMinsPolling": True, + "outsort": "max_in_bits_per_sec", + "viz_type": "table", + "aggregates": [ + { + "value": "max_in_bits_per_sec", + "column": "f_sum_in_bytes", + "fn": "max", + "label": "Bits/s Sampled at Ingress Max", + "unit": "in_bytes", + "parentUnit": "bytes", + "group": "Bits/s Sampled at Ingress", + "origLabel": "Max", + "sample_rate": 1, + "raw": True, + "name": "max_in_bits_per_sec" + } + ], + "filters": { + "connector": "All", + "filterGroups": [ + { + "connector": "All", + "filters": [ + { + "filterField": EGRESS_DIMENSION, + "metric": "", + "aggregate": "", + "operator": "<>", + "filterValue": "", # Filter unassigned/unknown destinations + "rightFilterField": "" + } + ], + "filterGroups": [] + } + ] + }, + "dimension": [ + INGRESS_DIMENSION, + EGRESS_DIMENSION + ] + } + } + ] + } + + response = kentik_api_query(payload) + return response + +############################################################################### +# Transform Kentik data to traffic matrices (one matrix per timestamp) +############################################################################### + +def kentik_to_traffic_matrices(json_data, nodes): + """ + Convert the given JSON structure returned by Kentik into a dictionary + keyed by timestamp. For each timestamp, we store a nested dict of: + traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps + """ + + # We'll gather all flows in the JSON + data_entries = json_data["results"][0]["data"] + + # This will be our main lookup: { ts -> {ingress -> { egress -> rate}} } + traffic_matrices = {} + + for entry in data_entries: + ingress= entry[INGRESS_DIMENSION] + egress = entry[EGRESS_DIMENSION] + + # skip unknown ingress and/or egress nodes + if ingress not in nodes: + continue + if egress not in nodes: + continue + + # timeSeries -> in_bits_per_sec -> flow => list of [timestamp, in_bits_per_sec, ignored_interval] + flow_list = entry["timeSeries"]["in_bits_per_sec"]["flow"] + for (timestamp, bits_per_sec, _ignored_interval) in flow_list: + traffic_matrices.setdefault(timestamp, {}) + traffic_matrices[timestamp].setdefault(ingress, {}) + traffic_matrices[timestamp][ingress][egress] = int(bits_per_sec / 1_000_000) # convert bps to Mbps + + return traffic_matrices \ No newline at end of file diff --git a/capacity-planner/NetworkTopology.py b/capacity-planner/NetworkTopology.py new file mode 100644 index 0000000..71d3cf2 --- /dev/null +++ b/capacity-planner/NetworkTopology.py @@ -0,0 +1,118 @@ +# (A) Define each core link: +# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] ) +# where: +# linkID: network-wide unique numeric ID (e.g. 1001) +# nodeA, nodeB: core link endpoints +# igp_metric: IGP cost/distance +# capacity: full-duplex link capacity in Gbps +# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty) +# normal_threshold: fraction for normal usage. If omitted default is used +# failure_threshold: fraction for usage under failure. 
If omitted default is used +CORELINKS = [ + ( 1, "AMS", "FRA", 2016, 800, []), + ( 2, "AMS", "LON", 1428, 800, []), + ( 3, "FRA", "GEN", 2478, 800, []), + ( 4, "GEN", "PAR", 2421, 800, []), + ( 5, "LON", "LON2", 180, 800, []), + ( 6, "LON2", "PAR", 1866, 800, []), + ( 7, "FRA", "PRA", 3315, 300, []), + ( 8, "GEN", "MIL2", 4280, 300, []), + ( 9, "GEN", "MAR", 2915, 400, []), + (10, "HAM", "POZ", 3435, 300, []), + (11, "MAR", "MIL2", 3850, 300, []), + (12, "MIL2", "VIE", 5400, 400, []), + (13, "POZ", "PRA", 3215, 300, []), + (14, "PRA", "VIE", 2255, 300, []), + (15, "AMS", "HAM", 3270, 300, []), + (16, "BRU", "PAR", 50000, 100, []), + (17, "AMS", "BRU", 50000, 100, []), + (18, "BIL", "PAR", 5600, 300, []), + (19, "BUD", "ZAG", 2370, 200, []), + (20, "COR", "DUB", 5500, 100, []), + (21, "DUB", "LON", 8850, 100, []), + (22, "MAD", "MAR", 6805, 300, []), + (23, "BUC", "SOF", 5680, 200, []), + (24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]), + (25, "LJU", "MIL2", 3330, 300, []), + (26, "LJU", "ZAG", 985, 200, []), + (27, "BUC", "BUD", 11850, 200, []), + (28, "LIS", "MAD", 8970, 200, []), + (29, "BIL", "POR", 9250, 200, []), + (30, "LIS", "POR", 3660, 200, []), + (31, "BIL", "MAD", 4000, 200, []), + (32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]), + (33, "ATH2", "THE", 8200, 100, []), + (34, "SOF", "THE", 5800, 100, []), +# ("COP", "HAM", 480, 400, []), +# ("COP", "STO", 600, 400, []), +# ("HEL", "STO", 630, 400, []), + (35, "RIG", "TAR", 2900, 100, []), + (36, "KAU", "POZ", 10050, 100, []), +# ("BEL", "SOF", 444, 400, []), +# ("BEL", "ZAG", 528, 400, []), + (37, "ZAG", "SOF", 9720, 200, []), + (38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]), + (39, "COR", "LON2", 7160, 100, ["COR-LON2"]), +# ("HEL", "TAR", 227, 400, []), + (40, "KAU", "RIG", 4500, 100, []), +# ("BRU", "LUX", 380, 400, []), +# ("FRA", "LUX", 312, 400, []), +# ("RIG", "STO", 400, 400, []), +# ("COP", "POZ", 750, 400, []), + (41, "COR", "PAR", 12549, 100, ["COR-LON2"]), + (42, "KIE", "POZ", 50000, 100, []), + (43, "CHI", "BUC", 50000, 40, []), + (44, "CHI", "KIE", 10000, 100, []), + (45, "HAM", "TAR", 12490, 100, []), + (46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]), + (47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]), + (48, "AMS", "LON", 50000, 100, []), # backup link via SURF + (49, "BUC", "FRA", 50000, 100, []), # backup link via TTI +] + +# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik) +# to be added: "BEL", "COP","HEL","LUX","STO" +NODES = [ + "AMS","ATH2","BIL","BRA","BRU","BUC","BUD","CHI","COR","DUB","FRA", + "GEN","HAM","KAU","KIE","LIS","LJU","LON","LON2","MAD","MAR","MIL2", + "PAR","POR","POZ","PRA","RIG","SOF","TAR","THE","VIE","ZAG" +] + +# Node failover ratio map for single-node fails +NODE_FAILOVER_RATIOS = { + "AMS": {"LON": 0.6, "FRA": 0.4 }, + "ATH": {"THE": 0.5, "MAR": 0.5 }, +# "BEL": {"ZAG": 1.0 }, + "BIL": {"MAD": 1.0 }, + "BRA": {"VIE": 1.0 }, + "BRU": {"AMS": 1.0 }, + "BUC": {"VIE": 0.5, "SOF": 0.5 }, + "BUD": {"ZAG": 1.0 }, +# "COP": {"HAM": 0.5, "STO": 0.5 }, + "COR": {"DUB": 1.0 }, + "DUB": {"COR": 1.0 }, + "FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2 }, + "GEN": {"PAR": 0.6, "MIL2": 0.4 }, + "HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3 }, +# "HEL": {"TAR": 0.3, "HAM": 0.7 }, + "KAU": {"RIG": 1.0 }, + "LIS": {"POR": 1.0 }, + "LJU": {"ZAG": 1.0 }, + "LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4}, + "LON2": {"LON": 1.0 }, +# "LUX": {"BRU": 1.0 }, + "MAD": {"BIL": 1.0 }, + "MAR": {"MIL2": 0.6, "ATH": 0.4 }, + "MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3 }, + "PAR": 
{"GEN": 1.0 }, + "POR": {"LIS": 1.0 }, + "POZ": {"HAM": 1.0 }, + "PRA": {"VIE": 1.0 }, + "RIG": {"KAU": 1.0 }, + "SOF": {"THE": 0.5, "BUC": 0.5 }, +# "STO": {"COP": 0.5, "HEL": 0.5 }, + "TAR": {"RIG": 1.0 }, + "THE": {"ATH": 0.5, "SOF": 0.5 }, + "VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2 }, + "ZAG": {"BUD": 0.5, "LJU": 0.5 }, +} diff --git a/capacity-planner/__init__.py b/capacity-planner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/capacity-planner/capacityreport.py b/capacity-planner/capacityreport.py new file mode 100644 index 0000000..199c04a --- /dev/null +++ b/capacity-planner/capacityreport.py @@ -0,0 +1,310 @@ +import os +import re +import argparse +import sys +import pandas as pd # https://pandas.pydata.org +from datetime import datetime, timezone, timedelta +from pathlib import Path +import numpy as np + +############################################################################### +# INPUT DATA SECTION +############################################################################### + +# make sure this matches with the What-If Scenario runner script +RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports" +RAW_REPORT_FILE_PREFIX="raw_capacityreport_" +RAW_REPORT_FILE_SUFFIX="csv" + +CONSOLIDATED_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/consolidatedreports" +CONSOLIDATED_REPORT_FILE_PREFIX="consolidated_capacityreport_" +CONSOLIDATED_REPORT_FILE_SUFFIX_RAW="csv" +CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN="txt" + +# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded +MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"] +# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded +SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"] + +# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) +ISO8601_FORMAT = '%Y%m%dT%H%MZ' +ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z' + +############################################################################### +# RAW CONSOLIDATED REPORT +############################################################################### + +# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns --- +def get_max_usage_row(group, usage_col): + """ + Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`. + If `usage_col` does not exist or is entirely NaN, returns None. + """ + # If the column doesn't exist or has all null values, return None + if usage_col not in group.columns or group[usage_col].dropna().empty: + return None + max_val = group[usage_col].max() + # Filter to rows where usage_col equals the maximum value. + max_rows = group[group[usage_col] == max_val] + # Return only the first row among those with the maximum value. + return max_rows.iloc[0] + +def extract_usage_details(group): + """ + For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps) + and extract the relevant columns. + Booleans are set to True if at least one row in the group is True. + """ + # We'll create a dict to hold the final data for this ID. 
+ out = {} + + # Basic info columns (these are assumed to be consistent within the group) + first_row = group.iloc[0] + out['ID'] = first_row.get('ID', None) + out['NodeA'] = first_row.get('NodeA', None) + out['NodeB'] = first_row.get('NodeB', None) + out['CapacityGbps'] = first_row.get('CapacityGbps', None) + out['ConfNormalThrPct'] = first_row.get('ConfNormalThrPct', None) + out['ConfNormalThrGbps'] = first_row.get('ConfNormalThrGbps', None) + out['ConfFailThrPct'] = first_row.get('ConfFailThrPct', None) + out['ConfFailThrGbps'] = first_row.get('ConfFailThrGbps', None) + + # Normal usage + normal_row = get_max_usage_row(group, 'NormalUsageGbps') + out['NormalEnabled'] = bool(group['NormalEnabled'].any()) if 'NormalEnabled' in group.columns else False + out['NormalUsagePct'] = normal_row.get('NormalUsagePct', np.nan) if normal_row is not None else np.nan + out['NormalUsageGbps'] = normal_row.get('NormalUsageGbps', np.nan) if normal_row is not None else np.nan + out['NormalUsageExceed'] = bool(group['NormalUsageExceed'].any()) if 'NormalUsageExceed' in group.columns else False + out['NormalDateTime'] = normal_row.get('NormalDateTime', pd.NaT) if normal_row is not None else pd.NaT + + # 1 Link Fail usage + one_link_row = get_max_usage_row(group, '1LinkFailUsageGbps') + out['1LinkFailEnabled'] = bool(group['1LinkFailEnabled'].any()) if '1LinkFailEnabled' in group.columns else False + out['1LinkFailScenario'] = one_link_row.get('1LinkFailScenario', None) if one_link_row is not None else None + out['1LinkFailUsagePct'] = one_link_row.get('1LinkFailUsagePct', np.nan) if one_link_row is not None else np.nan + out['1LinkFailUsageGbps'] = one_link_row.get('1LinkFailUsageGbps', np.nan) if one_link_row is not None else np.nan + out['1LinkFailUsageExceed'] = bool(group['1LinkFailUsageExceed'].any()) if '1LinkFailUsageExceed' in group.columns else False + out['1LinkFailUsageTime'] = one_link_row.get('1LinkFailUsageTime', pd.NaT) if one_link_row is not None else pd.NaT + + # 2 Link Fail usage + two_link_row = get_max_usage_row(group, '2LinkFailUsageGbps') + out['2LinkFailEnabled'] = bool(group['2LinkFailEnabled'].any()) if '2LinkFailEnabled' in group.columns else False + out['2LinkFailScenario'] = two_link_row.get('2LinkFailScenario', None) if two_link_row is not None else None + out['2LinkFailUsagePct'] = two_link_row.get('2LinkFailUsagePct', np.nan) if two_link_row is not None else np.nan + out['2LinkFailUsageGbps'] = two_link_row.get('2LinkFailUsageGbps', np.nan) if two_link_row is not None else np.nan + out['2LinkFailUsageExceed'] = bool(group['2LinkFailUsageExceed'].any()) if '2LinkFailUsageExceed' in group.columns else False + out['2LinkFailUsageTime'] = two_link_row.get('2LinkFailUsageTime', pd.NaT) if two_link_row is not None else pd.NaT + + # 1 Node Fail usage + one_node_row = get_max_usage_row(group, '1NodeFailUsageGbps') + out['1NodeFailEnabled'] = bool(group['1NodeFailEnabled'].any()) if '1NodeFailEnabled' in group.columns else False + out['1NodeFailScenario'] = one_node_row.get('1NodeFailScenario', None) if one_node_row is not None else None + out['1NodeFailUsagePct'] = one_node_row.get('1NodeFailUsagePct', np.nan) if one_node_row is not None else np.nan + out['1NodeFailUsageGbps'] = one_node_row.get('1NodeFailUsageGbps', np.nan) if one_node_row is not None else np.nan + out['1NodeFailUsageExceed'] = bool(group['1NodeFailUsageExceed'].any()) if '1NodeFailUsageExceed' in group.columns else False + out['1NodeFailUsageTime'] = one_node_row.get('1NodeFailUsageTime', pd.NaT) if one_node_row is 
not None else pd.NaT + + # Node+1 Link Fail usage + node_plus_link_row = get_max_usage_row(group, 'Node+1LinkFailUsageGbps') + out['Node+1LinkFailEnabled'] = bool(group['Node+1LinkFailEnabled'].any()) if 'Node+1LinkFailEnabled' in group.columns else False + out['Node+1LinkFailScenario'] = node_plus_link_row.get('Node+1LinkFailScenario', None) if node_plus_link_row is not None else None + out['Node+1LinkFailUsagePct'] = node_plus_link_row.get('Node+1LinkFailUsagePct', np.nan) if node_plus_link_row is not None else np.nan + out['Node+1LinkFailUsageGbps'] = node_plus_link_row.get('Node+1LinkFailUsageGbps', np.nan) if node_plus_link_row is not None else np.nan + out['Node+1LinkFailUsageExceed'] = bool(group['Node+1LinkFailUsageExceed'].any()) if 'Node+1LinkFailUsageExceed' in group.columns else False + out['Node+1LinkFailUsageTime'] = node_plus_link_row.get('Node+1LinkFailUsageTime', pd.NaT) if node_plus_link_row is not None else pd.NaT + + # Finally, consolidated upgrade flags + out['MustUpgrade'] = bool(group['MustUpgrade'].any()) if 'MustUpgrade' in group.columns else False + out['ShouldUpgrade'] = bool(group['ShouldUpgrade'].any()) if 'ShouldUpgrade' in group.columns else False + + return pd.Series(out) + +############################################################################### +# HUMAN READABLE CONSOLIDATED REPORT +############################################################################### + +def build_human_report(df_raw): + df_human= df_raw.copy() + + # Helper formatting functions + def mark_upgrade(scenario, exceed): + if scenario in MUST_UPGRADE_SCENARIOS and exceed: + return f" (**)" + elif scenario in SHOULD_UPGRADE_SCENARIOS and exceed: + return f" (*)" + else: + return "" + + def normal_thr(row): + return f"{row['ConfNormalThrPct']*100:.0f}% / {row['ConfNormalThrGbps']:.0f}G" + + def fail_thr(row): + return f"{row['ConfFailThrPct']*100:.0f}% / {row['ConfFailThrGbps']:.0f}G" + + def normal_usage(row): + return f"{row['NormalUsagePct']*100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade("normal", row['NormalUsageExceed']) + + def onelinkfail_usage(row): + return f"{row['1LinkFailUsagePct']*100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade("onelinkfail", row['1LinkFailUsageExceed']) + + def twolinkfail_usage(row): + return f"{row['2LinkFailUsagePct']*100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade("twolinkfail", row['2LinkFailUsageExceed']) + + def onenodefail_usage(row): + return f"{row['1NodeFailUsagePct']*100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade("onenodefail", row['1NodeFailUsageExceed']) + + def nodelinkfail_usage(row): + return f"{row['Node+1LinkFailUsagePct']*100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade("nodelinkfail", row['Node+1LinkFailUsageExceed']) + + def upgrade(row): + return "MUST (**)" if row['MustUpgrade'] else ("SHOULD (*)" if row['ShouldUpgrade'] else "NO") + + df_human['ConfNormalThr'] = df_human.apply(normal_thr, axis=1) + df_human['ConfFailThr'] = df_human.apply(fail_thr, axis=1) + df_human['NormalUsage'] = df_human.apply(normal_usage, axis=1) + df_human['1LinkFailUsage'] = df_human.apply(onelinkfail_usage, axis=1) + df_human['2LinkFailUsage'] = df_human.apply(twolinkfail_usage, axis=1) + df_human['1NodeFailUsage'] = df_human.apply(onenodefail_usage, axis=1) + df_human['Node+1LinkFailUsage'] = df_human.apply(nodelinkfail_usage, axis=1) + df_human['Upgrade'] = df_human.apply(upgrade, axis=1) + + # Drop unused columns + df_human.drop(['ConfNormalThrPct','ConfNormalThrGbps'],axis=1, 
inplace=True) + df_human.drop(['ConfFailThrPct','ConfFailThrGbps'],axis=1, inplace=True) + df_human.drop(['NormalUsagePct','NormalUsageGbps','NormalUsageExceed'],axis=1, inplace=True) + df_human.drop(['1LinkFailUsagePct','1LinkFailUsageGbps','1LinkFailUsageExceed','1LinkFailUsageTime'],axis=1, inplace=True) + df_human.drop(['2LinkFailUsagePct','2LinkFailUsageGbps','2LinkFailUsageExceed','2LinkFailUsageTime'],axis=1, inplace=True) + df_human.drop(['1NodeFailUsagePct','1NodeFailUsageGbps','1NodeFailUsageExceed','1NodeFailUsageTime'],axis=1, inplace=True) + df_human.drop(['Node+1LinkFailUsagePct','Node+1LinkFailUsageGbps','Node+1LinkFailUsageExceed','Node+1LinkFailUsageTime'],axis=1, inplace=True) + df_human.drop(['MustUpgrade','ShouldUpgrade'],axis=1, inplace=True) + + # Replace NaN and NaT values with "N/A" + df_human['NormalDateTime'] = df_human['NormalDateTime'].astype(object) + df_human.fillna("N/A", inplace=True) + + return df_human[['NodeA', 'NodeB', 'CapacityGbps', 'ConfNormalThr', 'ConfFailThr', + 'NormalUsage', 'NormalDateTime', + '1LinkFailScenario', '1LinkFailUsage', + '2LinkFailScenario', '2LinkFailUsage', + '1NodeFailScenario', '1NodeFailUsage', + 'Node+1LinkFailScenario', 'Node+1LinkFailUsage', + 'Upgrade']] + +############################################################################### +# FILE FUNCTIONS +############################################################################### + +def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime): + # List all raw reports in directory + all_raw_reports = [ + file for file in os.listdir(directory) + if os.path.isfile(os.path.join(directory, file)) + and file.startswith(prefix) + and file.endswith(suffix) + and re.search(ISO8601_REGEXP, file) + ] + + # Filter to files that match the timestamp pattern within the specified datetime range + matching_files = [] + for file in all_raw_reports: + match = re.search(ISO8601_REGEXP, file) + file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=timezone.utc) + + if start_datetime <= file_date <= end_datetime: + matching_files.append(os.path.join(directory, file)) + + return matching_files + + +def store_consolidated(df_consolidated, directory, prefix, suffix): + path = Path(directory) + path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist + + # Create a ISO8601 basic format UTC timestamped filename + timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT) + filename = f'{prefix}{timestamp}.{suffix}' + + if suffix == "csv": + df_consolidated.to_csv(os.path.join(path, filename), sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True) + + elif suffix == "txt": + markdown = df_consolidated.to_markdown(headers='keys', tablefmt='psql') + # Write the markdown string to a file + with open(os.path.join(path, filename), "w") as file: + file.write(markdown) + + +############################################################################### +# MAIN +############################################################################### + +def main(): + # Parse commandline arguments + parser = argparse.ArgumentParser(description='Script usage:') + parser.add_argument('--daily', action='store_true', help='Create daily report (past day)') + parser.add_argument('--weekly', action='store_true', help='Create weekly report (past week)') + parser.add_argument('--monthly', action='store_true', help='Create monthly report (past month)') + parser.add_argument('--quarterly', action='store_true', help='Create quarterly report 
(past quarter)') + parser.add_argument('--yearly', action='store_true', help='Create yearly report (past year)') + + if len(sys.argv) == 1: + # No arguments were provided; print help message. + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) + first_day_of_current_month = today.replace(day=1) + + if args.daily: + start_datetime= today - timedelta(days=1) + end_datetime = today + elif args.weekly: + start_datetime= today - timedelta(weeks=1) + end_datetime = today + elif args.monthly: + # First day of last month + start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) + # First day of current month + end_datetime = first_day_of_current_month + elif args.quarterly: + # Approximates the quarter as 90 days before the start of the current month. + start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta(days=(first_day_of_current_month.month - 1) % 3 * 30) + end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) + elif args.yearly: + # First day of the previous year + start_datetime= today.replace(year=today.year - 1, month=1, day=1) + end_datetime = today.replace(year=today.year, month=1, day=1) + + matching_files= find_files_by_timeframe(RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime) + + if len(matching_files) > 0: + print(f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}") + + # List of columns that should be parsed as dates from CSV + date_columns = ['NormalDateTime', '1LinkFailUsageTime', '2LinkFailUsageTime', '1NodeFailUsageTime', 'Node+1LinkFailUsageTime'] + + # Read and concat CSVs + dataframes= [pd.read_csv(file, sep=',', encoding='utf-8', parse_dates=date_columns, date_format=ISO8601_FORMAT, float_precision='round_trip') for file in matching_files] + concat_df = pd.concat(dataframes) + + # Walk over the results for each link and extract and store the highest usage for each scenario + results = [] + for id_val, group in concat_df.groupby('ID'): + details = extract_usage_details(group) + # Overwrite ID with the group key to be sure + details['ID'] = id_val + results.append(details) + + consolidated_raw = pd.DataFrame(results) + consolidated_raw.set_index("ID", inplace=True) + store_consolidated(consolidated_raw, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_RAW) + + consolidated_human= build_human_report(consolidated_raw) + store_consolidated(consolidated_human, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN) + + else: + print(f"No raw files found for timeframe {start_datetime} through {end_datetime}") + +if __name__=="__main__": + main() diff --git a/capacity-planner/whatifscenarios.py b/capacity-planner/whatifscenarios.py new file mode 100644 index 0000000..e6da0ec --- /dev/null +++ b/capacity-planner/whatifscenarios.py @@ -0,0 +1,587 @@ +import math +import itertools +import networkx as nx # https://networkx.org +import pandas as pd # https://pandas.pydata.org +import multiprocessing +from datetime import datetime, timezone +from pathlib import Path +import KentikTrafficMatrix as ktm +import NetworkTopology as nettopo + +############################################################################### +# 1) INPUT DATA SECTION 
+###############################################################################
+
+# If normal threshold not specified on a link, default to 0.40 (allowing up to 40% link utilization in normal non-failure scenario)
+DEFAULT_NORMAL_THRESHOLD = 0.40
+# If failure threshold not specified on a link, default to 0.80 (allowing up to 80% link utilization in worst-case failure scenario)
+DEFAULT_FAILURE_THRESHOLD= 0.80
+
+ENABLED_SCENARIOS=["normal","onelinkfail","twolinkfail","onenodefail","nodelinkfail"]
+
+# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded
+MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"]
+# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded
+SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"]
+
+RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports"
+RAW_REPORT_FILE_PREFIX="raw_capacityreport_"
+RAW_REPORT_FILE_SUFFIX="csv"
+
+# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
+ISO8601_FORMAT = '%Y%m%dT%H%MZ'
+ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z'
+
+###############################################################################
+# 2) Build a local NetworkX directed multigraph (MultiDiGraph) for scenario computations
+###############################################################################
+
+def build_graph(nodes, links):
+    # MultiDiGraph supports multiple edges (ECMP links) between two nodes
+    # and is directional, allowing modelling of Full-Duplex links and taking
+    # traffic direction into account
+    graph= nx.MultiDiGraph()
+    graph.add_nodes_from(nodes)
+
+    for link in links:
+        # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
+        lID  = link[0]
+        nodeA= link[1]
+        nodeB= link[2]
+        igp  = link[3]
+
+        # Add Full-Duplex edges in both directions with linkID
+        # assumes the IGP metric is configured symmetrically
+        graph.add_edge(nodeA,nodeB,key=lID, igpmetric=igp)
+        graph.add_edge(nodeB,nodeA,key=lID, igpmetric=igp)
+
+    return graph
+
+###############################################################################
+# 3) Helper functions to ease lookups for core links and SRLGs
+###############################################################################
+
+def build_corelink_map(links):
+    # Create a lookup dictionary keyed by linkID for fast lookups
+    corelink_dict = {link[0]: link for link in links}
+    return corelink_dict
+
+def build_srlg_map(links):
+    # Maps SRLG name to one or more core links (linkIDs)
+    srlg_map={}
+
+    for link in links:
+        # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
+        lID  = link[0]
+        nodeA= link[1]
+        nodeB= link[2]
+        igp  = link[3]
+
+        if len(link)>5 and isinstance(link[5], list):
+            for srlg in link[5]:
+                srlg_map.setdefault(srlg, set())
+                srlg_map[srlg].add(lID)
+
+    return srlg_map
+
+###############################################################################
+# 4) Node failover logic - redistribute traffic to other core nodes
+###############################################################################
+
+def redistribute_local_traffic(t_xx, ratio_map):
+    """
+    We treat local traffic T[X,X] as two endpoints at X.
+    Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j].
+    => fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j].
+    If i=j, it's T[i,i] (local on neighbor i).
+    Returns local_matrix[i][j] = fraction (Mbps).
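+
+    A small worked example (illustrative ratios and volume, not taken from the
+    topology above): with ratio_map = {"N1": 0.6, "N2": 0.4} and t_xx = 100 Mbps,
+    the redistributed local traffic is N1->N1: 36, N1->N2: 24, N2->N1: 24,
+    N2->N2: 16 (Mbps), which sums back to the original 100 Mbps.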
+ """ + local_matrix={} + nbrs= list(ratio_map.keys()) + for i in nbrs: + local_matrix.setdefault(i,{}) + for j in nbrs: + local_matrix[i][j]= ratio_map[i]* ratio_map[j]* t_xx + + return local_matrix + +def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode): + """ + Return scenario-specific traffic matrix in Mbps, + re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode]. + """ + scenario_traffic={} + ratio_map= nodefail_ratios.get(failingNode,{}) + + for s in baseTraffic: + scenario_traffic[s]={} + for d in baseTraffic[s]: + val= baseTraffic[s][d] + if val<=0: + scenario_traffic[s][d]=0 + continue + + # traffic is local - T[failingNode,failingNode] + if s==failingNode and d==failingNode: + scenario_traffic[s][d]=0 + mat= redistribute_local_traffic(val, ratio_map) + for i in mat: + for j in mat[i]: + portion= mat[i][j] + if portion>0: + scenario_traffic.setdefault(i,{}) + scenario_traffic[i][j]= scenario_traffic[i].get(j,0)+ portion + + # T[failingNode,*] + elif s==failingNode: + scenario_traffic[s][d]=0 + for nbr,r in ratio_map.items(): + scenario_traffic.setdefault(nbr,{}) + scenario_traffic[nbr][d]= scenario_traffic[nbr].get(d,0)+ val*r + + # T[*,failingNode] + elif d==failingNode: + scenario_traffic[s][d]=0 + for nbr,r in ratio_map.items(): + scenario_traffic[s][nbr]= scenario_traffic[s].get(nbr,0)+ val*r + + # Unaffected + else: + scenario_traffic[s][d]= val + + return scenario_traffic + +############################################################################### +# 5) Remove SRLG or corelink from graph +############################################################################### + +def remove_corelink(graph, removeLink): + edges_rm=[] + for (u,v,lID) in graph.edges(keys=True): + if lID==removeLink: + edges_rm.append((u,v,lID)) + for e in edges_rm: + graph.remove_edge(*e) + +def remove_corelink_or_srlg(graph, failedElement, srlg_map): + etype, data= failedElement + if etype=="LINK": + linkID= data + remove_corelink(graph, linkID) + + elif etype=="SRLG": + if data in srlg_map: + for linkID in srlg_map[data]: + remove_corelink(graph, linkID) + +############################################################################### +# 6) Compute scenario load in MultiDiGraph +############################################################################### + +def compute_scenario_load(graph, scenario_traffic): + # initialize empty usage map + usageMap={} + for(u,v,lID) in graph.edges(keys=True): + usageMap[(u,v,lID)] = 0.0 + + # walk over [ingress][egress] traffic demands + for s in scenario_traffic: + for d in scenario_traffic[s]: + demand_mbps= scenario_traffic[s][d] + if demand_mbps<=0 or s==d or (s not in graph) or (d not in graph): + continue + demand_gbps= demand_mbps/1000.0 # convert Mbps to Gbps + + # Run Dijkstra algorithm + try: + best_dist= nx.dijkstra_path_length(graph, s, d, weight='igpmetric') + except nx.NetworkXNoPath: + continue + + # Enumerate all shortest paths from (s)ource to (d)estination + all_paths= list(nx.all_shortest_paths(graph, s, d, weight='igpmetric')) + valid=[] + for path in all_paths: + dist_sum=0 + for i in range(len(path)-1): + p= path[i] + q= path[i+1] + # pick minimal igp among parallel edges p->q + best_igp= math.inf + for k2 in graph[p][q]: + igp= graph[p][q][k2]['igpmetric'] + if igp<best_igp: + best_igp= igp + dist_sum+= best_igp + + if math.isclose(dist_sum,best_dist): + valid.append(path) + + if not valid: + continue + + # Now equally divide traffic demand across all shortest paths (ECMP) + share= demand_gbps/ 
len(valid)
+            for path in valid:
+                for i in range(len(path)-1):
+                    p= path[i]
+                    q= path[i+1]
+                    # pick minimal link p->q
+                    best_igp= math.inf
+                    best_k= None
+                    for k2 in graph[p][q]:
+                        igp= graph[p][q][k2]['igpmetric']
+                        if igp<best_igp:
+                            best_igp= igp
+                            best_k= k2
+                    usageMap[(p,q,best_k)]+= share
+
+    return usageMap
+
+###############################################################################
+# 7) Individual Normal/Failure Scenario Runners
+###############################################################################
+
+def run_normal_scenario(graph, trafficMatrix):
+    """
+    No failures => just compute load with the base traffic.
+    """
+    return compute_scenario_load(graph, trafficMatrix)
+
+def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail):
+    """
+    Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail].
+    """
+    if nodeFail in graph:
+        graph.remove_node(nodeFail)
+    scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
+    return compute_scenario_load(graph, scenario_traffic)
+
+def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail):
+    """
+    linkFail = ("LINK", linkID) or ("SRLG", srlgName).
+    Remove the corresponding core link(s) from the given graph (the caller passes
+    a working copy), then compute load with the base traffic.
+    """
+    remove_corelink_or_srlg(graph, linkFail, srlg_map)
+    return compute_scenario_load(graph, trafficMatrix)
+
+def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2):
+    """
+    linkFail1, linkFail2 each = ("LINK", linkID) or ("SRLG", srlgName).
+    Remove both from the given graph (the caller passes a working copy),
+    then compute load with the base traffic.
+    """
+    remove_corelink_or_srlg(graph, linkFail1, srlg_map)
+    remove_corelink_or_srlg(graph, linkFail2, srlg_map)
+    return compute_scenario_load(graph, trafficMatrix)
+
+def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail):
+    # first remove node and shift traffic
+    if nodeFail in graph:
+        graph.remove_node(nodeFail)
+    scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
+    # then remove link
+    remove_corelink_or_srlg(graph, linkFail, srlg_map)
+    return compute_scenario_load(graph, scenario_traffic)
+
+###############################################################################
+# 8) Single scenario task runner
+###############################################################################
+
+def expand_link_name(corelink_map, linkFail):
+    """Helper function: return a string describing one scenario element (LINK linkID or SRLG name)."""
+    etype,data= linkFail
+    if etype=="LINK":
+        # get corelink details by ID
+        td= corelink_map.get(data)
+        nodeA= td[1]
+        nodeB= td[2]
+        return f"LINK_{data}/{nodeA}-{nodeB}"
+
+    elif etype=="SRLG":
+        return f"SRLG_{data}"
+
+    else:
+        return str(linkFail)
+
+def compute_scenario_task(args):
+    """
+    Each scenario runs as a single task.
+    """
+    (graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData)= args
+
+    # 1) first, create a copy of the graph so the original in-memory graph is not modified
+    graph=graph_orig.copy()
+
+    # 2) run selected normal or failure scenario
+    usageMap={}
+    scenarioName=""
+    if scenarioType=="normal":
+        usageMap= run_normal_scenario(graph, trafficMatrix)
+        scenarioName="normal"
+
+    elif scenarioType=="onenodefail":
+        nodeFail= scenarioData
+        usageMap= run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail)
+        scenarioName= f"NODE_{nodeFail}"
+
+    elif scenarioType=="onelinkfail":
+        linkFail= scenarioData
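+        # linkFail is ("LINK", linkID) or ("SRLG", srlgName), as enumerated in build_parallel_tasks()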
+ usageMap= run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail) + scenarioName= f"{expand_link_name(corelink_map, linkFail)}" + + elif scenarioType=="twolinkfail": + (linkFail1,linkFail2)= scenarioData + usageMap= run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2) + scenarioName= f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}" + + elif scenarioType=="nodelinkfail": + (nodeFail,linkFail)= scenarioData + usageMap= run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail) + scenarioName= f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}" + + else: + usageMap={} + scenarioName="unknown" + + return (ts, scenarioType, scenarioName, usageMap) + + +############################################################################### +# 9) Build list of tasks to run in parallel +############################################################################### + +def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): + # enumerate link failure scenarios => link linkIDs + srlg + linkFailElements=[] + for link_id, _link_data in corelink_map.items(): + linkFailElements.append(("LINK", link_id)) + + for srlg_name, _srlg_data in srlg_map.items(): + linkFailElements.append(("SRLG", srlg_name)) + + tasks=[] + for ts in traffic_matrices.keys(): + trafficMatrix= traffic_matrices[ts] + + # add normal non-failure scenario + if "normal" in ENABLED_SCENARIOS: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "normal", None)) + + # add single link fail scenarios + if "onelinkfail" in ENABLED_SCENARIOS: + for linkFail in linkFailElements: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onelinkfail", linkFail)) + + # add unique two link fail scenarios + if "twolinkfail" in ENABLED_SCENARIOS: + fe_twofail_combos= list(itertools.combinations(linkFailElements,2)) + for linkFail1,linkFail2 in fe_twofail_combos: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "twolinkfail", (linkFail1,linkFail2))) + + for nodeFail in nodes: + # add node fail scenarios + if "onenodefail" in ENABLED_SCENARIOS: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onenodefail", nodeFail)) + + # add node + link fail scenarios + if "nodelinkfail" in ENABLED_SCENARIOS: + for linkFail in linkFailElements: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "nodelinkfail", (nodeFail,linkFail))) + + return tasks + +############################################################################### +# 10) Report generator +############################################################################### + +def link_usage_aggregator(corelink_map, results): + # initialize empty link usage aggregator + aggLinkUsage={} + for link_id, _link_data in corelink_map.items(): + aggLinkUsage[link_id]= {} + for scenarioType in ENABLED_SCENARIOS: + aggLinkUsage[link_id][scenarioType] = (0.0,pd.NaT,None) + + # unify results across all tasks - for each scenario, record worst-case (highest) load on each link + for (ts, scenarioType, scenarioName, usageMap) in results: + for (u,v,lID) in usageMap: + usage= usageMap[(u,v,lID)] + oldUsage, oldTs, oldSc= aggLinkUsage[lID][scenarioType] + if usage> oldUsage: + aggLinkUsage[lID][scenarioType]= (usage, ts, f"{scenarioName}") + + # return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName) + return 
aggLinkUsage + +def build_raw_report(corelink_map, aggLinkUsage): + final_report_raw=[] + + # Helper print functions + def exceed(scenario, thr, capacity): + return aggLinkUsage[lID][scenario][0]>thr*capacity if scenario in ENABLED_SCENARIOS else pd.NA + + def usagePct(scenario, capacity): + # (usage,ts,scenarioName) + return aggLinkUsage[lID][scenario][0]/capacity if scenario in ENABLED_SCENARIOS else pd.NA + + def usageGbps(scenario): + return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA + + def scenarioTimestamp(scenario): + return pd.to_datetime(aggLinkUsage[lID][scenario][1], unit='ms', errors='coerce', utc=True) if scenario in ENABLED_SCENARIOS else pd.NaT + + def failScenarioName(scenario): + return aggLinkUsage[lID][scenario][2] if scenario in ENABLED_SCENARIOS else None + + def scenarioEnabled(scenario): + return scenario in ENABLED_SCENARIOS + + # Iterate over each core link + for link_id, link_data in corelink_map.items(): + # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) + lID = link_data[0] + nodeA = link_data[1] + nodeB = link_data[2] + capacity= link_data[4] + # parse thresholds if present, else use defaults + norm_thr= link_data[6] if len(link_data)>6 and isinstance(link_data[6], (float,int)) else DEFAULT_NORMAL_THRESHOLD + fail_thr= link_data[7] if len(link_data)>7 and isinstance(link_data[7], (float,int)) else DEFAULT_FAILURE_THRESHOLD + + # threshold check + mustUpgrade=False + shouldUpgrade=False + for scenarioType in ENABLED_SCENARIOS: + (usage,ts,scenarioName) = aggLinkUsage[lID][scenarioType] + thr= 1 + if scenarioType=="normal" and norm_thr>0: + thr=norm_thr + elif "fail" in scenarioType and fail_thr>0: + thr=fail_thr + + if usage> thr*capacity and scenarioType in MUST_UPGRADE_SCENARIOS: + mustUpgrade=True + elif usage> thr*capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS: + shouldUpgrade=True + + # print one row per core link + final_report_raw.append({ + "ID": lID, + "NodeA": nodeA, + "NodeB": nodeB, + "CapacityGbps": capacity, + + "ConfNormalThrPct": norm_thr, + "ConfNormalThrGbps": norm_thr*capacity, + + "ConfFailThrPct": fail_thr, + "ConfFailThrGbps": fail_thr*capacity, + + "NormalEnabled": scenarioEnabled("normal"), + "NormalUsagePct": usagePct("normal", capacity), + "NormalUsageGbps": usageGbps("normal"), + "NormalUsageExceed": exceed("normal", norm_thr, capacity), + "NormalDateTime": scenarioTimestamp("normal"), + + "1LinkFailEnabled": scenarioEnabled("onelinkfail"), + "1LinkFailScenario": failScenarioName("onelinkfail"), + "1LinkFailUsagePct": usagePct("onelinkfail", capacity), + "1LinkFailUsageGbps": usageGbps("onelinkfail"), + "1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity), + "1LinkFailUsageTime": scenarioTimestamp("onelinkfail"), + + "2LinkFailEnabled": scenarioEnabled("twolinkfail"), + "2LinkFailScenario": failScenarioName("twolinkfail"), + "2LinkFailUsagePct": usagePct("twolinkfail", capacity), + "2LinkFailUsageGbps": usageGbps("twolinkfail"), + "2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity), + "2LinkFailUsageTime": scenarioTimestamp("twolinkfail"), + + "1NodeFailEnabled": scenarioEnabled("onenodefail"), + "1NodeFailScenario": failScenarioName("onenodefail"), + "1NodeFailUsagePct": usagePct("onenodefail", capacity), + "1NodeFailUsageGbps": usageGbps("onenodefail"), + "1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity), + "1NodeFailUsageTime": scenarioTimestamp("onenodefail"), + + "Node+1LinkFailEnabled": 
scenarioEnabled("nodelinkfail"), + "Node+1LinkFailScenario": failScenarioName("nodelinkfail"), + "Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity), + "Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"), + "Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity), + "Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"), + + "MustUpgrade": mustUpgrade, + "ShouldUpgrade": shouldUpgrade + }) + + df_raw= pd.DataFrame(final_report_raw, columns=["ID","NodeA","NodeB","CapacityGbps", + "ConfNormalThrPct","ConfNormalThrGbps", + "ConfFailThrPct","ConfFailThrGbps", + "NormalEnabled","NormalUsagePct","NormalUsageGbps","NormalUsageExceed","NormalDateTime", + "1LinkFailEnabled","1LinkFailScenario","1LinkFailUsagePct","1LinkFailUsageGbps","1LinkFailUsageExceed","1LinkFailUsageTime", + "2LinkFailEnabled","2LinkFailScenario","2LinkFailUsagePct","2LinkFailUsageGbps","2LinkFailUsageExceed","2LinkFailUsageTime", + "1NodeFailEnabled","1NodeFailScenario","1NodeFailUsagePct","1NodeFailUsageGbps","1NodeFailUsageExceed","1NodeFailUsageTime", + "Node+1LinkFailEnabled","Node+1LinkFailScenario","Node+1LinkFailUsagePct","Node+1LinkFailUsageGbps","Node+1LinkFailUsageExceed","Node+1LinkFailUsageTime", + "MustUpgrade","ShouldUpgrade",]) + df_raw.set_index("ID", inplace=True) + + return df_raw + +def store_raw_report(df_raw): + directory = Path(RAW_REPORT_DIRECTORY) + directory.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist + + # Create a ISO8601 basic format UTC timestamped filename + timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT) + filename = f'{RAW_REPORT_FILE_PREFIX}{timestamp}.{RAW_REPORT_FILE_SUFFIX}' + filepath = directory / filename + + df_raw.to_csv(filepath, sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True) + + +############################################################################### +# MAIN +############################################################################### + +def main(): + # Record and display the start time of the script + start_time = datetime.now(timezone.utc) + print(f"===============================================================") + print("Execution started at:", start_time.strftime(ISO8601_FORMAT)) + + # 1) query Kentik to build traffic demand matrices + print(f"Querying Kentik API for traffic demand matrix...") + kentik_json = ktm.fetch_kentik_trafic_matrix() + print(f"Processing traffic demand matrices from Kentik data...") + traffic_matrices= ktm.kentik_to_traffic_matrices(kentik_json, nettopo.NODES) # returns traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps + + # 2) build graph and lookup data structures + graph= build_graph(nettopo.NODES, nettopo.CORELINKS) + corelink_map= build_corelink_map(nettopo.CORELINKS) + srlg_map= build_srlg_map(nettopo.CORELINKS) + + # 3) define tasks to run in parallel + tasks= build_parallel_tasks(graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices) + print(f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks)/len(traffic_matrices):.0f} scenarios each") + + # 4) now run all tasks in a single global pool + print(f"Executing parallel traffic simulation tasks...") + with multiprocessing.Pool() as pool: + tasks_results= pool.map(compute_scenario_task, tasks) + + # 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link + aggLinkUsage= link_usage_aggregator(corelink_map, tasks_results) + + # 6) create and store 
final report + raw_report= build_raw_report(corelink_map, aggLinkUsage) + store_raw_report(raw_report) + + # Display the end time, and total execution duration + end_time = datetime.now(timezone.utc) + duration = end_time - start_time + print("Execution ended at:", end_time.strftime(ISO8601_FORMAT)) + print("Total duration in seconds:", duration.total_seconds()) + print(f"===============================================================") + +if __name__=="__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1d4f386 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +certifi==2024.12.14 +charset-normalizer==3.4.1 +idna==3.10 +networkx==3.4.2 +numpy==2.2.2 +pandas==2.2.3 +python-dateutil==2.9.0.post0 +pytz==2025.1 +requests==2.32.3 +setuptools==75.8.0 +six==1.17.0 +tabulate==0.9.0 +tzdata==2025.1 +urllib3==2.3.0 -- GitLab