diff --git a/capacity-planner/KentikTrafficMatrix.py b/capacity-planner/KentikTrafficMatrix.py deleted file mode 100644 index 0ddb6376393a045da2dcad320bf3a786025252c0..0000000000000000000000000000000000000000 --- a/capacity-planner/KentikTrafficMatrix.py +++ /dev/null @@ -1,147 +0,0 @@ -import os -import requests -import json - -# Extract Kentik API settings from environment variables -API_URL = os.environ["KENTIK_API_URL"] -API_EMAIL= os.environ["KENTIK_API_EMAIL"] -API_TOKEN= os.environ["KENTIK_API_TOKEN"] - -# Kentik report period (in minutes) -KENTIK_REPORTING_PERIOD=60 -# Kentik flow aggregation window (in minutes -- lower values improve accuracy at the expense of processing time) -KENTIK_FLOW_AGGR_WINDOW=5 - -# aggregate ingress (source) and egress (destination) on the Kentik 'Site'-level -INGRESS_DIMENSION="i_device_site_name" -EGRESS_DIMENSION ="i_ult_exit_site" - -############################################################################### -# Fetch traffic matrix from the Kentik API -############################################################################### - -def kentik_api_query(payload): - # Headers for authentication - headers = { - "Content-Type": "application/json", - "X-CH-Auth-Email": API_EMAIL, - "X-CH-Auth-API-Token": API_TOKEN - } - response = requests.post(API_URL, headers=headers, json=payload) - - if response.status_code == 200: - return response.json() - else: - print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}") - return None - -# Example in Kentik dashboard: https://portal.kentik.eu/v4/core/explorer/5787b62a7e8ae1ef7d399e08cdea2800 -def fetch_kentik_trafic_matrix(): - # JSON query payload - payload = { - "version": 4, - "queries": [ - { - "bucket": "Table", - "isOverlay": False, - "query": { - "all_devices": True, - "aggregateTypes": [ - "max_in_bits_per_sec" - ], - "depth": 350, - "topx": 350, # 350=max supported by Kentik - "device_name": [], - "fastData": "Auto", - "lookback_seconds": 60 * KENTIK_REPORTING_PERIOD, -# "starting_time": null, -# "ending_time": null, - "matrixBy": [], - "metric": [ - "in_bytes" - ], - "minsPolling": KENTIK_FLOW_AGGR_WINDOW, - "forceMinsPolling": True, - "outsort": "max_in_bits_per_sec", - "viz_type": "table", - "aggregates": [ - { - "value": "max_in_bits_per_sec", - "column": "f_sum_in_bytes", - "fn": "max", - "label": "Bits/s Sampled at Ingress Max", - "unit": "in_bytes", - "parentUnit": "bytes", - "group": "Bits/s Sampled at Ingress", - "origLabel": "Max", - "sample_rate": 1, - "raw": True, - "name": "max_in_bits_per_sec" - } - ], - "filters": { - "connector": "All", - "filterGroups": [ - { - "connector": "All", - "filters": [ - { - "filterField": EGRESS_DIMENSION, - "metric": "", - "aggregate": "", - "operator": "<>", - "filterValue": "", # Filter unassigned/unknown destinations - "rightFilterField": "" - } - ], - "filterGroups": [] - } - ] - }, - "dimension": [ - INGRESS_DIMENSION, - EGRESS_DIMENSION - ] - } - } - ] - } - - response = kentik_api_query(payload) - return response - -############################################################################### -# Transform Kentik data to traffic matrices (one matrix per timestamp) -############################################################################### - -def kentik_to_traffic_matrices(json_data, nodes): - """ - Convert the given JSON structure returned by Kentik into a dictionary - keyed by timestamp. 
For each timestamp, we store a nested dict of: - traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps - """ - - # We'll gather all flows in the JSON - data_entries = json_data["results"][0]["data"] - - # This will be our main lookup: { ts -> {ingress -> { egress -> rate}} } - traffic_matrices = {} - - for entry in data_entries: - ingress= entry[INGRESS_DIMENSION] - egress = entry[EGRESS_DIMENSION] - - # skip unknown ingress and/or egress nodes - if ingress not in nodes: - continue - if egress not in nodes: - continue - - # timeSeries -> in_bits_per_sec -> flow => list of [timestamp, in_bits_per_sec, ignored_interval] - flow_list = entry["timeSeries"]["in_bits_per_sec"]["flow"] - for (timestamp, bits_per_sec, _ignored_interval) in flow_list: - traffic_matrices.setdefault(timestamp, {}) - traffic_matrices[timestamp].setdefault(ingress, {}) - traffic_matrices[timestamp][ingress][egress] = int(bits_per_sec / 1_000_000) # convert bps to Mbps - - return traffic_matrices \ No newline at end of file diff --git a/capacity-planner/NetworkTopology.py b/capacity-planner/NetworkTopology.py deleted file mode 100644 index 71d3cf27250973635343cb854fd3e578c84fb811..0000000000000000000000000000000000000000 --- a/capacity-planner/NetworkTopology.py +++ /dev/null @@ -1,118 +0,0 @@ -# (A) Define each core link: -# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] ) -# where: -# linkID: network-wide unique numeric ID (e.g. 1001) -# nodeA, nodeB: core link endpoints -# igp_metric: IGP cost/distance -# capacity: full-duplex link capacity in Gbps -# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty) -# normal_threshold: fraction for normal usage. If omitted default is used -# failure_threshold: fraction for usage under failure. 
If omitted default is used -CORELINKS = [ - ( 1, "AMS", "FRA", 2016, 800, []), - ( 2, "AMS", "LON", 1428, 800, []), - ( 3, "FRA", "GEN", 2478, 800, []), - ( 4, "GEN", "PAR", 2421, 800, []), - ( 5, "LON", "LON2", 180, 800, []), - ( 6, "LON2", "PAR", 1866, 800, []), - ( 7, "FRA", "PRA", 3315, 300, []), - ( 8, "GEN", "MIL2", 4280, 300, []), - ( 9, "GEN", "MAR", 2915, 400, []), - (10, "HAM", "POZ", 3435, 300, []), - (11, "MAR", "MIL2", 3850, 300, []), - (12, "MIL2", "VIE", 5400, 400, []), - (13, "POZ", "PRA", 3215, 300, []), - (14, "PRA", "VIE", 2255, 300, []), - (15, "AMS", "HAM", 3270, 300, []), - (16, "BRU", "PAR", 50000, 100, []), - (17, "AMS", "BRU", 50000, 100, []), - (18, "BIL", "PAR", 5600, 300, []), - (19, "BUD", "ZAG", 2370, 200, []), - (20, "COR", "DUB", 5500, 100, []), - (21, "DUB", "LON", 8850, 100, []), - (22, "MAD", "MAR", 6805, 300, []), - (23, "BUC", "SOF", 5680, 200, []), - (24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]), - (25, "LJU", "MIL2", 3330, 300, []), - (26, "LJU", "ZAG", 985, 200, []), - (27, "BUC", "BUD", 11850, 200, []), - (28, "LIS", "MAD", 8970, 200, []), - (29, "BIL", "POR", 9250, 200, []), - (30, "LIS", "POR", 3660, 200, []), - (31, "BIL", "MAD", 4000, 200, []), - (32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]), - (33, "ATH2", "THE", 8200, 100, []), - (34, "SOF", "THE", 5800, 100, []), -# ("COP", "HAM", 480, 400, []), -# ("COP", "STO", 600, 400, []), -# ("HEL", "STO", 630, 400, []), - (35, "RIG", "TAR", 2900, 100, []), - (36, "KAU", "POZ", 10050, 100, []), -# ("BEL", "SOF", 444, 400, []), -# ("BEL", "ZAG", 528, 400, []), - (37, "ZAG", "SOF", 9720, 200, []), - (38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]), - (39, "COR", "LON2", 7160, 100, ["COR-LON2"]), -# ("HEL", "TAR", 227, 400, []), - (40, "KAU", "RIG", 4500, 100, []), -# ("BRU", "LUX", 380, 400, []), -# ("FRA", "LUX", 312, 400, []), -# ("RIG", "STO", 400, 400, []), -# ("COP", "POZ", 750, 400, []), - (41, "COR", "PAR", 12549, 100, ["COR-LON2"]), - (42, "KIE", "POZ", 50000, 100, []), - (43, "CHI", "BUC", 50000, 40, []), - (44, "CHI", "KIE", 10000, 100, []), - (45, "HAM", "TAR", 12490, 100, []), - (46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]), - (47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]), - (48, "AMS", "LON", 50000, 100, []), # backup link via SURF - (49, "BUC", "FRA", 50000, 100, []), # backup link via TTI -] - -# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik) -# to be added: "BEL", "COP","HEL","LUX","STO" -NODES = [ - "AMS","ATH2","BIL","BRA","BRU","BUC","BUD","CHI","COR","DUB","FRA", - "GEN","HAM","KAU","KIE","LIS","LJU","LON","LON2","MAD","MAR","MIL2", - "PAR","POR","POZ","PRA","RIG","SOF","TAR","THE","VIE","ZAG" -] - -# Node failover ratio map for single-node fails -NODE_FAILOVER_RATIOS = { - "AMS": {"LON": 0.6, "FRA": 0.4 }, - "ATH": {"THE": 0.5, "MAR": 0.5 }, -# "BEL": {"ZAG": 1.0 }, - "BIL": {"MAD": 1.0 }, - "BRA": {"VIE": 1.0 }, - "BRU": {"AMS": 1.0 }, - "BUC": {"VIE": 0.5, "SOF": 0.5 }, - "BUD": {"ZAG": 1.0 }, -# "COP": {"HAM": 0.5, "STO": 0.5 }, - "COR": {"DUB": 1.0 }, - "DUB": {"COR": 1.0 }, - "FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2 }, - "GEN": {"PAR": 0.6, "MIL2": 0.4 }, - "HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3 }, -# "HEL": {"TAR": 0.3, "HAM": 0.7 }, - "KAU": {"RIG": 1.0 }, - "LIS": {"POR": 1.0 }, - "LJU": {"ZAG": 1.0 }, - "LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4}, - "LON2": {"LON": 1.0 }, -# "LUX": {"BRU": 1.0 }, - "MAD": {"BIL": 1.0 }, - "MAR": {"MIL2": 0.6, "ATH": 0.4 }, - "MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3 }, - "PAR": 
{"GEN": 1.0 }, - "POR": {"LIS": 1.0 }, - "POZ": {"HAM": 1.0 }, - "PRA": {"VIE": 1.0 }, - "RIG": {"KAU": 1.0 }, - "SOF": {"THE": 0.5, "BUC": 0.5 }, -# "STO": {"COP": 0.5, "HEL": 0.5 }, - "TAR": {"RIG": 1.0 }, - "THE": {"ATH": 0.5, "SOF": 0.5 }, - "VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2 }, - "ZAG": {"BUD": 0.5, "LJU": 0.5 }, -} diff --git a/capacity-planner/capacityreport.py b/capacity-planner/capacityreport.py deleted file mode 100644 index 199c04a0f23520caa8f57465f90d8f7b65a39502..0000000000000000000000000000000000000000 --- a/capacity-planner/capacityreport.py +++ /dev/null @@ -1,310 +0,0 @@ -import os -import re -import argparse -import sys -import pandas as pd # https://pandas.pydata.org -from datetime import datetime, timezone, timedelta -from pathlib import Path -import numpy as np - -############################################################################### -# INPUT DATA SECTION -############################################################################### - -# make sure this matches with the What-If Scenario runner script -RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports" -RAW_REPORT_FILE_PREFIX="raw_capacityreport_" -RAW_REPORT_FILE_SUFFIX="csv" - -CONSOLIDATED_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/consolidatedreports" -CONSOLIDATED_REPORT_FILE_PREFIX="consolidated_capacityreport_" -CONSOLIDATED_REPORT_FILE_SUFFIX_RAW="csv" -CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN="txt" - -# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded -MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"] -# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded -SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"] - -# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) -ISO8601_FORMAT = '%Y%m%dT%H%MZ' -ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z' - -############################################################################### -# RAW CONSOLIDATED REPORT -############################################################################### - -# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns --- -def get_max_usage_row(group, usage_col): - """ - Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`. - If `usage_col` does not exist or is entirely NaN, returns None. - """ - # If the column doesn't exist or has all null values, return None - if usage_col not in group.columns or group[usage_col].dropna().empty: - return None - max_val = group[usage_col].max() - # Filter to rows where usage_col equals the maximum value. - max_rows = group[group[usage_col] == max_val] - # Return only the first row among those with the maximum value. - return max_rows.iloc[0] - -def extract_usage_details(group): - """ - For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps) - and extract the relevant columns. - Booleans are set to True if at least one row in the group is True. - """ - # We'll create a dict to hold the final data for this ID. 
- out = {} - - # Basic info columns (these are assumed to be consistent within the group) - first_row = group.iloc[0] - out['ID'] = first_row.get('ID', None) - out['NodeA'] = first_row.get('NodeA', None) - out['NodeB'] = first_row.get('NodeB', None) - out['CapacityGbps'] = first_row.get('CapacityGbps', None) - out['ConfNormalThrPct'] = first_row.get('ConfNormalThrPct', None) - out['ConfNormalThrGbps'] = first_row.get('ConfNormalThrGbps', None) - out['ConfFailThrPct'] = first_row.get('ConfFailThrPct', None) - out['ConfFailThrGbps'] = first_row.get('ConfFailThrGbps', None) - - # Normal usage - normal_row = get_max_usage_row(group, 'NormalUsageGbps') - out['NormalEnabled'] = bool(group['NormalEnabled'].any()) if 'NormalEnabled' in group.columns else False - out['NormalUsagePct'] = normal_row.get('NormalUsagePct', np.nan) if normal_row is not None else np.nan - out['NormalUsageGbps'] = normal_row.get('NormalUsageGbps', np.nan) if normal_row is not None else np.nan - out['NormalUsageExceed'] = bool(group['NormalUsageExceed'].any()) if 'NormalUsageExceed' in group.columns else False - out['NormalDateTime'] = normal_row.get('NormalDateTime', pd.NaT) if normal_row is not None else pd.NaT - - # 1 Link Fail usage - one_link_row = get_max_usage_row(group, '1LinkFailUsageGbps') - out['1LinkFailEnabled'] = bool(group['1LinkFailEnabled'].any()) if '1LinkFailEnabled' in group.columns else False - out['1LinkFailScenario'] = one_link_row.get('1LinkFailScenario', None) if one_link_row is not None else None - out['1LinkFailUsagePct'] = one_link_row.get('1LinkFailUsagePct', np.nan) if one_link_row is not None else np.nan - out['1LinkFailUsageGbps'] = one_link_row.get('1LinkFailUsageGbps', np.nan) if one_link_row is not None else np.nan - out['1LinkFailUsageExceed'] = bool(group['1LinkFailUsageExceed'].any()) if '1LinkFailUsageExceed' in group.columns else False - out['1LinkFailUsageTime'] = one_link_row.get('1LinkFailUsageTime', pd.NaT) if one_link_row is not None else pd.NaT - - # 2 Link Fail usage - two_link_row = get_max_usage_row(group, '2LinkFailUsageGbps') - out['2LinkFailEnabled'] = bool(group['2LinkFailEnabled'].any()) if '2LinkFailEnabled' in group.columns else False - out['2LinkFailScenario'] = two_link_row.get('2LinkFailScenario', None) if two_link_row is not None else None - out['2LinkFailUsagePct'] = two_link_row.get('2LinkFailUsagePct', np.nan) if two_link_row is not None else np.nan - out['2LinkFailUsageGbps'] = two_link_row.get('2LinkFailUsageGbps', np.nan) if two_link_row is not None else np.nan - out['2LinkFailUsageExceed'] = bool(group['2LinkFailUsageExceed'].any()) if '2LinkFailUsageExceed' in group.columns else False - out['2LinkFailUsageTime'] = two_link_row.get('2LinkFailUsageTime', pd.NaT) if two_link_row is not None else pd.NaT - - # 1 Node Fail usage - one_node_row = get_max_usage_row(group, '1NodeFailUsageGbps') - out['1NodeFailEnabled'] = bool(group['1NodeFailEnabled'].any()) if '1NodeFailEnabled' in group.columns else False - out['1NodeFailScenario'] = one_node_row.get('1NodeFailScenario', None) if one_node_row is not None else None - out['1NodeFailUsagePct'] = one_node_row.get('1NodeFailUsagePct', np.nan) if one_node_row is not None else np.nan - out['1NodeFailUsageGbps'] = one_node_row.get('1NodeFailUsageGbps', np.nan) if one_node_row is not None else np.nan - out['1NodeFailUsageExceed'] = bool(group['1NodeFailUsageExceed'].any()) if '1NodeFailUsageExceed' in group.columns else False - out['1NodeFailUsageTime'] = one_node_row.get('1NodeFailUsageTime', pd.NaT) if one_node_row is 
not None else pd.NaT - - # Node+1 Link Fail usage - node_plus_link_row = get_max_usage_row(group, 'Node+1LinkFailUsageGbps') - out['Node+1LinkFailEnabled'] = bool(group['Node+1LinkFailEnabled'].any()) if 'Node+1LinkFailEnabled' in group.columns else False - out['Node+1LinkFailScenario'] = node_plus_link_row.get('Node+1LinkFailScenario', None) if node_plus_link_row is not None else None - out['Node+1LinkFailUsagePct'] = node_plus_link_row.get('Node+1LinkFailUsagePct', np.nan) if node_plus_link_row is not None else np.nan - out['Node+1LinkFailUsageGbps'] = node_plus_link_row.get('Node+1LinkFailUsageGbps', np.nan) if node_plus_link_row is not None else np.nan - out['Node+1LinkFailUsageExceed'] = bool(group['Node+1LinkFailUsageExceed'].any()) if 'Node+1LinkFailUsageExceed' in group.columns else False - out['Node+1LinkFailUsageTime'] = node_plus_link_row.get('Node+1LinkFailUsageTime', pd.NaT) if node_plus_link_row is not None else pd.NaT - - # Finally, consolidated upgrade flags - out['MustUpgrade'] = bool(group['MustUpgrade'].any()) if 'MustUpgrade' in group.columns else False - out['ShouldUpgrade'] = bool(group['ShouldUpgrade'].any()) if 'ShouldUpgrade' in group.columns else False - - return pd.Series(out) - -############################################################################### -# HUMAN READABLE CONSOLIDATED REPORT -############################################################################### - -def build_human_report(df_raw): - df_human= df_raw.copy() - - # Helper formatting functions - def mark_upgrade(scenario, exceed): - if scenario in MUST_UPGRADE_SCENARIOS and exceed: - return f" (**)" - elif scenario in SHOULD_UPGRADE_SCENARIOS and exceed: - return f" (*)" - else: - return "" - - def normal_thr(row): - return f"{row['ConfNormalThrPct']*100:.0f}% / {row['ConfNormalThrGbps']:.0f}G" - - def fail_thr(row): - return f"{row['ConfFailThrPct']*100:.0f}% / {row['ConfFailThrGbps']:.0f}G" - - def normal_usage(row): - return f"{row['NormalUsagePct']*100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade("normal", row['NormalUsageExceed']) - - def onelinkfail_usage(row): - return f"{row['1LinkFailUsagePct']*100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade("onelinkfail", row['1LinkFailUsageExceed']) - - def twolinkfail_usage(row): - return f"{row['2LinkFailUsagePct']*100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade("twolinkfail", row['2LinkFailUsageExceed']) - - def onenodefail_usage(row): - return f"{row['1NodeFailUsagePct']*100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade("onenodefail", row['1NodeFailUsageExceed']) - - def nodelinkfail_usage(row): - return f"{row['Node+1LinkFailUsagePct']*100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade("nodelinkfail", row['Node+1LinkFailUsageExceed']) - - def upgrade(row): - return "MUST (**)" if row['MustUpgrade'] else ("SHOULD (*)" if row['ShouldUpgrade'] else "NO") - - df_human['ConfNormalThr'] = df_human.apply(normal_thr, axis=1) - df_human['ConfFailThr'] = df_human.apply(fail_thr, axis=1) - df_human['NormalUsage'] = df_human.apply(normal_usage, axis=1) - df_human['1LinkFailUsage'] = df_human.apply(onelinkfail_usage, axis=1) - df_human['2LinkFailUsage'] = df_human.apply(twolinkfail_usage, axis=1) - df_human['1NodeFailUsage'] = df_human.apply(onenodefail_usage, axis=1) - df_human['Node+1LinkFailUsage'] = df_human.apply(nodelinkfail_usage, axis=1) - df_human['Upgrade'] = df_human.apply(upgrade, axis=1) - - # Drop unused columns - df_human.drop(['ConfNormalThrPct','ConfNormalThrGbps'],axis=1, 
inplace=True) - df_human.drop(['ConfFailThrPct','ConfFailThrGbps'],axis=1, inplace=True) - df_human.drop(['NormalUsagePct','NormalUsageGbps','NormalUsageExceed'],axis=1, inplace=True) - df_human.drop(['1LinkFailUsagePct','1LinkFailUsageGbps','1LinkFailUsageExceed','1LinkFailUsageTime'],axis=1, inplace=True) - df_human.drop(['2LinkFailUsagePct','2LinkFailUsageGbps','2LinkFailUsageExceed','2LinkFailUsageTime'],axis=1, inplace=True) - df_human.drop(['1NodeFailUsagePct','1NodeFailUsageGbps','1NodeFailUsageExceed','1NodeFailUsageTime'],axis=1, inplace=True) - df_human.drop(['Node+1LinkFailUsagePct','Node+1LinkFailUsageGbps','Node+1LinkFailUsageExceed','Node+1LinkFailUsageTime'],axis=1, inplace=True) - df_human.drop(['MustUpgrade','ShouldUpgrade'],axis=1, inplace=True) - - # Replace NaN and NaT values with "N/A" - df_human['NormalDateTime'] = df_human['NormalDateTime'].astype(object) - df_human.fillna("N/A", inplace=True) - - return df_human[['NodeA', 'NodeB', 'CapacityGbps', 'ConfNormalThr', 'ConfFailThr', - 'NormalUsage', 'NormalDateTime', - '1LinkFailScenario', '1LinkFailUsage', - '2LinkFailScenario', '2LinkFailUsage', - '1NodeFailScenario', '1NodeFailUsage', - 'Node+1LinkFailScenario', 'Node+1LinkFailUsage', - 'Upgrade']] - -############################################################################### -# FILE FUNCTIONS -############################################################################### - -def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime): - # List all raw reports in directory - all_raw_reports = [ - file for file in os.listdir(directory) - if os.path.isfile(os.path.join(directory, file)) - and file.startswith(prefix) - and file.endswith(suffix) - and re.search(ISO8601_REGEXP, file) - ] - - # Filter to files that match the timestamp pattern within the specified datetime range - matching_files = [] - for file in all_raw_reports: - match = re.search(ISO8601_REGEXP, file) - file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=timezone.utc) - - if start_datetime <= file_date <= end_datetime: - matching_files.append(os.path.join(directory, file)) - - return matching_files - - -def store_consolidated(df_consolidated, directory, prefix, suffix): - path = Path(directory) - path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist - - # Create a ISO8601 basic format UTC timestamped filename - timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT) - filename = f'{prefix}{timestamp}.{suffix}' - - if suffix == "csv": - df_consolidated.to_csv(os.path.join(path, filename), sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True) - - elif suffix == "txt": - markdown = df_consolidated.to_markdown(headers='keys', tablefmt='psql') - # Write the markdown string to a file - with open(os.path.join(path, filename), "w") as file: - file.write(markdown) - - -############################################################################### -# MAIN -############################################################################### - -def main(): - # Parse commandline arguments - parser = argparse.ArgumentParser(description='Script usage:') - parser.add_argument('--daily', action='store_true', help='Create daily report (past day)') - parser.add_argument('--weekly', action='store_true', help='Create weekly report (past week)') - parser.add_argument('--monthly', action='store_true', help='Create monthly report (past month)') - parser.add_argument('--quarterly', action='store_true', help='Create quarterly report 
(past quarter)') - parser.add_argument('--yearly', action='store_true', help='Create yearly report (past year)') - - if len(sys.argv) == 1: - # No arguments were provided; print help message. - parser.print_help() - sys.exit(1) - - args = parser.parse_args() - today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - first_day_of_current_month = today.replace(day=1) - - if args.daily: - start_datetime= today - timedelta(days=1) - end_datetime = today - elif args.weekly: - start_datetime= today - timedelta(weeks=1) - end_datetime = today - elif args.monthly: - # First day of last month - start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) - # First day of current month - end_datetime = first_day_of_current_month - elif args.quarterly: - # Approximates the quarter as 90 days before the start of the current month. - start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta(days=(first_day_of_current_month.month - 1) % 3 * 30) - end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) - elif args.yearly: - # First day of the previous year - start_datetime= today.replace(year=today.year - 1, month=1, day=1) - end_datetime = today.replace(year=today.year, month=1, day=1) - - matching_files= find_files_by_timeframe(RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime) - - if len(matching_files) > 0: - print(f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}") - - # List of columns that should be parsed as dates from CSV - date_columns = ['NormalDateTime', '1LinkFailUsageTime', '2LinkFailUsageTime', '1NodeFailUsageTime', 'Node+1LinkFailUsageTime'] - - # Read and concat CSVs - dataframes= [pd.read_csv(file, sep=',', encoding='utf-8', parse_dates=date_columns, date_format=ISO8601_FORMAT, float_precision='round_trip') for file in matching_files] - concat_df = pd.concat(dataframes) - - # Walk over the results for each link and extract and store the highest usage for each scenario - results = [] - for id_val, group in concat_df.groupby('ID'): - details = extract_usage_details(group) - # Overwrite ID with the group key to be sure - details['ID'] = id_val - results.append(details) - - consolidated_raw = pd.DataFrame(results) - consolidated_raw.set_index("ID", inplace=True) - store_consolidated(consolidated_raw, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_RAW) - - consolidated_human= build_human_report(consolidated_raw) - store_consolidated(consolidated_human, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN) - - else: - print(f"No raw files found for timeframe {start_datetime} through {end_datetime}") - -if __name__=="__main__": - main() diff --git a/capacity-planner/whatifscenarios.py b/capacity-planner/whatifscenarios.py deleted file mode 100644 index e6da0ec79b4ac07637ce2ff088aae2afe0205628..0000000000000000000000000000000000000000 --- a/capacity-planner/whatifscenarios.py +++ /dev/null @@ -1,587 +0,0 @@ -import math -import itertools -import networkx as nx # https://networkx.org -import pandas as pd # https://pandas.pydata.org -import multiprocessing -from datetime import datetime, timezone -from pathlib import Path -import KentikTrafficMatrix as ktm -import NetworkTopology as nettopo - -############################################################################### 
-# 1) INPUT DATA SECTION -############################################################################### - -# If normal threshold not specified on a link, default to 0.40 (allowing upto 40% link utilization in normal non-failure scenario) -DEFAULT_NORMAL_THRESHOLD = 0.40 -# If failure threshold not specified on a link, default to 0.80 (allowing upto 80% link utilization in worst-case failure scenario) -DEFAULT_FAILURE_THRESHOLD= 0.80 - -ENABLED_SCENARIOS=["normal","onelinkfail","twolinkfail","onenodefail","nodelinkfail"] - -# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded -MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"] -# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded -SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"] - -RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports" -RAW_REPORT_FILE_PREFIX="raw_capacityreport_" -RAW_REPORT_FILE_SUFFIX="csv" - -# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) -ISO8601_FORMAT = '%Y%m%dT%H%MZ' -ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z' - -############################################################################### -# 2) Build a local NetworkX multi-directional graph for scenario computations -############################################################################### - -def build_graph(nodes, links): - # MultiDiGraph supports multiple edges (ECMP links) between two nodes - # and is directional, allowing modelling of Full-Duplex links and taking - # traffic direction into account - graph= nx.MultiDiGraph() - graph.add_nodes_from(nodes) - - for link in links: - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link[0] - nodeA= link[1] - nodeB= link[2] - igp = link[3] - - # Add Full-Duplex edges in both directions with linkID - # assumes the IGP metric is configured symmetrically - graph.add_edge(nodeA,nodeB,key=lID, igpmetric=igp) - graph.add_edge(nodeB,nodeA,key=lID, igpmetric=igp) - - return graph - -############################################################################### -# 3) Helper functions to ease lookups for core links and SRLGs -############################################################################### - -def build_corelink_map(links): - # Creating a lookup dictionary for fast lookup by linkID - corelink_dict = {link[0]: link for link in links} - return corelink_dict - -def build_srlg_map(links): - # Maps SRLG name to one or more core links (linkIDs) - srlg_map={} - - for link in links: - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link[0] - nodeA= link[1] - nodeB= link[2] - igp = link[3] - - if len(link)>5 and isinstance(link[5], list): - for srlg in link[5]: - srlg_map.setdefault(srlg, set()) - srlg_map[srlg].add(lID) - - return srlg_map - -############################################################################### -# 4) Node failover logic - redistribute traffic to other core nodes -############################################################################### - -def redistribute_local_traffic(t_xx, ratio_map): - """ - We treat local traffic T[X,X] as two endpoints at X. - Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j]. - => fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j]. - If i=j, it's T[i,i] (local on neighbor i). - Returns local_matrix[i][j] = fraction (Mbps). 
- """ - local_matrix={} - nbrs= list(ratio_map.keys()) - for i in nbrs: - local_matrix.setdefault(i,{}) - for j in nbrs: - local_matrix[i][j]= ratio_map[i]* ratio_map[j]* t_xx - - return local_matrix - -def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode): - """ - Return scenario-specific traffic matrix in Mbps, - re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode]. - """ - scenario_traffic={} - ratio_map= nodefail_ratios.get(failingNode,{}) - - for s in baseTraffic: - scenario_traffic[s]={} - for d in baseTraffic[s]: - val= baseTraffic[s][d] - if val<=0: - scenario_traffic[s][d]=0 - continue - - # traffic is local - T[failingNode,failingNode] - if s==failingNode and d==failingNode: - scenario_traffic[s][d]=0 - mat= redistribute_local_traffic(val, ratio_map) - for i in mat: - for j in mat[i]: - portion= mat[i][j] - if portion>0: - scenario_traffic.setdefault(i,{}) - scenario_traffic[i][j]= scenario_traffic[i].get(j,0)+ portion - - # T[failingNode,*] - elif s==failingNode: - scenario_traffic[s][d]=0 - for nbr,r in ratio_map.items(): - scenario_traffic.setdefault(nbr,{}) - scenario_traffic[nbr][d]= scenario_traffic[nbr].get(d,0)+ val*r - - # T[*,failingNode] - elif d==failingNode: - scenario_traffic[s][d]=0 - for nbr,r in ratio_map.items(): - scenario_traffic[s][nbr]= scenario_traffic[s].get(nbr,0)+ val*r - - # Unaffected - else: - scenario_traffic[s][d]= val - - return scenario_traffic - -############################################################################### -# 5) Remove SRLG or corelink from graph -############################################################################### - -def remove_corelink(graph, removeLink): - edges_rm=[] - for (u,v,lID) in graph.edges(keys=True): - if lID==removeLink: - edges_rm.append((u,v,lID)) - for e in edges_rm: - graph.remove_edge(*e) - -def remove_corelink_or_srlg(graph, failedElement, srlg_map): - etype, data= failedElement - if etype=="LINK": - linkID= data - remove_corelink(graph, linkID) - - elif etype=="SRLG": - if data in srlg_map: - for linkID in srlg_map[data]: - remove_corelink(graph, linkID) - -############################################################################### -# 6) Compute scenario load in MultiDiGraph -############################################################################### - -def compute_scenario_load(graph, scenario_traffic): - # initialize empty usage map - usageMap={} - for(u,v,lID) in graph.edges(keys=True): - usageMap[(u,v,lID)] = 0.0 - - # walk over [ingress][egress] traffic demands - for s in scenario_traffic: - for d in scenario_traffic[s]: - demand_mbps= scenario_traffic[s][d] - if demand_mbps<=0 or s==d or (s not in graph) or (d not in graph): - continue - demand_gbps= demand_mbps/1000.0 # convert Mbps to Gbps - - # Run Dijkstra algorithm - try: - best_dist= nx.dijkstra_path_length(graph, s, d, weight='igpmetric') - except nx.NetworkXNoPath: - continue - - # Enumerate all shortest paths from (s)ource to (d)estination - all_paths= list(nx.all_shortest_paths(graph, s, d, weight='igpmetric')) - valid=[] - for path in all_paths: - dist_sum=0 - for i in range(len(path)-1): - p= path[i] - q= path[i+1] - # pick minimal igp among parallel edges p->q - best_igp= math.inf - for k2 in graph[p][q]: - igp= graph[p][q][k2]['igpmetric'] - if igp<best_igp: - best_igp= igp - dist_sum+= best_igp - - if math.isclose(dist_sum,best_dist): - valid.append(path) - - if not valid: - continue - - # Now equally divide traffic demand across all shortest paths (ECMP) - share= demand_gbps/ 
len(valid) - for path in valid: - for i in range(len(path)-1): - p= path[i] - q= path[i+1] - # pick minimal link p->q - best_igp= math.inf - best_k= None - for k2 in graph[p][q]: - igp= graph[p][q][k2]['igpmetric'] - if igp<best_igp: - best_igp= igp - best_k= k2 - usageMap[(p,q,best_k)]+= share - - return usageMap - -############################################################################### -# 7) Individual Normal/Failure Scenario Runners -############################################################################### - -def run_normal_scenario(graph, trafficMatrix): - """ - No failures => just compute load with the base traffic. - """ - return compute_scenario_load(graph, trafficMatrix) - -def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail): - """ - Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail]. - """ - if nodeFail in graph: - graph.remove_node(nodeFail) - scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) - return compute_scenario_load(graph, scenario_traffic) - -def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail): - """ - linkFail = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). - Remove link from a copy of graph, then compute load with base traffic. - """ - remove_corelink_or_srlg(graph, linkFail, srlg_map) - return compute_scenario_load(graph, trafficMatrix) - -def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2): - """ - linkFail1, linkFail2 each = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). - Remove them from a copy of graph, then compute load with base traffic. - """ - remove_corelink_or_srlg(graph, linkFail1, srlg_map) - remove_corelink_or_srlg(graph, linkFail2, srlg_map) - return compute_scenario_load(graph, trafficMatrix) - -def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail): - # first remove node and shift traffic - if nodeFail in graph: - graph.remove_node(nodeFail) - scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) - # then remove link - remove_corelink_or_srlg(graph, linkFail, srlg_map) - return compute_scenario_load(graph, scenario_traffic) - -############################################################################### -# 8) Single scenario task runner -############################################################################### - -def expand_link_name(corelink_map, linkFail): - """Helper function--return a string describing one scenario element (LINK linkID or SRLG name).""" - etype,data= linkFail - if etype=="LINK": - # get corelink details by ID - td= corelink_map.get(data) - nodeA= td[1] - nodeB= td[2] - return f"LINK_{data}/{nodeA}-{nodeB}" - - elif etype=="SRLG": - return f"SRLG_{data}" - - else: - return str(linkFail) - -def compute_scenario_task(args): - """ - Each scenario runs as a single task - """ - (graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData)= args - - # 1) first, create a copy of graph so in-memory copy of original is not modified - graph=graph_orig.copy() - - # 2) run selected normal or failure scenario - usageMap={} - scenarioName="" - if scenarioType=="normal": - usageMap= run_normal_scenario(graph, trafficMatrix) - scenarioName="normal" - - elif scenarioType=="onenodefail": - nodeFail= scenarioData - usageMap= run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail) - scenarioName= f"NODE_{nodeFail}" - - elif scenarioType=="onelinkfail": - linkFail= scenarioData 
- usageMap= run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail) - scenarioName= f"{expand_link_name(corelink_map, linkFail)}" - - elif scenarioType=="twolinkfail": - (linkFail1,linkFail2)= scenarioData - usageMap= run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2) - scenarioName= f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}" - - elif scenarioType=="nodelinkfail": - (nodeFail,linkFail)= scenarioData - usageMap= run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail) - scenarioName= f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}" - - else: - usageMap={} - scenarioName="unknown" - - return (ts, scenarioType, scenarioName, usageMap) - - -############################################################################### -# 9) Build list of tasks to run in parallel -############################################################################### - -def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): - # enumerate link failure scenarios => link linkIDs + srlg - linkFailElements=[] - for link_id, _link_data in corelink_map.items(): - linkFailElements.append(("LINK", link_id)) - - for srlg_name, _srlg_data in srlg_map.items(): - linkFailElements.append(("SRLG", srlg_name)) - - tasks=[] - for ts in traffic_matrices.keys(): - trafficMatrix= traffic_matrices[ts] - - # add normal non-failure scenario - if "normal" in ENABLED_SCENARIOS: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "normal", None)) - - # add single link fail scenarios - if "onelinkfail" in ENABLED_SCENARIOS: - for linkFail in linkFailElements: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onelinkfail", linkFail)) - - # add unique two link fail scenarios - if "twolinkfail" in ENABLED_SCENARIOS: - fe_twofail_combos= list(itertools.combinations(linkFailElements,2)) - for linkFail1,linkFail2 in fe_twofail_combos: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "twolinkfail", (linkFail1,linkFail2))) - - for nodeFail in nodes: - # add node fail scenarios - if "onenodefail" in ENABLED_SCENARIOS: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onenodefail", nodeFail)) - - # add node + link fail scenarios - if "nodelinkfail" in ENABLED_SCENARIOS: - for linkFail in linkFailElements: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "nodelinkfail", (nodeFail,linkFail))) - - return tasks - -############################################################################### -# 10) Report generator -############################################################################### - -def link_usage_aggregator(corelink_map, results): - # initialize empty link usage aggregator - aggLinkUsage={} - for link_id, _link_data in corelink_map.items(): - aggLinkUsage[link_id]= {} - for scenarioType in ENABLED_SCENARIOS: - aggLinkUsage[link_id][scenarioType] = (0.0,pd.NaT,None) - - # unify results across all tasks - for each scenario, record worst-case (highest) load on each link - for (ts, scenarioType, scenarioName, usageMap) in results: - for (u,v,lID) in usageMap: - usage= usageMap[(u,v,lID)] - oldUsage, oldTs, oldSc= aggLinkUsage[lID][scenarioType] - if usage> oldUsage: - aggLinkUsage[lID][scenarioType]= (usage, ts, f"{scenarioName}") - - # return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName) - return 
aggLinkUsage - -def build_raw_report(corelink_map, aggLinkUsage): - final_report_raw=[] - - # Helper print functions - def exceed(scenario, thr, capacity): - return aggLinkUsage[lID][scenario][0]>thr*capacity if scenario in ENABLED_SCENARIOS else pd.NA - - def usagePct(scenario, capacity): - # (usage,ts,scenarioName) - return aggLinkUsage[lID][scenario][0]/capacity if scenario in ENABLED_SCENARIOS else pd.NA - - def usageGbps(scenario): - return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA - - def scenarioTimestamp(scenario): - return pd.to_datetime(aggLinkUsage[lID][scenario][1], unit='ms', errors='coerce', utc=True) if scenario in ENABLED_SCENARIOS else pd.NaT - - def failScenarioName(scenario): - return aggLinkUsage[lID][scenario][2] if scenario in ENABLED_SCENARIOS else None - - def scenarioEnabled(scenario): - return scenario in ENABLED_SCENARIOS - - # Iterate over each core link - for link_id, link_data in corelink_map.items(): - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link_data[0] - nodeA = link_data[1] - nodeB = link_data[2] - capacity= link_data[4] - # parse thresholds if present, else use defaults - norm_thr= link_data[6] if len(link_data)>6 and isinstance(link_data[6], (float,int)) else DEFAULT_NORMAL_THRESHOLD - fail_thr= link_data[7] if len(link_data)>7 and isinstance(link_data[7], (float,int)) else DEFAULT_FAILURE_THRESHOLD - - # threshold check - mustUpgrade=False - shouldUpgrade=False - for scenarioType in ENABLED_SCENARIOS: - (usage,ts,scenarioName) = aggLinkUsage[lID][scenarioType] - thr= 1 - if scenarioType=="normal" and norm_thr>0: - thr=norm_thr - elif "fail" in scenarioType and fail_thr>0: - thr=fail_thr - - if usage> thr*capacity and scenarioType in MUST_UPGRADE_SCENARIOS: - mustUpgrade=True - elif usage> thr*capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS: - shouldUpgrade=True - - # print one row per core link - final_report_raw.append({ - "ID": lID, - "NodeA": nodeA, - "NodeB": nodeB, - "CapacityGbps": capacity, - - "ConfNormalThrPct": norm_thr, - "ConfNormalThrGbps": norm_thr*capacity, - - "ConfFailThrPct": fail_thr, - "ConfFailThrGbps": fail_thr*capacity, - - "NormalEnabled": scenarioEnabled("normal"), - "NormalUsagePct": usagePct("normal", capacity), - "NormalUsageGbps": usageGbps("normal"), - "NormalUsageExceed": exceed("normal", norm_thr, capacity), - "NormalDateTime": scenarioTimestamp("normal"), - - "1LinkFailEnabled": scenarioEnabled("onelinkfail"), - "1LinkFailScenario": failScenarioName("onelinkfail"), - "1LinkFailUsagePct": usagePct("onelinkfail", capacity), - "1LinkFailUsageGbps": usageGbps("onelinkfail"), - "1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity), - "1LinkFailUsageTime": scenarioTimestamp("onelinkfail"), - - "2LinkFailEnabled": scenarioEnabled("twolinkfail"), - "2LinkFailScenario": failScenarioName("twolinkfail"), - "2LinkFailUsagePct": usagePct("twolinkfail", capacity), - "2LinkFailUsageGbps": usageGbps("twolinkfail"), - "2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity), - "2LinkFailUsageTime": scenarioTimestamp("twolinkfail"), - - "1NodeFailEnabled": scenarioEnabled("onenodefail"), - "1NodeFailScenario": failScenarioName("onenodefail"), - "1NodeFailUsagePct": usagePct("onenodefail", capacity), - "1NodeFailUsageGbps": usageGbps("onenodefail"), - "1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity), - "1NodeFailUsageTime": scenarioTimestamp("onenodefail"), - - "Node+1LinkFailEnabled": 
scenarioEnabled("nodelinkfail"), - "Node+1LinkFailScenario": failScenarioName("nodelinkfail"), - "Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity), - "Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"), - "Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity), - "Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"), - - "MustUpgrade": mustUpgrade, - "ShouldUpgrade": shouldUpgrade - }) - - df_raw= pd.DataFrame(final_report_raw, columns=["ID","NodeA","NodeB","CapacityGbps", - "ConfNormalThrPct","ConfNormalThrGbps", - "ConfFailThrPct","ConfFailThrGbps", - "NormalEnabled","NormalUsagePct","NormalUsageGbps","NormalUsageExceed","NormalDateTime", - "1LinkFailEnabled","1LinkFailScenario","1LinkFailUsagePct","1LinkFailUsageGbps","1LinkFailUsageExceed","1LinkFailUsageTime", - "2LinkFailEnabled","2LinkFailScenario","2LinkFailUsagePct","2LinkFailUsageGbps","2LinkFailUsageExceed","2LinkFailUsageTime", - "1NodeFailEnabled","1NodeFailScenario","1NodeFailUsagePct","1NodeFailUsageGbps","1NodeFailUsageExceed","1NodeFailUsageTime", - "Node+1LinkFailEnabled","Node+1LinkFailScenario","Node+1LinkFailUsagePct","Node+1LinkFailUsageGbps","Node+1LinkFailUsageExceed","Node+1LinkFailUsageTime", - "MustUpgrade","ShouldUpgrade",]) - df_raw.set_index("ID", inplace=True) - - return df_raw - -def store_raw_report(df_raw): - directory = Path(RAW_REPORT_DIRECTORY) - directory.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist - - # Create a ISO8601 basic format UTC timestamped filename - timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT) - filename = f'{RAW_REPORT_FILE_PREFIX}{timestamp}.{RAW_REPORT_FILE_SUFFIX}' - filepath = directory / filename - - df_raw.to_csv(filepath, sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True) - - -############################################################################### -# MAIN -############################################################################### - -def main(): - # Record and display the start time of the script - start_time = datetime.now(timezone.utc) - print(f"===============================================================") - print("Execution started at:", start_time.strftime(ISO8601_FORMAT)) - - # 1) query Kentik to build traffic demand matrices - print(f"Querying Kentik API for traffic demand matrix...") - kentik_json = ktm.fetch_kentik_trafic_matrix() - print(f"Processing traffic demand matrices from Kentik data...") - traffic_matrices= ktm.kentik_to_traffic_matrices(kentik_json, nettopo.NODES) # returns traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps - - # 2) build graph and lookup data structures - graph= build_graph(nettopo.NODES, nettopo.CORELINKS) - corelink_map= build_corelink_map(nettopo.CORELINKS) - srlg_map= build_srlg_map(nettopo.CORELINKS) - - # 3) define tasks to run in parallel - tasks= build_parallel_tasks(graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices) - print(f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks)/len(traffic_matrices):.0f} scenarios each") - - # 4) now run all tasks in a single global pool - print(f"Executing parallel traffic simulation tasks...") - with multiprocessing.Pool() as pool: - tasks_results= pool.map(compute_scenario_task, tasks) - - # 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link - aggLinkUsage= link_usage_aggregator(corelink_map, tasks_results) - - # 6) create and store 
final report - raw_report= build_raw_report(corelink_map, aggLinkUsage) - store_raw_report(raw_report) - - # Display the end time, and total execution duration - end_time = datetime.now(timezone.utc) - duration = end_time - start_time - print("Execution ended at:", end_time.strftime(ISO8601_FORMAT)) - print("Total duration in seconds:", duration.total_seconds()) - print(f"===============================================================") - -if __name__=="__main__": - main() diff --git a/capacity-planner/__init__.py b/capacity_planner/__init__.py similarity index 100% rename from capacity-planner/__init__.py rename to capacity_planner/__init__.py diff --git a/capacity-planner/alembic.ini b/capacity_planner/alembic.ini similarity index 100% rename from capacity-planner/alembic.ini rename to capacity_planner/alembic.ini diff --git a/capacity-planner/alembic/env.py b/capacity_planner/alembic/env.py similarity index 91% rename from capacity-planner/alembic/env.py rename to capacity_planner/alembic/env.py index 36112a3c68590d6a8e07fea0ce70a5afb38c951a..9dd6c6cecaa25bbf54a9f2ad228ba0e12e1889e9 100644 --- a/capacity-planner/alembic/env.py +++ b/capacity_planner/alembic/env.py @@ -1,9 +1,7 @@ from logging.config import fileConfig -from sqlalchemy import engine_from_config -from sqlalchemy import pool - from alembic import context +from sqlalchemy import engine_from_config, pool # this is the Alembic Config object, which provides # access to the values within the .ini file in use. @@ -64,9 +62,7 @@ def run_migrations_online() -> None: ) with connectable.connect() as connection: - context.configure( - connection=connection, target_metadata=target_metadata - ) + context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() diff --git a/capacity-planner/alembic/script.py.mako b/capacity_planner/alembic/script.py.mako similarity index 100% rename from capacity-planner/alembic/script.py.mako rename to capacity_planner/alembic/script.py.mako diff --git a/capacity_planner/capacityreport.py b/capacity_planner/capacityreport.py new file mode 100644 index 0000000000000000000000000000000000000000..db46ac675f7db4557038c1e7dcbf7d7faa996f18 --- /dev/null +++ b/capacity_planner/capacityreport.py @@ -0,0 +1,403 @@ +import argparse +import os +import re +import sys +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import numpy as np +import pandas as pd # https://pandas.pydata.org + +############################################################################### +# INPUT DATA SECTION +############################################################################### + +# make sure this matches with the What-If Scenario runner script +RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports" +RAW_REPORT_FILE_PREFIX = "raw_capacityreport_" +RAW_REPORT_FILE_SUFFIX = "csv" + +CONSOLIDATED_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/consolidatedreports" +CONSOLIDATED_REPORT_FILE_PREFIX = "consolidated_capacityreport_" +CONSOLIDATED_REPORT_FILE_SUFFIX_RAW = "csv" +CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN = "txt" + +# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded +MUST_UPGRADE_SCENARIOS = ["normal", "onelinkfail", "onenodefail"] +# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded +SHOULD_UPGRADE_SCENARIOS = ["twolinkfail", "nodelinkfail"] + +# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) +ISO8601_FORMAT = "%Y%m%dT%H%MZ" +ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z" + + 
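+# Example (illustrative, based on the conventions used later in this module):
+# store_consolidated() stamps filenames with ISO8601_FORMAT, e.g.
+#   datetime.now(UTC).strftime(ISO8601_FORMAT)  ->  "20250101T1200Z"
+#   ->  "consolidated_capacityreport_20250101T1200Z.csv"
+# and find_files_by_timeframe() uses ISO8601_REGEXP to locate that timestamp in
+# raw report filenames and parse it back with datetime.strptime().
+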
+############################################################################### +# RAW CONSOLIDATED REPORT +############################################################################### + + +# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns --- +def get_max_usage_row(group, usage_col): + """Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`. + If `usage_col` does not exist or is entirely NaN, returns None. + """ + # If the column doesn't exist or has all null values, return None + if usage_col not in group.columns or group[usage_col].dropna().empty: + return None + max_val = group[usage_col].max() + # Filter to rows where usage_col equals the maximum value. + max_rows = group[group[usage_col] == max_val] + # Return only the first row among those with the maximum value. + return max_rows.iloc[0] + + +def extract_usage_details(group): + """For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps) + and extract the relevant columns. + Booleans are set to True if at least one row in the group is True. + """ + # We'll create a dict to hold the final data for this ID. + out = {} + + # Basic info columns (these are assumed to be consistent within the group) + first_row = group.iloc[0] + out["ID"] = first_row.get("ID", None) + out["NodeA"] = first_row.get("NodeA", None) + out["NodeB"] = first_row.get("NodeB", None) + out["CapacityGbps"] = first_row.get("CapacityGbps", None) + out["ConfNormalThrPct"] = first_row.get("ConfNormalThrPct", None) + out["ConfNormalThrGbps"] = first_row.get("ConfNormalThrGbps", None) + out["ConfFailThrPct"] = first_row.get("ConfFailThrPct", None) + out["ConfFailThrGbps"] = first_row.get("ConfFailThrGbps", None) + + # Normal usage + normal_row = get_max_usage_row(group, "NormalUsageGbps") + out["NormalEnabled"] = bool(group["NormalEnabled"].any()) if "NormalEnabled" in group.columns else False + out["NormalUsagePct"] = normal_row.get("NormalUsagePct", np.nan) if normal_row is not None else np.nan + out["NormalUsageGbps"] = normal_row.get("NormalUsageGbps", np.nan) if normal_row is not None else np.nan + out["NormalUsageExceed"] = bool(group["NormalUsageExceed"].any()) if "NormalUsageExceed" in group.columns else False + out["NormalDateTime"] = normal_row.get("NormalDateTime", pd.NaT) if normal_row is not None else pd.NaT + + # 1 Link Fail usage + one_link_row = get_max_usage_row(group, "1LinkFailUsageGbps") + out["1LinkFailEnabled"] = bool(group["1LinkFailEnabled"].any()) if "1LinkFailEnabled" in group.columns else False + out["1LinkFailScenario"] = one_link_row.get("1LinkFailScenario", None) if one_link_row is not None else None + out["1LinkFailUsagePct"] = one_link_row.get("1LinkFailUsagePct", np.nan) if one_link_row is not None else np.nan + out["1LinkFailUsageGbps"] = one_link_row.get("1LinkFailUsageGbps", np.nan) if one_link_row is not None else np.nan + out["1LinkFailUsageExceed"] = ( + bool(group["1LinkFailUsageExceed"].any()) if "1LinkFailUsageExceed" in group.columns else False + ) + out["1LinkFailUsageTime"] = one_link_row.get("1LinkFailUsageTime", pd.NaT) if one_link_row is not None else pd.NaT + + # 2 Link Fail usage + two_link_row = get_max_usage_row(group, "2LinkFailUsageGbps") + out["2LinkFailEnabled"] = bool(group["2LinkFailEnabled"].any()) if "2LinkFailEnabled" in group.columns else False + out["2LinkFailScenario"] = two_link_row.get("2LinkFailScenario", None) if two_link_row is not None else None + 
out["2LinkFailUsagePct"] = two_link_row.get("2LinkFailUsagePct", np.nan) if two_link_row is not None else np.nan + out["2LinkFailUsageGbps"] = two_link_row.get("2LinkFailUsageGbps", np.nan) if two_link_row is not None else np.nan + out["2LinkFailUsageExceed"] = ( + bool(group["2LinkFailUsageExceed"].any()) if "2LinkFailUsageExceed" in group.columns else False + ) + out["2LinkFailUsageTime"] = two_link_row.get("2LinkFailUsageTime", pd.NaT) if two_link_row is not None else pd.NaT + + # 1 Node Fail usage + one_node_row = get_max_usage_row(group, "1NodeFailUsageGbps") + out["1NodeFailEnabled"] = bool(group["1NodeFailEnabled"].any()) if "1NodeFailEnabled" in group.columns else False + out["1NodeFailScenario"] = one_node_row.get("1NodeFailScenario", None) if one_node_row is not None else None + out["1NodeFailUsagePct"] = one_node_row.get("1NodeFailUsagePct", np.nan) if one_node_row is not None else np.nan + out["1NodeFailUsageGbps"] = one_node_row.get("1NodeFailUsageGbps", np.nan) if one_node_row is not None else np.nan + out["1NodeFailUsageExceed"] = ( + bool(group["1NodeFailUsageExceed"].any()) if "1NodeFailUsageExceed" in group.columns else False + ) + out["1NodeFailUsageTime"] = one_node_row.get("1NodeFailUsageTime", pd.NaT) if one_node_row is not None else pd.NaT + + # Node+1 Link Fail usage + node_plus_link_row = get_max_usage_row(group, "Node+1LinkFailUsageGbps") + out["Node+1LinkFailEnabled"] = ( + bool(group["Node+1LinkFailEnabled"].any()) if "Node+1LinkFailEnabled" in group.columns else False + ) + out["Node+1LinkFailScenario"] = ( + node_plus_link_row.get("Node+1LinkFailScenario", None) if node_plus_link_row is not None else None + ) + out["Node+1LinkFailUsagePct"] = ( + node_plus_link_row.get("Node+1LinkFailUsagePct", np.nan) if node_plus_link_row is not None else np.nan + ) + out["Node+1LinkFailUsageGbps"] = ( + node_plus_link_row.get("Node+1LinkFailUsageGbps", np.nan) if node_plus_link_row is not None else np.nan + ) + out["Node+1LinkFailUsageExceed"] = ( + bool(group["Node+1LinkFailUsageExceed"].any()) if "Node+1LinkFailUsageExceed" in group.columns else False + ) + out["Node+1LinkFailUsageTime"] = ( + node_plus_link_row.get("Node+1LinkFailUsageTime", pd.NaT) if node_plus_link_row is not None else pd.NaT + ) + + # Finally, consolidated upgrade flags + out["MustUpgrade"] = bool(group["MustUpgrade"].any()) if "MustUpgrade" in group.columns else False + out["ShouldUpgrade"] = bool(group["ShouldUpgrade"].any()) if "ShouldUpgrade" in group.columns else False + + return pd.Series(out) + + +############################################################################### +# HUMAN READABLE CONSOLIDATED REPORT +############################################################################### + + +def build_human_report(df_raw): + df_human = df_raw.copy() + + # Helper formatting functions + def mark_upgrade(scenario, exceed): + if scenario in MUST_UPGRADE_SCENARIOS and exceed: + return " (**)" + if scenario in SHOULD_UPGRADE_SCENARIOS and exceed: + return " (*)" + return "" + + def normal_thr(row): + return f"{row['ConfNormalThrPct'] * 100:.0f}% / {row['ConfNormalThrGbps']:.0f}G" + + def fail_thr(row): + return f"{row['ConfFailThrPct'] * 100:.0f}% / {row['ConfFailThrGbps']:.0f}G" + + def normal_usage(row): + return f"{row['NormalUsagePct'] * 100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade( + "normal", row["NormalUsageExceed"] + ) + + def onelinkfail_usage(row): + return f"{row['1LinkFailUsagePct'] * 100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade( + "onelinkfail", 
row["1LinkFailUsageExceed"] + ) + + def twolinkfail_usage(row): + return f"{row['2LinkFailUsagePct'] * 100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade( + "twolinkfail", row["2LinkFailUsageExceed"] + ) + + def onenodefail_usage(row): + return f"{row['1NodeFailUsagePct'] * 100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade( + "onenodefail", row["1NodeFailUsageExceed"] + ) + + def nodelinkfail_usage(row): + return f"{row['Node+1LinkFailUsagePct'] * 100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade( + "nodelinkfail", row["Node+1LinkFailUsageExceed"] + ) + + def upgrade(row): + return "MUST (**)" if row["MustUpgrade"] else ("SHOULD (*)" if row["ShouldUpgrade"] else "NO") + + df_human["ConfNormalThr"] = df_human.apply(normal_thr, axis=1) + df_human["ConfFailThr"] = df_human.apply(fail_thr, axis=1) + df_human["NormalUsage"] = df_human.apply(normal_usage, axis=1) + df_human["1LinkFailUsage"] = df_human.apply(onelinkfail_usage, axis=1) + df_human["2LinkFailUsage"] = df_human.apply(twolinkfail_usage, axis=1) + df_human["1NodeFailUsage"] = df_human.apply(onenodefail_usage, axis=1) + df_human["Node+1LinkFailUsage"] = df_human.apply(nodelinkfail_usage, axis=1) + df_human["Upgrade"] = df_human.apply(upgrade, axis=1) + + # Drop unused columns + df_human.drop(["ConfNormalThrPct", "ConfNormalThrGbps"], axis=1, inplace=True) + df_human.drop(["ConfFailThrPct", "ConfFailThrGbps"], axis=1, inplace=True) + df_human.drop(["NormalUsagePct", "NormalUsageGbps", "NormalUsageExceed"], axis=1, inplace=True) + df_human.drop( + ["1LinkFailUsagePct", "1LinkFailUsageGbps", "1LinkFailUsageExceed", "1LinkFailUsageTime"], axis=1, inplace=True + ) + df_human.drop( + ["2LinkFailUsagePct", "2LinkFailUsageGbps", "2LinkFailUsageExceed", "2LinkFailUsageTime"], axis=1, inplace=True + ) + df_human.drop( + ["1NodeFailUsagePct", "1NodeFailUsageGbps", "1NodeFailUsageExceed", "1NodeFailUsageTime"], axis=1, inplace=True + ) + df_human.drop( + ["Node+1LinkFailUsagePct", "Node+1LinkFailUsageGbps", "Node+1LinkFailUsageExceed", "Node+1LinkFailUsageTime"], + axis=1, + inplace=True, + ) + df_human.drop(["MustUpgrade", "ShouldUpgrade"], axis=1, inplace=True) + + # Replace NaN and NaT values with "N/A" + df_human["NormalDateTime"] = df_human["NormalDateTime"].astype(object) + df_human.fillna("N/A", inplace=True) + + return df_human[ + [ + "NodeA", + "NodeB", + "CapacityGbps", + "ConfNormalThr", + "ConfFailThr", + "NormalUsage", + "NormalDateTime", + "1LinkFailScenario", + "1LinkFailUsage", + "2LinkFailScenario", + "2LinkFailUsage", + "1NodeFailScenario", + "1NodeFailUsage", + "Node+1LinkFailScenario", + "Node+1LinkFailUsage", + "Upgrade", + ] + ] + + +############################################################################### +# FILE FUNCTIONS +############################################################################### + + +def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime): + # List all raw reports in directory + all_raw_reports = [ + file + for file in os.listdir(directory) + if os.path.isfile(os.path.join(directory, file)) + and file.startswith(prefix) + and file.endswith(suffix) + and re.search(ISO8601_REGEXP, file) + ] + + # Filter to files that match the timestamp pattern within the specified datetime range + matching_files = [] + for file in all_raw_reports: + match = re.search(ISO8601_REGEXP, file) + file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=UTC) + + if start_datetime <= file_date <= end_datetime: + 
matching_files.append(os.path.join(directory, file)) + + return matching_files + + +def store_consolidated(df_consolidated, directory, prefix, suffix): + path = Path(directory) + path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist + + # Create a ISO8601 basic format UTC timestamped filename + timestamp = datetime.now(UTC).strftime(ISO8601_FORMAT) + filename = f"{prefix}{timestamp}.{suffix}" + + if suffix == "csv": + df_consolidated.to_csv( + os.path.join(path, filename), sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True + ) + + elif suffix == "txt": + markdown = df_consolidated.to_markdown(headers="keys", tablefmt="psql") + # Write the markdown string to a file + with open(os.path.join(path, filename), "w") as file: + file.write(markdown) + + +############################################################################### +# MAIN +############################################################################### + + +def main(): + # Parse commandline arguments + parser = argparse.ArgumentParser(description="Script usage:") + parser.add_argument("--daily", action="store_true", help="Create daily report (past day)") + parser.add_argument("--weekly", action="store_true", help="Create weekly report (past week)") + parser.add_argument("--monthly", action="store_true", help="Create monthly report (past month)") + parser.add_argument("--quarterly", action="store_true", help="Create quarterly report (past quarter)") + parser.add_argument("--yearly", action="store_true", help="Create yearly report (past year)") + + if len(sys.argv) == 1: + # No arguments were provided; print help message. + parser.print_help() + sys.exit(1) + + args = parser.parse_args() + today = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) + first_day_of_current_month = today.replace(day=1) + + if args.daily: + start_datetime = today - timedelta(days=1) + end_datetime = today + elif args.weekly: + start_datetime = today - timedelta(weeks=1) + end_datetime = today + elif args.monthly: + # First day of last month + start_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) + # First day of current month + end_datetime = first_day_of_current_month + elif args.quarterly: + # Approximates the quarter as 90 days before the start of the current month. 
+ start_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta( + days=(first_day_of_current_month.month - 1) % 3 * 30 + ) + end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) + elif args.yearly: + # First day of the previous year + start_datetime = today.replace(year=today.year - 1, month=1, day=1) + end_datetime = today.replace(year=today.year, month=1, day=1) + + matching_files = find_files_by_timeframe( + RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime + ) + + if len(matching_files) > 0: + print( + f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}" + ) + + # List of columns that should be parsed as dates from CSV + date_columns = [ + "NormalDateTime", + "1LinkFailUsageTime", + "2LinkFailUsageTime", + "1NodeFailUsageTime", + "Node+1LinkFailUsageTime", + ] + + # Read and concat CSVs + dataframes = [ + pd.read_csv( + file, + sep=",", + encoding="utf-8", + parse_dates=date_columns, + date_format=ISO8601_FORMAT, + float_precision="round_trip", + ) + for file in matching_files + ] + concat_df = pd.concat(dataframes) + + # Walk over the results for each link and extract and store the highest usage for each scenario + results = [] + for id_val, group in concat_df.groupby("ID"): + details = extract_usage_details(group) + # Overwrite ID with the group key to be sure + details["ID"] = id_val + results.append(details) + + consolidated_raw = pd.DataFrame(results) + consolidated_raw.set_index("ID", inplace=True) + store_consolidated( + consolidated_raw, + CONSOLIDATED_REPORT_DIRECTORY, + CONSOLIDATED_REPORT_FILE_PREFIX, + CONSOLIDATED_REPORT_FILE_SUFFIX_RAW, + ) + + consolidated_human = build_human_report(consolidated_raw) + store_consolidated( + consolidated_human, + CONSOLIDATED_REPORT_DIRECTORY, + CONSOLIDATED_REPORT_FILE_PREFIX, + CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN, + ) + + else: + print(f"No raw files found for timeframe {start_datetime} through {end_datetime}") + + +if __name__ == "__main__": + main() diff --git a/capacity_planner/services/__init__.py b/capacity_planner/services/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/capacity_planner/services/kentik.py b/capacity_planner/services/kentik.py new file mode 100644 index 0000000000000000000000000000000000000000..c358b67e45be3201f182c57fbf8c0fb5255daa9f --- /dev/null +++ b/capacity_planner/services/kentik.py @@ -0,0 +1,136 @@ +import os + +import requests + +# Extract Kentik API settings from environment variables +API_URL = os.environ["KENTIK_API_URL"] +API_EMAIL = os.environ["KENTIK_API_EMAIL"] +API_TOKEN = os.environ["KENTIK_API_TOKEN"] + +# Kentik report period (in minutes) +KENTIK_REPORTING_PERIOD = 60 +# Kentik flow aggregation window (in minutes -- lower values improve accuracy at the expense of processing time) +KENTIK_FLOW_AGGR_WINDOW = 5 + +# aggregate ingress (source) and egress (destination) on the Kentik 'Site'-level +INGRESS_DIMENSION = "i_device_site_name" +EGRESS_DIMENSION = "i_ult_exit_site" + +############################################################################### +# Fetch traffic matrix from the Kentik API +############################################################################### + + +def _api_query(payload): + # Headers for authentication + headers = {"Content-Type": "application/json", "X-CH-Auth-Email": API_EMAIL, 
"X-CH-Auth-API-Token": API_TOKEN} + response = requests.post(API_URL, headers=headers, json=payload) + + if response.status_code == 200: + return response.json() + print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}") + return None + + +def fetch_kentik_traffic_matrix(): + # JSON query payload + payload = { + "version": 4, + "queries": [ + { + "bucket": "Table", + "isOverlay": False, + "query": { + "all_devices": True, + "aggregateTypes": ["max_in_bits_per_sec"], + "depth": 350, + "topx": 350, # 350=max supported by Kentik + "device_name": [], + "fastData": "Auto", + "lookback_seconds": 60 * KENTIK_REPORTING_PERIOD, + # "starting_time": null, + # "ending_time": null, + "matrixBy": [], + "metric": ["in_bytes"], + "minsPolling": KENTIK_FLOW_AGGR_WINDOW, + "forceMinsPolling": True, + "outsort": "max_in_bits_per_sec", + "viz_type": "table", + "aggregates": [ + { + "value": "max_in_bits_per_sec", + "column": "f_sum_in_bytes", + "fn": "max", + "label": "Bits/s Sampled at Ingress Max", + "unit": "in_bytes", + "parentUnit": "bytes", + "group": "Bits/s Sampled at Ingress", + "origLabel": "Max", + "sample_rate": 1, + "raw": True, + "name": "max_in_bits_per_sec", + } + ], + "filters": { + "connector": "All", + "filterGroups": [ + { + "connector": "All", + "filters": [ + { + "filterField": EGRESS_DIMENSION, + "metric": "", + "aggregate": "", + "operator": "<>", + "filterValue": "", # Filter unassigned/unknown destinations + "rightFilterField": "", + } + ], + "filterGroups": [], + } + ], + }, + "dimension": [INGRESS_DIMENSION, EGRESS_DIMENSION], + }, + } + ], + } + + response = _api_query(payload) + return response + + +############################################################################### +# Transform Kentik data to traffic matrices (one matrix per timestamp) +############################################################################### + + +def kentik_to_traffic_matrices(json_data, nodes): + """Convert the given JSON structure returned by Kentik into a dictionary + keyed by timestamp. 
For each timestamp, we store a nested dict of: + traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps + """ + # We'll gather all flows in the JSON + data_entries = json_data["results"][0]["data"] + + # This will be our main lookup: { ts -> {ingress -> { egress -> rate}} } + traffic_matrices = {} + + for entry in data_entries: + ingress = entry[INGRESS_DIMENSION] + egress = entry[EGRESS_DIMENSION] + + # skip unknown ingress and/or egress nodes + if ingress not in nodes: + continue + if egress not in nodes: + continue + + # timeSeries -> in_bits_per_sec -> flow => list of [timestamp, in_bits_per_sec, ignored_interval] + flow_list = entry["timeSeries"]["in_bits_per_sec"]["flow"] + for timestamp, bits_per_sec, _ignored_interval in flow_list: + traffic_matrices.setdefault(timestamp, {}) + traffic_matrices[timestamp].setdefault(ingress, {}) + traffic_matrices[timestamp][ingress][egress] = int(bits_per_sec / 1_000_000) # convert bps to Mbps + + return traffic_matrices diff --git a/capacity_planner/utils/__init__.py b/capacity_planner/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/capacity_planner/utils/topology.py b/capacity_planner/utils/topology.py new file mode 100644 index 0000000000000000000000000000000000000000..f1f252b2b0e94eb60a05f35d016ceb5efa580f42 --- /dev/null +++ b/capacity_planner/utils/topology.py @@ -0,0 +1,147 @@ +# (A) Define each core link: +# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] ) +# where: +# linkID: network-wide unique numeric ID (e.g. 1001) +# nodeA, nodeB: core link endpoints +# igp_metric: IGP cost/distance +# capacity: full-duplex link capacity in Gbps +# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty) +# normal_threshold: fraction for normal usage. If omitted default is used +# failure_threshold: fraction for usage under failure. 
If omitted default is used +CORELINKS = [ + (1, "AMS", "FRA", 2016, 800, []), + (2, "AMS", "LON", 1428, 800, []), + (3, "FRA", "GEN", 2478, 800, []), + (4, "GEN", "PAR", 2421, 800, []), + (5, "LON", "LON2", 180, 800, []), + (6, "LON2", "PAR", 1866, 800, []), + (7, "FRA", "PRA", 3315, 300, []), + (8, "GEN", "MIL2", 4280, 300, []), + (9, "GEN", "MAR", 2915, 400, []), + (10, "HAM", "POZ", 3435, 300, []), + (11, "MAR", "MIL2", 3850, 300, []), + (12, "MIL2", "VIE", 5400, 400, []), + (13, "POZ", "PRA", 3215, 300, []), + (14, "PRA", "VIE", 2255, 300, []), + (15, "AMS", "HAM", 3270, 300, []), + (16, "BRU", "PAR", 50000, 100, []), + (17, "AMS", "BRU", 50000, 100, []), + (18, "BIL", "PAR", 5600, 300, []), + (19, "BUD", "ZAG", 2370, 200, []), + (20, "COR", "DUB", 5500, 100, []), + (21, "DUB", "LON", 8850, 100, []), + (22, "MAD", "MAR", 6805, 300, []), + (23, "BUC", "SOF", 5680, 200, []), + (24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]), + (25, "LJU", "MIL2", 3330, 300, []), + (26, "LJU", "ZAG", 985, 200, []), + (27, "BUC", "BUD", 11850, 200, []), + (28, "LIS", "MAD", 8970, 200, []), + (29, "BIL", "POR", 9250, 200, []), + (30, "LIS", "POR", 3660, 200, []), + (31, "BIL", "MAD", 4000, 200, []), + (32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]), + (33, "ATH2", "THE", 8200, 100, []), + (34, "SOF", "THE", 5800, 100, []), + # ("COP", "HAM", 480, 400, []), + # ("COP", "STO", 600, 400, []), + # ("HEL", "STO", 630, 400, []), + (35, "RIG", "TAR", 2900, 100, []), + (36, "KAU", "POZ", 10050, 100, []), + # ("BEL", "SOF", 444, 400, []), + # ("BEL", "ZAG", 528, 400, []), + (37, "ZAG", "SOF", 9720, 200, []), + (38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]), + (39, "COR", "LON2", 7160, 100, ["COR-LON2"]), + # ("HEL", "TAR", 227, 400, []), + (40, "KAU", "RIG", 4500, 100, []), + # ("BRU", "LUX", 380, 400, []), + # ("FRA", "LUX", 312, 400, []), + # ("RIG", "STO", 400, 400, []), + # ("COP", "POZ", 750, 400, []), + (41, "COR", "PAR", 12549, 100, ["COR-LON2"]), + (42, "KIE", "POZ", 50000, 100, []), + (43, "CHI", "BUC", 50000, 40, []), + (44, "CHI", "KIE", 10000, 100, []), + (45, "HAM", "TAR", 12490, 100, []), + (46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]), + (47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]), + (48, "AMS", "LON", 50000, 100, []), # backup link via SURF + (49, "BUC", "FRA", 50000, 100, []), # backup link via TTI +] + +# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik) +# to be added: "BEL", "COP","HEL","LUX","STO" +NODES = [ + "AMS", + "ATH2", + "BIL", + "BRA", + "BRU", + "BUC", + "BUD", + "CHI", + "COR", + "DUB", + "FRA", + "GEN", + "HAM", + "KAU", + "KIE", + "LIS", + "LJU", + "LON", + "LON2", + "MAD", + "MAR", + "MIL2", + "PAR", + "POR", + "POZ", + "PRA", + "RIG", + "SOF", + "TAR", + "THE", + "VIE", + "ZAG", +] + +# Node failover ratio map for single-node fails +NODE_FAILOVER_RATIOS = { + "AMS": {"LON": 0.6, "FRA": 0.4}, + "ATH": {"THE": 0.5, "MAR": 0.5}, + # "BEL": {"ZAG": 1.0 }, + "BIL": {"MAD": 1.0}, + "BRA": {"VIE": 1.0}, + "BRU": {"AMS": 1.0}, + "BUC": {"VIE": 0.5, "SOF": 0.5}, + "BUD": {"ZAG": 1.0}, + # "COP": {"HAM": 0.5, "STO": 0.5 }, + "COR": {"DUB": 1.0}, + "DUB": {"COR": 1.0}, + "FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2}, + "GEN": {"PAR": 0.6, "MIL2": 0.4}, + "HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3}, + # "HEL": {"TAR": 0.3, "HAM": 0.7 }, + "KAU": {"RIG": 1.0}, + "LIS": {"POR": 1.0}, + "LJU": {"ZAG": 1.0}, + "LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4}, + "LON2": {"LON": 1.0}, + # "LUX": {"BRU": 1.0 }, + "MAD": {"BIL": 1.0}, + "MAR": {"MIL2": 
0.6, "ATH": 0.4}, + "MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3}, + "PAR": {"GEN": 1.0}, + "POR": {"LIS": 1.0}, + "POZ": {"HAM": 1.0}, + "PRA": {"VIE": 1.0}, + "RIG": {"KAU": 1.0}, + "SOF": {"THE": 0.5, "BUC": 0.5}, + # "STO": {"COP": 0.5, "HEL": 0.5 }, + "TAR": {"RIG": 1.0}, + "THE": {"ATH": 0.5, "SOF": 0.5}, + "VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2}, + "ZAG": {"BUD": 0.5, "LJU": 0.5}, +} diff --git a/capacity_planner/whatifscenarios.py b/capacity_planner/whatifscenarios.py new file mode 100644 index 0000000000000000000000000000000000000000..2636554c631521800e691f39b5a2aae84b653a25 --- /dev/null +++ b/capacity_planner/whatifscenarios.py @@ -0,0 +1,681 @@ +import itertools +import math +import multiprocessing +from datetime import UTC, datetime +from pathlib import Path + +from capacity_planner.utils import topology as nettopo +import networkx as nx +import pandas as pd +from capacity_planner.services import kentik + +############################################################################### +# 1) INPUT DATA SECTION +############################################################################### + +# If normal threshold not specified on a link, default to 0.40 (allowing upto 40% link utilization in normal non-failure scenario) +DEFAULT_NORMAL_THRESHOLD = 0.40 +# If failure threshold not specified on a link, default to 0.80 (allowing upto 80% link utilization in worst-case failure scenario) +DEFAULT_FAILURE_THRESHOLD = 0.80 + +ENABLED_SCENARIOS = ["normal", "onelinkfail", "twolinkfail", "onenodefail", "nodelinkfail"] + +# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded +MUST_UPGRADE_SCENARIOS = ["normal", "onelinkfail", "onenodefail"] +# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded +SHOULD_UPGRADE_SCENARIOS = ["twolinkfail", "nodelinkfail"] + +RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports" +RAW_REPORT_FILE_PREFIX = "raw_capacityreport_" +RAW_REPORT_FILE_SUFFIX = "csv" + +# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) +ISO8601_FORMAT = "%Y%m%dT%H%MZ" +ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z" + +############################################################################### +# 2) Build a local NetworkX multi-directional graph for scenario computations +############################################################################### + + +def build_graph(nodes, links): + # MultiDiGraph supports multiple edges (ECMP links) between two nodes + # and is directional, allowing modelling of Full-Duplex links and taking + # traffic direction into account + graph = nx.MultiDiGraph() + graph.add_nodes_from(nodes) + + for link in links: + # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) + lID = link[0] + nodeA = link[1] + nodeB = link[2] + igp = link[3] + + # Add Full-Duplex edges in both directions with linkID + # assumes the IGP metric is configured symmetrically + graph.add_edge(nodeA, nodeB, key=lID, igpmetric=igp) + graph.add_edge(nodeB, nodeA, key=lID, igpmetric=igp) + + return graph + + +############################################################################### +# 3) Helper functions to ease lookups for core links and SRLGs +############################################################################### + + +def build_corelink_map(links): + # Creating a lookup dictionary for fast lookup by linkID + corelink_dict = {link[0]: link for link in links} + return corelink_dict + + +def build_srlg_map(links): + # Maps SRLG name to one or more core links (linkIDs) + srlg_map = {} + + for link 
in links:
+        # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
+        lID = link[0]
+        nodeA = link[1]
+        nodeB = link[2]
+        igp = link[3]
+
+        if len(link) > 5 and isinstance(link[5], list):
+            for srlg in link[5]:
+                srlg_map.setdefault(srlg, set())
+                srlg_map[srlg].add(lID)
+
+    return srlg_map
+
+
+###############################################################################
+# 4) Node failover logic - redistribute traffic to other core nodes
+###############################################################################
+
+
+def redistribute_local_traffic(t_xx, ratio_map):
+    """We treat local traffic T[X,X] as two endpoints at X.
+    Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j].
+    => fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j].
+    If i=j, it's T[i,i] (local on neighbor i).
+    Returns local_matrix[i][j] = fraction (Mbps).
+    """
+    local_matrix = {}
+    nbrs = list(ratio_map.keys())
+    for i in nbrs:
+        local_matrix.setdefault(i, {})
+        for j in nbrs:
+            local_matrix[i][j] = ratio_map[i] * ratio_map[j] * t_xx
+
+    return local_matrix
+
+
+def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode):
+    """Return scenario-specific traffic matrix in Mbps,
+    re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode].
+    """
+    scenario_traffic = {}
+    ratio_map = nodefail_ratios.get(failingNode, {})
+
+    for s in baseTraffic:
+        # use setdefault/accumulation throughout so traffic already re-homed onto s or
+        # onto (s, d) by an earlier iteration is preserved instead of being overwritten
+        scenario_traffic.setdefault(s, {})
+        for d in baseTraffic[s]:
+            val = baseTraffic[s][d]
+            if val <= 0:
+                scenario_traffic[s].setdefault(d, 0)
+                continue
+
+            # traffic is local - T[failingNode,failingNode]
+            if s == failingNode and d == failingNode:
+                scenario_traffic[s][d] = 0
+                mat = redistribute_local_traffic(val, ratio_map)
+                for i in mat:
+                    for j in mat[i]:
+                        portion = mat[i][j]
+                        if portion > 0:
+                            scenario_traffic.setdefault(i, {})
+                            scenario_traffic[i][j] = scenario_traffic[i].get(j, 0) + portion
+
+            # T[failingNode,*]
+            elif s == failingNode:
+                scenario_traffic[s][d] = 0
+                for nbr, r in ratio_map.items():
+                    scenario_traffic.setdefault(nbr, {})
+                    scenario_traffic[nbr][d] = scenario_traffic[nbr].get(d, 0) + val * r
+
+            # T[*,failingNode]
+            elif d == failingNode:
+                scenario_traffic[s][d] = 0
+                for nbr, r in ratio_map.items():
+                    scenario_traffic[s][nbr] = scenario_traffic[s].get(nbr, 0) + val * r
+
+            # Unaffected - keep any traffic already re-homed onto this (s, d) pair
+            else:
+                scenario_traffic[s][d] = scenario_traffic[s].get(d, 0) + val
+
+    return scenario_traffic
+
+
+###############################################################################
+# 5) Remove SRLG or corelink from graph
+###############################################################################
+
+
+def remove_corelink(graph, removeLink):
+    edges_rm = []
+    for u, v, lID in graph.edges(keys=True):
+        if lID == removeLink:
+            edges_rm.append((u, v, lID))
+    for e in edges_rm:
+        graph.remove_edge(*e)
+
+
+def remove_corelink_or_srlg(graph, failedElement, srlg_map):
+    etype, data = failedElement
+    if etype == "LINK":
+        linkID = data
+        remove_corelink(graph, linkID)
+
+    elif etype == "SRLG":
+        if data in srlg_map:
+            for linkID in srlg_map[data]:
+                remove_corelink(graph, linkID)
+
+
+###############################################################################
+# 6) Compute scenario load in MultiDiGraph
+###############################################################################
+
+
+def compute_scenario_load(graph, scenario_traffic):
+    # initialize empty usage map
+    usageMap = {}
+    for u, v, lID in graph.edges(keys=True):
+        usageMap[u, v, lID] = 0.0
+
+    # walk over [ingress][egress] traffic demands
+    for s in
scenario_traffic: + for d in scenario_traffic[s]: + demand_mbps = scenario_traffic[s][d] + if demand_mbps <= 0 or s == d or (s not in graph) or (d not in graph): + continue + demand_gbps = demand_mbps / 1000.0 # convert Mbps to Gbps + + # Run Dijkstra algorithm + try: + best_dist = nx.dijkstra_path_length(graph, s, d, weight="igpmetric") + except nx.NetworkXNoPath: + continue + + # Enumerate all shortest paths from (s)ource to (d)estination + all_paths = list(nx.all_shortest_paths(graph, s, d, weight="igpmetric")) + valid = [] + for path in all_paths: + dist_sum = 0 + for i in range(len(path) - 1): + p = path[i] + q = path[i + 1] + # pick minimal igp among parallel edges p->q + best_igp = math.inf + for k2 in graph[p][q]: + igp = graph[p][q][k2]["igpmetric"] + best_igp = min(best_igp, igp) + dist_sum += best_igp + + if math.isclose(dist_sum, best_dist): + valid.append(path) + + if not valid: + continue + + # Now equally divide traffic demand across all shortest paths (ECMP) + share = demand_gbps / len(valid) + for path in valid: + for i in range(len(path) - 1): + p = path[i] + q = path[i + 1] + # pick minimal link p->q + best_igp = math.inf + best_k = None + for k2 in graph[p][q]: + igp = graph[p][q][k2]["igpmetric"] + if igp < best_igp: + best_igp = igp + best_k = k2 + usageMap[p, q, best_k] += share + + return usageMap + + +############################################################################### +# 7) Individual Normal/Failure Scenario Runners +############################################################################### + + +def run_normal_scenario(graph, trafficMatrix): + """No failures => just compute load with the base traffic.""" + return compute_scenario_load(graph, trafficMatrix) + + +def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail): + """Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail].""" + if nodeFail in graph: + graph.remove_node(nodeFail) + scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) + return compute_scenario_load(graph, scenario_traffic) + + +def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail): + """LinkFail = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). + Remove link from a copy of graph, then compute load with base traffic. + """ + remove_corelink_or_srlg(graph, linkFail, srlg_map) + return compute_scenario_load(graph, trafficMatrix) + + +def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2): + """linkFail1, linkFail2 each = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). + Remove them from a copy of graph, then compute load with base traffic. 
+ """ + remove_corelink_or_srlg(graph, linkFail1, srlg_map) + remove_corelink_or_srlg(graph, linkFail2, srlg_map) + return compute_scenario_load(graph, trafficMatrix) + + +def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail): + # first remove node and shift traffic + if nodeFail in graph: + graph.remove_node(nodeFail) + scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) + # then remove link + remove_corelink_or_srlg(graph, linkFail, srlg_map) + return compute_scenario_load(graph, scenario_traffic) + + +############################################################################### +# 8) Single scenario task runner +############################################################################### + + +def expand_link_name(corelink_map, linkFail): + """Helper function--return a string describing one scenario element (LINK linkID or SRLG name).""" + etype, data = linkFail + if etype == "LINK": + # get corelink details by ID + td = corelink_map.get(data) + nodeA = td[1] + nodeB = td[2] + return f"LINK_{data}/{nodeA}-{nodeB}" + + if etype == "SRLG": + return f"SRLG_{data}" + + return str(linkFail) + + +def compute_scenario_task(args): + """Each scenario runs as a single task""" + (graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData) = args + + # 1) first, create a copy of graph so in-memory copy of original is not modified + graph = graph_orig.copy() + + # 2) run selected normal or failure scenario + usageMap = {} + scenarioName = "" + if scenarioType == "normal": + usageMap = run_normal_scenario(graph, trafficMatrix) + scenarioName = "normal" + + elif scenarioType == "onenodefail": + nodeFail = scenarioData + usageMap = run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail) + scenarioName = f"NODE_{nodeFail}" + + elif scenarioType == "onelinkfail": + linkFail = scenarioData + usageMap = run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail) + scenarioName = f"{expand_link_name(corelink_map, linkFail)}" + + elif scenarioType == "twolinkfail": + (linkFail1, linkFail2) = scenarioData + usageMap = run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2) + scenarioName = f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}" + + elif scenarioType == "nodelinkfail": + (nodeFail, linkFail) = scenarioData + usageMap = run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail) + scenarioName = f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}" + + else: + usageMap = {} + scenarioName = "unknown" + + return (ts, scenarioType, scenarioName, usageMap) + + +############################################################################### +# 9) Build list of tasks to run in parallel +############################################################################### + + +def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): + # enumerate link failure scenarios => link linkIDs + srlg + linkFailElements = [] + for link_id, _link_data in corelink_map.items(): + linkFailElements.append(("LINK", link_id)) + + for srlg_name, _srlg_data in srlg_map.items(): + linkFailElements.append(("SRLG", srlg_name)) + + tasks = [] + for ts in traffic_matrices.keys(): + trafficMatrix = traffic_matrices[ts] + + # add normal non-failure scenario + if "normal" in ENABLED_SCENARIOS: + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, 
trafficMatrix, "normal", None)) + + # add single link fail scenarios + if "onelinkfail" in ENABLED_SCENARIOS: + for linkFail in linkFailElements: + tasks.append(( + graph, + corelink_map, + srlg_map, + nodefail_ratios, + ts, + trafficMatrix, + "onelinkfail", + linkFail, + )) + + # add unique two link fail scenarios + if "twolinkfail" in ENABLED_SCENARIOS: + fe_twofail_combos = list(itertools.combinations(linkFailElements, 2)) + for linkFail1, linkFail2 in fe_twofail_combos: + tasks.append(( + graph, + corelink_map, + srlg_map, + nodefail_ratios, + ts, + trafficMatrix, + "twolinkfail", + (linkFail1, linkFail2), + )) + + for nodeFail in nodes: + # add node fail scenarios + if "onenodefail" in ENABLED_SCENARIOS: + tasks.append(( + graph, + corelink_map, + srlg_map, + nodefail_ratios, + ts, + trafficMatrix, + "onenodefail", + nodeFail, + )) + + # add node + link fail scenarios + if "nodelinkfail" in ENABLED_SCENARIOS: + for linkFail in linkFailElements: + tasks.append(( + graph, + corelink_map, + srlg_map, + nodefail_ratios, + ts, + trafficMatrix, + "nodelinkfail", + (nodeFail, linkFail), + )) + + return tasks + + +############################################################################### +# 10) Report generator +############################################################################### + + +def link_usage_aggregator(corelink_map, results): + # initialize empty link usage aggregator + aggLinkUsage = {} + for link_id, _link_data in corelink_map.items(): + aggLinkUsage[link_id] = {} + for scenarioType in ENABLED_SCENARIOS: + aggLinkUsage[link_id][scenarioType] = (0.0, pd.NaT, None) + + # unify results across all tasks - for each scenario, record worst-case (highest) load on each link + for ts, scenarioType, scenarioName, usageMap in results: + for u, v, lID in usageMap: + usage = usageMap[u, v, lID] + oldUsage, oldTs, oldSc = aggLinkUsage[lID][scenarioType] + if usage > oldUsage: + aggLinkUsage[lID][scenarioType] = (usage, ts, f"{scenarioName}") + + # return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName) + return aggLinkUsage + + +def build_raw_report(corelink_map, aggLinkUsage): + final_report_raw = [] + + # Helper print functions + def exceed(scenario, thr, capacity): + return aggLinkUsage[lID][scenario][0] > thr * capacity if scenario in ENABLED_SCENARIOS else pd.NA + + def usagePct(scenario, capacity): + # (usage,ts,scenarioName) + return aggLinkUsage[lID][scenario][0] / capacity if scenario in ENABLED_SCENARIOS else pd.NA + + def usageGbps(scenario): + return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA + + def scenarioTimestamp(scenario): + return ( + pd.to_datetime(aggLinkUsage[lID][scenario][1], unit="ms", errors="coerce", utc=True) + if scenario in ENABLED_SCENARIOS + else pd.NaT + ) + + def failScenarioName(scenario): + return aggLinkUsage[lID][scenario][2] if scenario in ENABLED_SCENARIOS else None + + def scenarioEnabled(scenario): + return scenario in ENABLED_SCENARIOS + + # Iterate over each core link + for link_id, link_data in corelink_map.items(): + # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) + lID = link_data[0] + nodeA = link_data[1] + nodeB = link_data[2] + capacity = link_data[4] + # parse thresholds if present, else use defaults + norm_thr = ( + link_data[6] if len(link_data) > 6 and isinstance(link_data[6], (float, int)) else DEFAULT_NORMAL_THRESHOLD + ) + fail_thr = ( + link_data[7] if len(link_data) > 7 and isinstance(link_data[7], (float, int)) else DEFAULT_FAILURE_THRESHOLD + 
) + + # threshold check + mustUpgrade = False + shouldUpgrade = False + for scenarioType in ENABLED_SCENARIOS: + (usage, ts, scenarioName) = aggLinkUsage[lID][scenarioType] + thr = 1 + if scenarioType == "normal" and norm_thr > 0: + thr = norm_thr + elif "fail" in scenarioType and fail_thr > 0: + thr = fail_thr + + if usage > thr * capacity and scenarioType in MUST_UPGRADE_SCENARIOS: + mustUpgrade = True + elif usage > thr * capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS: + shouldUpgrade = True + + # print one row per core link + final_report_raw.append({ + "ID": lID, + "NodeA": nodeA, + "NodeB": nodeB, + "CapacityGbps": capacity, + "ConfNormalThrPct": norm_thr, + "ConfNormalThrGbps": norm_thr * capacity, + "ConfFailThrPct": fail_thr, + "ConfFailThrGbps": fail_thr * capacity, + "NormalEnabled": scenarioEnabled("normal"), + "NormalUsagePct": usagePct("normal", capacity), + "NormalUsageGbps": usageGbps("normal"), + "NormalUsageExceed": exceed("normal", norm_thr, capacity), + "NormalDateTime": scenarioTimestamp("normal"), + "1LinkFailEnabled": scenarioEnabled("onelinkfail"), + "1LinkFailScenario": failScenarioName("onelinkfail"), + "1LinkFailUsagePct": usagePct("onelinkfail", capacity), + "1LinkFailUsageGbps": usageGbps("onelinkfail"), + "1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity), + "1LinkFailUsageTime": scenarioTimestamp("onelinkfail"), + "2LinkFailEnabled": scenarioEnabled("twolinkfail"), + "2LinkFailScenario": failScenarioName("twolinkfail"), + "2LinkFailUsagePct": usagePct("twolinkfail", capacity), + "2LinkFailUsageGbps": usageGbps("twolinkfail"), + "2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity), + "2LinkFailUsageTime": scenarioTimestamp("twolinkfail"), + "1NodeFailEnabled": scenarioEnabled("onenodefail"), + "1NodeFailScenario": failScenarioName("onenodefail"), + "1NodeFailUsagePct": usagePct("onenodefail", capacity), + "1NodeFailUsageGbps": usageGbps("onenodefail"), + "1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity), + "1NodeFailUsageTime": scenarioTimestamp("onenodefail"), + "Node+1LinkFailEnabled": scenarioEnabled("nodelinkfail"), + "Node+1LinkFailScenario": failScenarioName("nodelinkfail"), + "Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity), + "Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"), + "Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity), + "Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"), + "MustUpgrade": mustUpgrade, + "ShouldUpgrade": shouldUpgrade, + }) + + df_raw = pd.DataFrame( + final_report_raw, + columns=[ + "ID", + "NodeA", + "NodeB", + "CapacityGbps", + "ConfNormalThrPct", + "ConfNormalThrGbps", + "ConfFailThrPct", + "ConfFailThrGbps", + "NormalEnabled", + "NormalUsagePct", + "NormalUsageGbps", + "NormalUsageExceed", + "NormalDateTime", + "1LinkFailEnabled", + "1LinkFailScenario", + "1LinkFailUsagePct", + "1LinkFailUsageGbps", + "1LinkFailUsageExceed", + "1LinkFailUsageTime", + "2LinkFailEnabled", + "2LinkFailScenario", + "2LinkFailUsagePct", + "2LinkFailUsageGbps", + "2LinkFailUsageExceed", + "2LinkFailUsageTime", + "1NodeFailEnabled", + "1NodeFailScenario", + "1NodeFailUsagePct", + "1NodeFailUsageGbps", + "1NodeFailUsageExceed", + "1NodeFailUsageTime", + "Node+1LinkFailEnabled", + "Node+1LinkFailScenario", + "Node+1LinkFailUsagePct", + "Node+1LinkFailUsageGbps", + "Node+1LinkFailUsageExceed", + "Node+1LinkFailUsageTime", + "MustUpgrade", + "ShouldUpgrade", + ], + ) + df_raw.set_index("ID", inplace=True) + + return df_raw + + +def 
store_raw_report(df_raw):
+    directory = Path(RAW_REPORT_DIRECTORY)
+    directory.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist
+
+    # Create an ISO 8601 basic format UTC timestamped filename
+    timestamp = datetime.now(UTC).strftime(ISO8601_FORMAT)
+    filename = f"{RAW_REPORT_FILE_PREFIX}{timestamp}.{RAW_REPORT_FILE_SUFFIX}"
+    filepath = directory / filename
+
+    df_raw.to_csv(filepath, sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True)
+
+
+###############################################################################
+# MAIN
+###############################################################################
+
+
+def main():
+    # Record and display the start time of the script
+    start_time = datetime.now(UTC)
+    print("===============================================================")
+    print("Execution started at:", start_time.strftime(ISO8601_FORMAT))
+
+    # 1) query Kentik to build traffic demand matrices
+    print("Querying Kentik API for traffic demand matrix...")
+    kentik_json = kentik.fetch_kentik_traffic_matrix()
+    print("Processing traffic demand matrices from Kentik data...")
+    traffic_matrices = kentik.kentik_to_traffic_matrices(
+        kentik_json, nettopo.NODES
+    )  # returns traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
+
+    # 2) build graph and lookup data structures
+    graph = build_graph(nettopo.NODES, nettopo.CORELINKS)
+    corelink_map = build_corelink_map(nettopo.CORELINKS)
+    srlg_map = build_srlg_map(nettopo.CORELINKS)
+
+    # 3) define tasks to run in parallel
+    tasks = build_parallel_tasks(
+        graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices
+    )
+    print(
+        f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks) / len(traffic_matrices):.0f} scenarios each"
+    )
+
+    # 4) now run all tasks in a single global pool
+    print("Executing parallel traffic simulation tasks...")
+    with multiprocessing.Pool() as pool:
+        tasks_results = pool.map(compute_scenario_task, tasks)
+
+    # 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link
+    aggLinkUsage = link_usage_aggregator(corelink_map, tasks_results)
+
+    # 6) create and store final report
+    raw_report = build_raw_report(corelink_map, aggLinkUsage)
+    store_raw_report(raw_report)
+
+    # Display the end time, and total execution duration
+    end_time = datetime.now(UTC)
+    duration = end_time - start_time
+    print("Execution ended at:", end_time.strftime(ISO8601_FORMAT))
+    print("Total duration in seconds:", duration.total_seconds())
+    print("===============================================================")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tox.ini b/tox.ini
index 85de47d9d67482c75ab81a7f0723035d1d98d6ce..d244c8e8a72a1e6726a1b67390b20d98134e3ef0 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,7 +17,7 @@ commands =
     ruff check --respect-gitignore --preview .
     ruff format --respect-gitignore --preview --check .
     mypy .
-    sh -c 'if [ $SKIP_ALL_TESTS = 1 ]; then echo "Skipping coverage report"; else pytest --cov=gso --cov-report=xml --cov-report=html --cov-fail-under=90 -n auto {posargs}; fi'
+    sh -c 'if [ $SKIP_ALL_TESTS = 1 ]; then echo "Skipping coverage report"; else pytest --cov=capacity_planner --cov-report=xml --cov-report=html --cov-fail-under=90 -n auto {posargs}; fi'
 allowlist_externals =
     sh
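
A minimal usage sketch of the node-failover re-homing added above, assuming the `capacity_planner` package is importable; the site names, traffic values (Mbps), and failover ratios below are made up for illustration and are not part of the change itself.

# Hypothetical sketch -- toy inputs, not real Kentik data
from capacity_planner.whatifscenarios import build_traffic_with_node_fail

# Toy traffic matrix in Mbps: traffic[ingress][egress]
traffic = {
    "AMS": {"AMS": 0, "LON": 4000, "FRA": 2000},
    "LON": {"AMS": 3000, "LON": 0, "FRA": 1000},
    "FRA": {"AMS": 2500, "LON": 500, "FRA": 0},
}

# Assumed failover policy: if AMS fails, 60% of its traffic re-homes to LON, 40% to FRA
ratios = {"AMS": {"LON": 0.6, "FRA": 0.4}}

scenario = build_traffic_with_node_fail(traffic, ratios, "AMS")

# Traffic sourced at or destined to AMS is zeroed and reappears on the neighbours, e.g.
# LON->FRA = 1000 (own) + 0.6 * 2000 (re-homed AMS->FRA) + 0.4 * 3000 (re-homed LON->AMS) = 3400 Mbps
assert scenario["AMS"]["LON"] == 0
assert scenario["LON"]["FRA"] == 3400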