From b9543578ed79b00db7a2eec238a6a32fefec89c9 Mon Sep 17 00:00:00 2001 From: Karel van Klink <karel.vanklink@geant.org> Date: Mon, 31 Mar 2025 12:02:37 +0200 Subject: [PATCH] Reformat files --- capacity_planner/__init__.py | 1 + capacity_planner/capacityreport.py | 74 ++-- capacity_planner/services/__init__.py | 1 + capacity_planner/services/kentik.py | 25 +- capacity_planner/utils/__init__.py | 1 + capacity_planner/utils/topology.py | 40 +- capacity_planner/whatifscenarios.py | 515 ++++++++++++-------------- pyproject.toml | 1 + 8 files changed, 300 insertions(+), 358 deletions(-) diff --git a/capacity_planner/__init__.py b/capacity_planner/__init__.py index e69de29..e6678ff 100644 --- a/capacity_planner/__init__.py +++ b/capacity_planner/__init__.py @@ -0,0 +1 @@ +"""GÉANT Capacity Planner.""" diff --git a/capacity_planner/capacityreport.py b/capacity_planner/capacityreport.py index db46ac6..8d3f31a 100644 --- a/capacity_planner/capacityreport.py +++ b/capacity_planner/capacityreport.py @@ -1,16 +1,17 @@ +"""Generate a capacity report.""" + import argparse -import os import re import sys from datetime import UTC, datetime, timedelta +from logging import getLogger from pathlib import Path import numpy as np -import pandas as pd # https://pandas.pydata.org +import pandas as pd + +logger = getLogger(__name__) -############################################################################### -# INPUT DATA SECTION -############################################################################### # make sure this matches with the What-If Scenario runner script RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports" @@ -32,14 +33,11 @@ ISO8601_FORMAT = "%Y%m%dT%H%MZ" ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z" -############################################################################### -# RAW CONSOLIDATED REPORT -############################################################################### - - # --- Helper function to get the row of the max usage for a given column, handling empty/missing columns --- def get_max_usage_row(group, usage_col): - """Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`. + """Given a list of rows, return the row with the highest usage. + + Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`. If `usage_col` does not exist or is entirely NaN, returns None. """ # If the column doesn't exist or has all null values, return None @@ -53,9 +51,10 @@ def get_max_usage_row(group, usage_col): def extract_usage_details(group): - """For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps) - and extract the relevant columns. - Booleans are set to True if at least one row in the group is True. + """Extract usage details. + + For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps) + and extract the relevant columns. Booleans are set to True if at least one row in the group is True. """ # We'll create a dict to hold the final data for this ID. 
out = {} @@ -140,12 +139,8 @@ def extract_usage_details(group): return pd.Series(out) -############################################################################### -# HUMAN READABLE CONSOLIDATED REPORT -############################################################################### - - def build_human_report(df_raw): + """Build a human-readable report.""" df_human = df_raw.copy() # Helper formatting functions @@ -245,35 +240,32 @@ def build_human_report(df_raw): ] -############################################################################### -# FILE FUNCTIONS -############################################################################### - - def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime): + """Find all files that fall within a given timeframe.""" # List all raw reports in directory all_raw_reports = [ file - for file in os.listdir(directory) - if os.path.isfile(os.path.join(directory, file)) - and file.startswith(prefix) - and file.endswith(suffix) - and re.search(ISO8601_REGEXP, file) + for file in Path(directory).iterdir() + if Path(directory / file).is_file() + and file.name.startswith(prefix) + and file.name.endswith(suffix) + and re.search(ISO8601_REGEXP, file.name) ] # Filter to files that match the timestamp pattern within the specified datetime range matching_files = [] for file in all_raw_reports: - match = re.search(ISO8601_REGEXP, file) + match = re.search(ISO8601_REGEXP, file.name) file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=UTC) if start_datetime <= file_date <= end_datetime: - matching_files.append(os.path.join(directory, file)) + matching_files.append(Path(directory / file)) return matching_files def store_consolidated(df_consolidated, directory, prefix, suffix): + """Store consolidated results in a file.""" path = Path(directory) path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist @@ -283,22 +275,17 @@ def store_consolidated(df_consolidated, directory, prefix, suffix): if suffix == "csv": df_consolidated.to_csv( - os.path.join(path, filename), sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True + Path(path / filename), sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True ) elif suffix == "txt": markdown = df_consolidated.to_markdown(headers="keys", tablefmt="psql") # Write the markdown string to a file - with open(os.path.join(path, filename), "w") as file: - file.write(markdown) - - -############################################################################### -# MAIN -############################################################################### + Path(path / filename).write_text(markdown) def main(): + """Main method for running the capacity planner.""" # Parse commandline arguments parser = argparse.ArgumentParser(description="Script usage:") parser.add_argument("--daily", action="store_true", help="Create daily report (past day)") @@ -343,9 +330,11 @@ def main(): ) if len(matching_files) > 0: - print( - f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}" + msg = ( + f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} " + f"through {end_datetime}" ) + logger.info(msg) # List of columns that should be parsed as dates from CSV date_columns = [ @@ -396,7 +385,8 @@ def main(): ) else: - print(f"No raw files found for timeframe {start_datetime} through {end_datetime}") + msg = f"No raw files found for timeframe 
{start_datetime} through {end_datetime}" + logger.warning(msg) if __name__ == "__main__": diff --git a/capacity_planner/services/__init__.py b/capacity_planner/services/__init__.py index e69de29..3f63882 100644 --- a/capacity_planner/services/__init__.py +++ b/capacity_planner/services/__init__.py @@ -0,0 +1 @@ +"""Different services that the capacity planner interacts with.""" diff --git a/capacity_planner/services/kentik.py b/capacity_planner/services/kentik.py index c358b67..a565cb3 100644 --- a/capacity_planner/services/kentik.py +++ b/capacity_planner/services/kentik.py @@ -1,3 +1,5 @@ +"""Interactions with Kentik through their API.""" + import os import requests @@ -24,16 +26,14 @@ EGRESS_DIMENSION = "i_ult_exit_site" def _api_query(payload): # Headers for authentication headers = {"Content-Type": "application/json", "X-CH-Auth-Email": API_EMAIL, "X-CH-Auth-API-Token": API_TOKEN} - response = requests.post(API_URL, headers=headers, json=payload) + response = requests.post(API_URL, headers=headers, json=payload, timeout=120) - if response.status_code == 200: - return response.json() - print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}") - return None + response.raise_for_status() + return response.json() def fetch_kentik_traffic_matrix(): - # JSON query payload + """Fetch a traffic matrix from Kentik.""" payload = { "version": 4, "queries": [ @@ -44,12 +44,10 @@ def fetch_kentik_traffic_matrix(): "all_devices": True, "aggregateTypes": ["max_in_bits_per_sec"], "depth": 350, - "topx": 350, # 350=max supported by Kentik + "topx": 350, "device_name": [], "fastData": "Auto", "lookback_seconds": 60 * KENTIK_REPORTING_PERIOD, - # "starting_time": null, - # "ending_time": null, "matrixBy": [], "metric": ["in_bytes"], "minsPolling": KENTIK_FLOW_AGGR_WINDOW, @@ -96,8 +94,7 @@ def fetch_kentik_traffic_matrix(): ], } - response = _api_query(payload) - return response + return _api_query(payload) ############################################################################### @@ -106,9 +103,9 @@ def fetch_kentik_traffic_matrix(): def kentik_to_traffic_matrices(json_data, nodes): - """Convert the given JSON structure returned by Kentik into a dictionary - keyed by timestamp. For each timestamp, we store a nested dict of: - traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps + """Convert the given JSON structure returned by Kentik into a dictionary keyed by timestamp. + + For each timestamp, we store a nested dict of: traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps """ # We'll gather all flows in the JSON data_entries = json_data["results"][0]["data"] diff --git a/capacity_planner/utils/__init__.py b/capacity_planner/utils/__init__.py index e69de29..d473e67 100644 --- a/capacity_planner/utils/__init__.py +++ b/capacity_planner/utils/__init__.py @@ -0,0 +1 @@ +"""Utilities for capacity-planner.""" diff --git a/capacity_planner/utils/topology.py b/capacity_planner/utils/topology.py index f1f252b..ac5b731 100644 --- a/capacity_planner/utils/topology.py +++ b/capacity_planner/utils/topology.py @@ -1,13 +1,18 @@ -# (A) Define each core link: -# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] ) -# where: -# linkID: network-wide unique numeric ID (e.g. 1001) -# nodeA, nodeB: core link endpoints -# igp_metric: IGP cost/distance -# capacity: full-duplex link capacity in Gbps -# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty) -# normal_threshold: fraction for normal usage. 
If omitted default is used -# failure_threshold: fraction for usage under failure. If omitted default is used +"""Define each core link in a topology. + +Each link is a tuple shaped like: +( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] ) + +where: + linkID: network-wide unique numeric ID (e.g. 1001) + nodeA, nodeB: core link endpoints + igp_metric: IGP cost/distance + capacity: full-duplex link capacity in Gbps + srlg_list: list of Shared Risk Link Group (SRLG) names (or empty) + normal_threshold: fraction for normal usage. If omitted default is used + failure_threshold: fraction for usage under failure. If omitted default is used +""" + CORELINKS = [ (1, "AMS", "FRA", 2016, 800, []), (2, "AMS", "LON", 1428, 800, []), @@ -43,22 +48,12 @@ CORELINKS = [ (32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]), (33, "ATH2", "THE", 8200, 100, []), (34, "SOF", "THE", 5800, 100, []), - # ("COP", "HAM", 480, 400, []), - # ("COP", "STO", 600, 400, []), - # ("HEL", "STO", 630, 400, []), (35, "RIG", "TAR", 2900, 100, []), (36, "KAU", "POZ", 10050, 100, []), - # ("BEL", "SOF", 444, 400, []), - # ("BEL", "ZAG", 528, 400, []), (37, "ZAG", "SOF", 9720, 200, []), (38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]), (39, "COR", "LON2", 7160, 100, ["COR-LON2"]), - # ("HEL", "TAR", 227, 400, []), (40, "KAU", "RIG", 4500, 100, []), - # ("BRU", "LUX", 380, 400, []), - # ("FRA", "LUX", 312, 400, []), - # ("RIG", "STO", 400, 400, []), - # ("COP", "POZ", 750, 400, []), (41, "COR", "PAR", 12549, 100, ["COR-LON2"]), (42, "KIE", "POZ", 50000, 100, []), (43, "CHI", "BUC", 50000, 40, []), @@ -111,25 +106,21 @@ NODES = [ NODE_FAILOVER_RATIOS = { "AMS": {"LON": 0.6, "FRA": 0.4}, "ATH": {"THE": 0.5, "MAR": 0.5}, - # "BEL": {"ZAG": 1.0 }, "BIL": {"MAD": 1.0}, "BRA": {"VIE": 1.0}, "BRU": {"AMS": 1.0}, "BUC": {"VIE": 0.5, "SOF": 0.5}, "BUD": {"ZAG": 1.0}, - # "COP": {"HAM": 0.5, "STO": 0.5 }, "COR": {"DUB": 1.0}, "DUB": {"COR": 1.0}, "FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2}, "GEN": {"PAR": 0.6, "MIL2": 0.4}, "HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3}, - # "HEL": {"TAR": 0.3, "HAM": 0.7 }, "KAU": {"RIG": 1.0}, "LIS": {"POR": 1.0}, "LJU": {"ZAG": 1.0}, "LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4}, "LON2": {"LON": 1.0}, - # "LUX": {"BRU": 1.0 }, "MAD": {"BIL": 1.0}, "MAR": {"MIL2": 0.6, "ATH": 0.4}, "MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3}, @@ -139,7 +130,6 @@ NODE_FAILOVER_RATIOS = { "PRA": {"VIE": 1.0}, "RIG": {"KAU": 1.0}, "SOF": {"THE": 0.5, "BUC": 0.5}, - # "STO": {"COP": 0.5, "HEL": 0.5 }, "TAR": {"RIG": 1.0}, "THE": {"ATH": 0.5, "SOF": 0.5}, "VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2}, diff --git a/capacity_planner/whatifscenarios.py b/capacity_planner/whatifscenarios.py index 2636554..64b366c 100644 --- a/capacity_planner/whatifscenarios.py +++ b/capacity_planner/whatifscenarios.py @@ -1,21 +1,24 @@ +"""Run all different what-if scenarios.""" + import itertools import math import multiprocessing from datetime import UTC, datetime +from logging import getLogger from pathlib import Path -from capacity_planner.utils import topology as nettopo import networkx as nx import pandas as pd + from capacity_planner.services import kentik +from capacity_planner.utils import topology as nettopo + +logger = getLogger(__name__) -############################################################################### -# 1) INPUT DATA SECTION -############################################################################### -# If normal threshold not specified on a link, default to 0.40 (allowing upto 40% 
link utilization in normal non-failure scenario) +# If normal threshold not specified on a link, default to 40% link utilization in normal non-failure scenario DEFAULT_NORMAL_THRESHOLD = 0.40 -# If failure threshold not specified on a link, default to 0.80 (allowing upto 80% link utilization in worst-case failure scenario) +# If failure threshold not specified on a link, default to 80% link utilization in worst-case failure scenario DEFAULT_FAILURE_THRESHOLD = 0.80 ENABLED_SCENARIOS = ["normal", "onelinkfail", "twolinkfail", "onenodefail", "nodelinkfail"] @@ -28,75 +31,53 @@ SHOULD_UPGRADE_SCENARIOS = ["twolinkfail", "nodelinkfail"] RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports" RAW_REPORT_FILE_PREFIX = "raw_capacityreport_" RAW_REPORT_FILE_SUFFIX = "csv" - -# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ) ISO8601_FORMAT = "%Y%m%dT%H%MZ" -ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z" - -############################################################################### -# 2) Build a local NetworkX multi-directional graph for scenario computations -############################################################################### def build_graph(nodes, links): - # MultiDiGraph supports multiple edges (ECMP links) between two nodes - # and is directional, allowing modelling of Full-Duplex links and taking - # traffic direction into account + """Build a graph of nodes and links. + + MultiDiGraph supports multiple edges (ECMP links) between two nodes and is directional, allowing modelling of + Full-Duplex links and taking traffic direction into account. + """ graph = nx.MultiDiGraph() graph.add_nodes_from(nodes) for link in links: - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link[0] - nodeA = link[1] - nodeB = link[2] + link_id = link[0] + node_a = link[1] + node_b = link[2] igp = link[3] # Add Full-Duplex edges in both directions with linkID # assumes the IGP metric is configured symmetrically - graph.add_edge(nodeA, nodeB, key=lID, igpmetric=igp) - graph.add_edge(nodeB, nodeA, key=lID, igpmetric=igp) + graph.add_edge(node_a, node_b, key=link_id, igpmetric=igp) + graph.add_edge(node_b, node_a, key=link_id, igpmetric=igp) return graph -############################################################################### -# 3) Helper functions to ease lookups for core links and SRLGs -############################################################################### - - def build_corelink_map(links): - # Creating a lookup dictionary for fast lookup by linkID - corelink_dict = {link[0]: link for link in links} - return corelink_dict + """Creating a lookup dictionary for fast lookup by linkID.""" + return {link[0]: link for link in links} def build_srlg_map(links): - # Maps SRLG name to one or more core links (linkIDs) + """Maps SRLG name to one or more core link IDs.""" srlg_map = {} for link in links: - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link[0] - nodeA = link[1] - nodeB = link[2] - igp = link[3] - - if len(link) > 5 and isinstance(link[5], list): + if len(link) > 5 and isinstance(link[5], list): # noqa: PLR2004 for srlg in link[5]: srlg_map.setdefault(srlg, set()) - srlg_map[srlg].add(lID) + srlg_map[srlg].add(link[0]) return srlg_map -############################################################################### -# 4) Node failover logic - redistribute traffic to other core nodes -############################################################################### - - def 
redistribute_local_traffic(t_xx, ratio_map): """We treat local traffic T[X,X] as two endpoints at X. + Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j]. => fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j]. If i=j, it's T[i,i] (local on neighbor i). @@ -112,23 +93,20 @@ def redistribute_local_traffic(t_xx, ratio_map): return local_matrix -def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode): - """Return scenario-specific traffic matrix in Mbps, - re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode]. - """ +def build_traffic_with_node_fail(base_traffic, nodefail_ratios, failing_node): + """Return scenario-specific traffic matrix in Mbps.""" scenario_traffic = {} - ratio_map = nodefail_ratios.get(failingNode, {}) + ratio_map = nodefail_ratios.get(failing_node, {}) - for s in baseTraffic: + for s in base_traffic: # noqa: PLR1702 FIXME scenario_traffic[s] = {} - for d in baseTraffic[s]: - val = baseTraffic[s][d] + for d in base_traffic[s]: + val = base_traffic[s][d] if val <= 0: scenario_traffic[s][d] = 0 continue - # traffic is local - T[failingNode,failingNode] - if s == failingNode and d == failingNode: + if s == failing_node and d == failing_node: scenario_traffic[s][d] = 0 mat = redistribute_local_traffic(val, ratio_map) for i in mat: @@ -139,14 +117,14 @@ def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode): scenario_traffic[i][j] = scenario_traffic[i].get(j, 0) + portion # T[failingNode,*] - elif s == failingNode: + elif s == failing_node: scenario_traffic[s][d] = 0 for nbr, r in ratio_map.items(): scenario_traffic.setdefault(nbr, {}) scenario_traffic[nbr][d] = scenario_traffic[nbr].get(d, 0) + val * r # T[*,failingNode] - elif d == failingNode: + elif d == failing_node: scenario_traffic[s][d] = 0 for nbr, r in ratio_map.items(): scenario_traffic[s][nbr] = scenario_traffic[s].get(nbr, 0) + val * r @@ -158,45 +136,38 @@ def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode): return scenario_traffic -############################################################################### -# 5) Remove SRLG or corelink from graph -############################################################################### - - -def remove_corelink(graph, removeLink): +def remove_corelink(graph, remove_link): + """Remove a core link from a graph.""" edges_rm = [] - for u, v, lID in graph.edges(keys=True): - if lID == removeLink: - edges_rm.append((u, v, lID)) + for u, v, link_id in graph.edges(keys=True): + if link_id == remove_link: + edges_rm.append((u, v, link_id)) for e in edges_rm: graph.remove_edge(*e) -def remove_corelink_or_srlg(graph, failedElement, srlg_map): - etype, data = failedElement +def remove_corelink_or_srlg(graph, failed_element, srlg_map): + """Remove a core link or SRLG.""" + etype, data = failed_element if etype == "LINK": - linkID = data - remove_corelink(graph, linkID) + link_id = data + remove_corelink(graph, link_id) elif etype == "SRLG": if data in srlg_map: - for linkID in srlg_map[data]: - remove_corelink(graph, linkID) + for link_id in srlg_map[data]: + remove_corelink(graph, link_id) -############################################################################### -# 6) Compute scenario load in MultiDiGraph -############################################################################### - - -def compute_scenario_load(graph, scenario_traffic): +def compute_scenario_load(graph, scenario_traffic): # noqa: PLR0912 FIXME + """Compute load per scenario.""" # 
initialize empty usage map - usageMap = {} - for u, v, lID in graph.edges(keys=True): - usageMap[u, v, lID] = 0.0 + usage_map = {} + for u, v, link_id in graph.edges(keys=True): + usage_map[u, v, link_id] = 0.0 # walk over [ingress][egress] traffic demands - for s in scenario_traffic: + for s in scenario_traffic: # noqa: PLR1702 for d in scenario_traffic[s]: demand_mbps = scenario_traffic[s][d] if demand_mbps <= 0 or s == d or (s not in graph) or (d not in graph): @@ -244,170 +215,162 @@ def compute_scenario_load(graph, scenario_traffic): if igp < best_igp: best_igp = igp best_k = k2 - usageMap[p, q, best_k] += share - - return usageMap + usage_map[p, q, best_k] += share + return usage_map -############################################################################### -# 7) Individual Normal/Failure Scenario Runners -############################################################################### - -def run_normal_scenario(graph, trafficMatrix): +def run_normal_scenario(graph, traffic_matrix): """No failures => just compute load with the base traffic.""" - return compute_scenario_load(graph, trafficMatrix) + return compute_scenario_load(graph, traffic_matrix) -def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail): +def run_onenode_failure(graph, traffic_matrix, nodefail_ratios, node_fail): """Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail].""" - if nodeFail in graph: - graph.remove_node(nodeFail) - scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) + if node_fail in graph: + graph.remove_node(node_fail) + scenario_traffic = build_traffic_with_node_fail(traffic_matrix, nodefail_ratios, node_fail) return compute_scenario_load(graph, scenario_traffic) -def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail): - """LinkFail = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). +def run_one_link_failure(graph, traffic_matrix, srlg_map, link_fail): + """Run a scenario with a single link failure. + + LinkFail = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). Remove link from a copy of graph, then compute load with base traffic. """ - remove_corelink_or_srlg(graph, linkFail, srlg_map) - return compute_scenario_load(graph, trafficMatrix) + remove_corelink_or_srlg(graph, link_fail, srlg_map) + return compute_scenario_load(graph, traffic_matrix) -def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2): - """linkFail1, linkFail2 each = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). +def run_two_link_failure(graph, traffic_matrix, srlg_map, link_fail_1, link_fail_2): + """Run a scenario where two links fail. + + linkFail1, linkFail2 each = ("LINK",(u,v)) or ("SRLG",{(x,y),...}). Remove them from a copy of graph, then compute load with base traffic. 
""" - remove_corelink_or_srlg(graph, linkFail1, srlg_map) - remove_corelink_or_srlg(graph, linkFail2, srlg_map) - return compute_scenario_load(graph, trafficMatrix) + remove_corelink_or_srlg(graph, link_fail_1, srlg_map) + remove_corelink_or_srlg(graph, link_fail_2, srlg_map) + return compute_scenario_load(graph, traffic_matrix) -def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail): +def run_node_link_failure(graph, traffic_matrix, srlg_map, node_fail_ratios, node_fail, link_fail): # noqa: PLR0917 + """Run scenario for node and link failure.""" # first remove node and shift traffic - if nodeFail in graph: - graph.remove_node(nodeFail) - scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail) + if node_fail in graph: + graph.remove_node(node_fail) + scenario_traffic = build_traffic_with_node_fail(traffic_matrix, node_fail_ratios, node_fail) # then remove link - remove_corelink_or_srlg(graph, linkFail, srlg_map) + remove_corelink_or_srlg(graph, link_fail, srlg_map) return compute_scenario_load(graph, scenario_traffic) -############################################################################### -# 8) Single scenario task runner -############################################################################### - - -def expand_link_name(corelink_map, linkFail): +def expand_link_name(corelink_map, link_fail): """Helper function--return a string describing one scenario element (LINK linkID or SRLG name).""" - etype, data = linkFail + etype, data = link_fail if etype == "LINK": # get corelink details by ID td = corelink_map.get(data) - nodeA = td[1] - nodeB = td[2] - return f"LINK_{data}/{nodeA}-{nodeB}" + node_a = td[1] + node_b = td[2] + return f"LINK_{data}/{node_a}-{node_b}" if etype == "SRLG": return f"SRLG_{data}" - return str(linkFail) + return str(link_fail) def compute_scenario_task(args): - """Each scenario runs as a single task""" - (graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData) = args + """Each scenario runs as a single task.""" + (graph_orig, corelink_map, srlg_map, node_fail_ratios, ts, traffic_matrix, scenario_type, scenario_data) = args # 1) first, create a copy of graph so in-memory copy of original is not modified graph = graph_orig.copy() # 2) run selected normal or failure scenario - usageMap = {} - scenarioName = "" - if scenarioType == "normal": - usageMap = run_normal_scenario(graph, trafficMatrix) - scenarioName = "normal" - - elif scenarioType == "onenodefail": - nodeFail = scenarioData - usageMap = run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail) - scenarioName = f"NODE_{nodeFail}" - - elif scenarioType == "onelinkfail": - linkFail = scenarioData - usageMap = run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail) - scenarioName = f"{expand_link_name(corelink_map, linkFail)}" - - elif scenarioType == "twolinkfail": - (linkFail1, linkFail2) = scenarioData - usageMap = run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2) - scenarioName = f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}" - - elif scenarioType == "nodelinkfail": - (nodeFail, linkFail) = scenarioData - usageMap = run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail) - scenarioName = f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}" + usage_map = {} + scenario_name = "" + if scenario_type == "normal": + usage_map = run_normal_scenario(graph, 
traffic_matrix) + scenario_name = "normal" + + elif scenario_type == "onenodefail": + node_fail = scenario_data + usage_map = run_onenode_failure(graph, traffic_matrix, node_fail_ratios, node_fail) + scenario_name = f"NODE_{node_fail}" + + elif scenario_type == "onelinkfail": + link_fail = scenario_data + usage_map = run_one_link_failure(graph, traffic_matrix, srlg_map, link_fail) + scenario_name = f"{expand_link_name(corelink_map, link_fail)}" + + elif scenario_type == "twolinkfail": + (link_fail_1, link_fail_2) = scenario_data + usage_map = run_two_link_failure(graph, traffic_matrix, srlg_map, link_fail_1, link_fail_2) + scenario_name = f"{expand_link_name(corelink_map, link_fail_1)} + {expand_link_name(corelink_map, link_fail_2)}" + + elif scenario_type == "nodelinkfail": + (node_fail, link_fail) = scenario_data + usage_map = run_node_link_failure(graph, traffic_matrix, srlg_map, node_fail_ratios, node_fail, link_fail) + scenario_name = f"NODE_{node_fail} + {expand_link_name(corelink_map, link_fail)}" else: - usageMap = {} - scenarioName = "unknown" - - return (ts, scenarioType, scenarioName, usageMap) - + usage_map = {} + scenario_name = "unknown" -############################################################################### -# 9) Build list of tasks to run in parallel -############################################################################### + return ts, scenario_type, scenario_name, usage_map -def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): +def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): # noqa: PLR0917 + """Build parallel tasks for given failures.""" # enumerate link failure scenarios => link linkIDs + srlg - linkFailElements = [] - for link_id, _link_data in corelink_map.items(): - linkFailElements.append(("LINK", link_id)) - - for srlg_name, _srlg_data in srlg_map.items(): - linkFailElements.append(("SRLG", srlg_name)) + link_fail_elements = [] + link_fail_elements.extend([("LINK", link_id) for link_id in corelink_map]) + link_fail_elements.extend([("SRLG", srlg_name) for srlg_name in srlg_map]) tasks = [] - for ts in traffic_matrices.keys(): - trafficMatrix = traffic_matrices[ts] + for ts in traffic_matrices: + traffic_matrix = traffic_matrices[ts] # add normal non-failure scenario if "normal" in ENABLED_SCENARIOS: - tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "normal", None)) + tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, traffic_matrix, "normal", None)) # add single link fail scenarios if "onelinkfail" in ENABLED_SCENARIOS: - for linkFail in linkFailElements: - tasks.append(( + tasks.extend([ + ( graph, corelink_map, srlg_map, nodefail_ratios, ts, - trafficMatrix, + traffic_matrix, "onelinkfail", - linkFail, - )) + failure, + ) + for failure in link_fail_elements + ]) # add unique two link fail scenarios if "twolinkfail" in ENABLED_SCENARIOS: - fe_twofail_combos = list(itertools.combinations(linkFailElements, 2)) - for linkFail1, linkFail2 in fe_twofail_combos: - tasks.append(( + fe_twofail_combos = list(itertools.combinations(link_fail_elements, 2)) + tasks.extend([ + ( graph, corelink_map, srlg_map, nodefail_ratios, ts, - trafficMatrix, + traffic_matrix, "twolinkfail", - (linkFail1, linkFail2), - )) + failure, + ) + for failure in fe_twofail_combos + ]) - for nodeFail in nodes: + for node_fail in nodes: # add node fail scenarios if "onenodefail" in ENABLED_SCENARIOS: tasks.append(( @@ -416,152 +379,151 @@ def 
build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, srlg_map, nodefail_ratios, ts, - trafficMatrix, + traffic_matrix, "onenodefail", - nodeFail, + node_fail, )) # add node + link fail scenarios if "nodelinkfail" in ENABLED_SCENARIOS: - for linkFail in linkFailElements: - tasks.append(( + tasks.extend([ + ( graph, corelink_map, srlg_map, nodefail_ratios, ts, - trafficMatrix, + traffic_matrix, "nodelinkfail", - (nodeFail, linkFail), - )) + (node_fail, link_fail), + ) + for link_fail in link_fail_elements + ]) return tasks -############################################################################### -# 10) Report generator -############################################################################### - - def link_usage_aggregator(corelink_map, results): + """Build an aggregate of all link usages.""" # initialize empty link usage aggregator - aggLinkUsage = {} - for link_id, _link_data in corelink_map.items(): - aggLinkUsage[link_id] = {} - for scenarioType in ENABLED_SCENARIOS: - aggLinkUsage[link_id][scenarioType] = (0.0, pd.NaT, None) + agg_link_usage = {} + for link_id in corelink_map: + agg_link_usage[link_id] = {} + for scenario_type in ENABLED_SCENARIOS: + agg_link_usage[link_id][scenario_type] = (0.0, pd.NaT, None) # unify results across all tasks - for each scenario, record worst-case (highest) load on each link - for ts, scenarioType, scenarioName, usageMap in results: - for u, v, lID in usageMap: - usage = usageMap[u, v, lID] - oldUsage, oldTs, oldSc = aggLinkUsage[lID][scenarioType] - if usage > oldUsage: - aggLinkUsage[lID][scenarioType] = (usage, ts, f"{scenarioName}") + for ts, scenario_type, scenario_name, usage_map in results: + for u, v, link_id in usage_map: + usage = usage_map[u, v, link_id] + old_usage, _, _ = agg_link_usage[link_id][scenario_type] + if usage > old_usage: + agg_link_usage[link_id][scenario_type] = (usage, ts, f"{scenario_name}") - # return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName) - return aggLinkUsage + # return agg_link_usage[linkID][scenario_type] => (usage,ts,scenario_name) + return agg_link_usage -def build_raw_report(corelink_map, aggLinkUsage): +def build_raw_report(corelink_map, agg_link_usage): + """Build report from raw data.""" final_report_raw = [] # Helper print functions def exceed(scenario, thr, capacity): - return aggLinkUsage[lID][scenario][0] > thr * capacity if scenario in ENABLED_SCENARIOS else pd.NA + return agg_link_usage[link_id][scenario][0] > thr * capacity if scenario in ENABLED_SCENARIOS else pd.NA - def usagePct(scenario, capacity): - # (usage,ts,scenarioName) - return aggLinkUsage[lID][scenario][0] / capacity if scenario in ENABLED_SCENARIOS else pd.NA + def usage_pct(scenario, capacity): + """Returns a tuple that contains usage, ts, and scenario name.""" + return agg_link_usage[link_id][scenario][0] / capacity if scenario in ENABLED_SCENARIOS else pd.NA - def usageGbps(scenario): - return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA + def usage_gbps(scenario): + return agg_link_usage[link_id][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA - def scenarioTimestamp(scenario): + def scenario_timestamp(scenario): return ( - pd.to_datetime(aggLinkUsage[lID][scenario][1], unit="ms", errors="coerce", utc=True) + pd.to_datetime(agg_link_usage[link_id][scenario][1], unit="ms", errors="coerce", utc=True) if scenario in ENABLED_SCENARIOS else pd.NaT ) - def failScenarioName(scenario): - return aggLinkUsage[lID][scenario][2] if scenario in 
ENABLED_SCENARIOS else None + def fail_scenario_name(scenario): + return agg_link_usage[link_id][scenario][2] if scenario in ENABLED_SCENARIOS else None - def scenarioEnabled(scenario): + def scenario_enabled(scenario): return scenario in ENABLED_SCENARIOS # Iterate over each core link - for link_id, link_data in corelink_map.items(): - # ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) - lID = link_data[0] - nodeA = link_data[1] - nodeB = link_data[2] + for link_data in corelink_map.values(): + # Formatted as: ( link ID, node_a, node_b, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] ) + link_id = link_data[0] + node_a = link_data[1] + node_b = link_data[2] capacity = link_data[4] # parse thresholds if present, else use defaults norm_thr = ( - link_data[6] if len(link_data) > 6 and isinstance(link_data[6], (float, int)) else DEFAULT_NORMAL_THRESHOLD + link_data[6] if len(link_data) > 6 and isinstance(link_data[6], (float, int)) else DEFAULT_NORMAL_THRESHOLD # noqa: PLR2004 ) fail_thr = ( - link_data[7] if len(link_data) > 7 and isinstance(link_data[7], (float, int)) else DEFAULT_FAILURE_THRESHOLD + link_data[7] if len(link_data) > 7 and isinstance(link_data[7], (float, int)) else DEFAULT_FAILURE_THRESHOLD # noqa: PLR2004 ) # threshold check - mustUpgrade = False - shouldUpgrade = False - for scenarioType in ENABLED_SCENARIOS: - (usage, ts, scenarioName) = aggLinkUsage[lID][scenarioType] + must_upgrade = False + should_upgrade = False + for scenario_type in ENABLED_SCENARIOS: + (usage, _, _) = agg_link_usage[link_id][scenario_type] thr = 1 - if scenarioType == "normal" and norm_thr > 0: + if scenario_type == "normal" and norm_thr > 0: thr = norm_thr - elif "fail" in scenarioType and fail_thr > 0: + elif "fail" in scenario_type and fail_thr > 0: thr = fail_thr - if usage > thr * capacity and scenarioType in MUST_UPGRADE_SCENARIOS: - mustUpgrade = True - elif usage > thr * capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS: - shouldUpgrade = True + if usage > thr * capacity and scenario_type in MUST_UPGRADE_SCENARIOS: + must_upgrade = True + elif usage > thr * capacity and scenario_type in SHOULD_UPGRADE_SCENARIOS: + should_upgrade = True # print one row per core link final_report_raw.append({ - "ID": lID, - "NodeA": nodeA, - "NodeB": nodeB, + "ID": link_id, + "NodeA": node_a, + "NodeB": node_b, "CapacityGbps": capacity, "ConfNormalThrPct": norm_thr, "ConfNormalThrGbps": norm_thr * capacity, "ConfFailThrPct": fail_thr, "ConfFailThrGbps": fail_thr * capacity, - "NormalEnabled": scenarioEnabled("normal"), - "NormalUsagePct": usagePct("normal", capacity), - "NormalUsageGbps": usageGbps("normal"), + "NormalEnabled": scenario_enabled("normal"), + "NormalUsagePct": usage_pct("normal", capacity), + "NormalUsageGbps": usage_gbps("normal"), "NormalUsageExceed": exceed("normal", norm_thr, capacity), - "NormalDateTime": scenarioTimestamp("normal"), - "1LinkFailEnabled": scenarioEnabled("onelinkfail"), - "1LinkFailScenario": failScenarioName("onelinkfail"), - "1LinkFailUsagePct": usagePct("onelinkfail", capacity), - "1LinkFailUsageGbps": usageGbps("onelinkfail"), + "NormalDateTime": scenario_timestamp("normal"), + "1LinkFailEnabled": scenario_enabled("onelinkfail"), + "1LinkFailScenario": fail_scenario_name("onelinkfail"), + "1LinkFailUsagePct": usage_pct("onelinkfail", capacity), + "1LinkFailUsageGbps": usage_gbps("onelinkfail"), "1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity), - "1LinkFailUsageTime": scenarioTimestamp("onelinkfail"), - 
"2LinkFailEnabled": scenarioEnabled("twolinkfail"), - "2LinkFailScenario": failScenarioName("twolinkfail"), - "2LinkFailUsagePct": usagePct("twolinkfail", capacity), - "2LinkFailUsageGbps": usageGbps("twolinkfail"), + "1LinkFailUsageTime": scenario_timestamp("onelinkfail"), + "2LinkFailEnabled": scenario_enabled("twolinkfail"), + "2LinkFailScenario": fail_scenario_name("twolinkfail"), + "2LinkFailUsagePct": usage_pct("twolinkfail", capacity), + "2LinkFailUsageGbps": usage_gbps("twolinkfail"), "2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity), - "2LinkFailUsageTime": scenarioTimestamp("twolinkfail"), - "1NodeFailEnabled": scenarioEnabled("onenodefail"), - "1NodeFailScenario": failScenarioName("onenodefail"), - "1NodeFailUsagePct": usagePct("onenodefail", capacity), - "1NodeFailUsageGbps": usageGbps("onenodefail"), + "2LinkFailUsageTime": scenario_timestamp("twolinkfail"), + "1NodeFailEnabled": scenario_enabled("onenodefail"), + "1NodeFailScenario": fail_scenario_name("onenodefail"), + "1NodeFailUsagePct": usage_pct("onenodefail", capacity), + "1NodeFailUsageGbps": usage_gbps("onenodefail"), "1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity), - "1NodeFailUsageTime": scenarioTimestamp("onenodefail"), - "Node+1LinkFailEnabled": scenarioEnabled("nodelinkfail"), - "Node+1LinkFailScenario": failScenarioName("nodelinkfail"), - "Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity), - "Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"), + "1NodeFailUsageTime": scenario_timestamp("onenodefail"), + "Node+1LinkFailEnabled": scenario_enabled("nodelinkfail"), + "Node+1LinkFailScenario": fail_scenario_name("nodelinkfail"), + "Node+1LinkFailUsagePct": usage_pct("nodelinkfail", capacity), + "Node+1LinkFailUsageGbps": usage_gbps("nodelinkfail"), "Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity), - "Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"), - "MustUpgrade": mustUpgrade, - "ShouldUpgrade": shouldUpgrade, + "Node+1LinkFailUsageTime": scenario_timestamp("nodelinkfail"), + "MustUpgrade": must_upgrade, + "ShouldUpgrade": should_upgrade, }) df_raw = pd.DataFrame( @@ -614,6 +576,7 @@ def build_raw_report(corelink_map, aggLinkUsage): def store_raw_report(df_raw): + """Store raw report data.""" directory = Path(RAW_REPORT_DIRECTORY) directory.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist @@ -625,21 +588,16 @@ def store_raw_report(df_raw): df_raw.to_csv(filepath, sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True) -############################################################################### -# MAIN -############################################################################### - - def main(): - # Record and display the start time of the script + """Run all what-if scenarios.""" start_time = datetime.now(UTC) - print("===============================================================") - print("Execution started at:", start_time.strftime(ISO8601_FORMAT)) + msg = f"Execution started at: {start_time.strftime(ISO8601_FORMAT)}" + logger.info(msg) # 1) query Kentik to build traffic demand matrices - print("Querying Kentik API for traffic demand matrix...") + logger.debug("Querying Kentik API for traffic demand matrix...") kentik_json = kentik.fetch_kentik_traffic_matrix() - print("Processing traffic demand matrices from Kentik data...") + logger.debug("Processing traffic demand matrices from Kentik data...") traffic_matrices = kentik.kentik_to_traffic_matrices( kentik_json, nettopo.NODES ) # returns 
traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps @@ -653,28 +611,31 @@ def main(): tasks = build_parallel_tasks( graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices ) - print( - f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks) / len(traffic_matrices):.0f} scenarios each" + msg = ( + f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with " + f"{len(tasks) / len(traffic_matrices):.0f} scenarios each." ) + logger.info(msg) # 4) now run all tasks in a single global pool - print("Executing parallel traffic simulation tasks...") + logger.debug("Executing parallel traffic simulation tasks...") with multiprocessing.Pool() as pool: tasks_results = pool.map(compute_scenario_task, tasks) # 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link - aggLinkUsage = link_usage_aggregator(corelink_map, tasks_results) + agg_link_usage = link_usage_aggregator(corelink_map, tasks_results) # 6) create and store final report - raw_report = build_raw_report(corelink_map, aggLinkUsage) + raw_report = build_raw_report(corelink_map, agg_link_usage) store_raw_report(raw_report) # Display the end time, and total execution duration end_time = datetime.now(UTC) duration = end_time - start_time - print("Execution ended at:", end_time.strftime(ISO8601_FORMAT)) - print("Total duration in seconds:", duration.total_seconds()) - print("===============================================================") + msg = f"Execution ended at: {end_time.strftime(ISO8601_FORMAT)}" + logger.info(msg) + msg = f"Total duration in seconds: {duration.total_seconds()}" + logger.info(msg) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 542d08a..356a33e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ enable_error_code = "ignore-without-code" extend-exclude = [ "htmlcov", "docs", + "capacity_planner/alembic" ] target-version = "py312" line-length = 120 -- GitLab
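
Reviewer note - a minimal, illustrative sketch (not part of the patch) of how the refactored modules are meant to fit together. It only uses names visible in the diff above (build_graph() from capacity_planner/whatifscenarios.py, NODES and CORELINKS from capacity_planner/utils/topology.py); the AMS/MIL2 endpoints are simply two nodes taken from the NODES list.

    # Sketch only: build the scenario MultiDiGraph from the bundled topology and
    # inspect one IGP shortest path. build_graph() adds every core link as a pair
    # of directed edges keyed by link ID, with the IGP cost stored as "igpmetric".
    import networkx as nx

    from capacity_planner.utils import topology as nettopo
    from capacity_planner.whatifscenarios import build_graph

    graph = build_graph(nettopo.NODES, nettopo.CORELINKS)

    # On a MultiDiGraph, networkx resolves parallel (ECMP) edges by taking the
    # cheapest edge between two nodes when a weight attribute is supplied.
    path = nx.shortest_path(graph, "AMS", "MIL2", weight="igpmetric")
    print(" -> ".join(path))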
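
A related usage note on the print() -> logging switch: the hunks above introduce module-level loggers but, at least in the hunks shown, do not configure a handler, so the new logger.info()/logger.debug() messages are dropped under Python's default WARNING threshold; only logger.warning() still reaches stderr via the last-resort handler. Below is a hedged sketch of how a caller or wrapper script might surface them - the format string is an assumption, not something this patch adds.

    # Sketch only: enable the INFO/DEBUG messages introduced by this patch before
    # invoking one of the entry points. Assumes the scripts' own preconditions are
    # met (raw report directory for capacityreport, Kentik API credentials in the
    # environment for whatifscenarios).
    import logging

    from capacity_planner import capacityreport

    logging.basicConfig(
        level=logging.INFO,  # use logging.DEBUG to also see the Kentik query progress
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
    )

    capacityreport.main()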