Verified Commit b9543578 authored by Karel van Klink

Reformat files

parent 6a43fc36
Branches main
"""GÉANT Capacity Planner."""
"""Generate a capacity report."""
import argparse
import os
import re
import sys
from datetime import UTC, datetime, timedelta
from logging import getLogger
from pathlib import Path
import numpy as np
import pandas as pd # https://pandas.pydata.org
import pandas as pd
logger = getLogger(__name__)
###############################################################################
# INPUT DATA SECTION
###############################################################################
# make sure this matches with the What-If Scenario runner script
RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports"
......@@ -32,14 +33,11 @@ ISO8601_FORMAT = "%Y%m%dT%H%MZ"
ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z"
###############################################################################
# RAW CONSOLIDATED REPORT
###############################################################################
# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns ---
def get_max_usage_row(group, usage_col):
"""Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`.
"""Given a list of rows, return the row with the highest usage.
Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`.
If `usage_col` does not exist or is entirely NaN, returns None.
"""
# If the column doesn't exist or has all null values, return None
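# Illustrative sketch only (the committed body is elided in this view); assuming pandas,
# the behaviour described in the docstring amounts to a guard plus idxmax. The name
# below is hypothetical, to avoid clashing with the real helper.
def _max_usage_row_sketch(group: pd.DataFrame, usage_col: str):
    if usage_col not in group.columns or group[usage_col].isna().all():
        return None
    return group.loc[group[usage_col].idxmax()]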
......@@ -53,9 +51,10 @@ def get_max_usage_row(group, usage_col):
def extract_usage_details(group):
"""For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps)
and extract the relevant columns.
Booleans are set to True if at least one row in the group is True.
"""Extract usage details.
For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps)
and extract the relevant columns. Booleans are set to True if at least one row in the group is True.
"""
# We'll create a dict to hold the final data for this ID.
out = {}
......@@ -140,12 +139,8 @@ def extract_usage_details(group):
return pd.Series(out)
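# Illustrative usage of the extractor above (a sketch; assumes the concatenated raw
# reports expose an "ID" column per core link, as the docstring implies):
#
#     per_link = df_all_reports.groupby("ID", sort=False).apply(extract_usage_details)
#     # -> one row per link ID with the max-usage fields and OR-ed booleans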
###############################################################################
# HUMAN READABLE CONSOLIDATED REPORT
###############################################################################
def build_human_report(df_raw):
"""Build a human-readable report."""
df_human = df_raw.copy()
# Helper formatting functions
......@@ -245,35 +240,32 @@ def build_human_report(df_raw):
]
###############################################################################
# FILE FUNCTIONS
###############################################################################
def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime):
"""Find all files that fall within a given timeframe."""
# List all raw reports in directory
all_raw_reports = [
file
for file in os.listdir(directory)
if os.path.isfile(os.path.join(directory, file))
and file.startswith(prefix)
and file.endswith(suffix)
and re.search(ISO8601_REGEXP, file)
for file in Path(directory).iterdir()
if file.is_file()
and file.name.startswith(prefix)
and file.name.endswith(suffix)
and re.search(ISO8601_REGEXP, file.name)
]
# Filter to files that match the timestamp pattern within the specified datetime range
matching_files = []
for file in all_raw_reports:
match = re.search(ISO8601_REGEXP, file)
match = re.search(ISO8601_REGEXP, file.name)
file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=UTC)
if start_datetime <= file_date <= end_datetime:
matching_files.append(os.path.join(directory, file))
matching_files.append(file)
return matching_files
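# Illustrative call (a sketch): selecting the raw reports for a daily run. The prefix
# and suffix literals mirror the constants used by the What-If Scenario runner and are
# assumed to match the ones defined for this module.
#
#     end_datetime = datetime.now(tz=UTC)
#     start_datetime = end_datetime - timedelta(days=1)
#     daily_files = find_files_by_timeframe(
#         RAW_REPORT_DIRECTORY, "raw_capacityreport_", "csv", start_datetime, end_datetime
#     )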
def store_consolidated(df_consolidated, directory, prefix, suffix):
"""Store consolidated results in a file."""
path = Path(directory)
path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist
......@@ -283,22 +275,17 @@ def store_consolidated(df_consolidated, directory, prefix, suffix):
if suffix == "csv":
df_consolidated.to_csv(
os.path.join(path, filename), sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True
path / filename, sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True
)
elif suffix == "txt":
markdown = df_consolidated.to_markdown(headers="keys", tablefmt="psql")
# Write the markdown string to a file
with open(os.path.join(path, filename), "w") as file:
file.write(markdown)
###############################################################################
# MAIN
###############################################################################
(path / filename).write_text(markdown)
def main():
"""Main method for running the capacity planner."""
# Parse commandline arguments
parser = argparse.ArgumentParser(description="Script usage:")
parser.add_argument("--daily", action="store_true", help="Create daily report (past day)")
......@@ -343,9 +330,11 @@ def main():
)
if len(matching_files) > 0:
print(
f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}"
msg = (
f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} "
f"through {end_datetime}"
)
logger.info(msg)
# List of columns that should be parsed as dates from CSV
date_columns = [
......@@ -396,7 +385,8 @@ def main():
)
else:
print(f"No raw files found for timeframe {start_datetime} through {end_datetime}")
msg = f"No raw files found for timeframe {start_datetime} through {end_datetime}"
logger.warning(msg)
if __name__ == "__main__":
......
"""Different services that the capacity planner interacts with."""
"""Interactions with Kentik through their API."""
import os
import requests
......@@ -24,16 +26,14 @@ EGRESS_DIMENSION = "i_ult_exit_site"
def _api_query(payload):
# Headers for authentication
headers = {"Content-Type": "application/json", "X-CH-Auth-Email": API_EMAIL, "X-CH-Auth-API-Token": API_TOKEN}
response = requests.post(API_URL, headers=headers, json=payload)
response = requests.post(API_URL, headers=headers, json=payload, timeout=120)
if response.status_code == 200:
return response.json()
print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}")
return None
response.raise_for_status()
return response.json()
def fetch_kentik_traffic_matrix():
# JSON query payload
"""Fetch a traffic matrix from Kentik."""
payload = {
"version": 4,
"queries": [
......@@ -44,12 +44,10 @@ def fetch_kentik_traffic_matrix():
"all_devices": True,
"aggregateTypes": ["max_in_bits_per_sec"],
"depth": 350,
"topx": 350, # 350=max supported by Kentik
"topx": 350,
"device_name": [],
"fastData": "Auto",
"lookback_seconds": 60 * KENTIK_REPORTING_PERIOD,
# "starting_time": null,
# "ending_time": null,
"matrixBy": [],
"metric": ["in_bytes"],
"minsPolling": KENTIK_FLOW_AGGR_WINDOW,
......@@ -96,8 +94,7 @@ def fetch_kentik_traffic_matrix():
],
}
response = _api_query(payload)
return response
return _api_query(payload)
###############################################################################
......@@ -106,9 +103,9 @@ def fetch_kentik_traffic_matrix():
def kentik_to_traffic_matrices(json_data, nodes):
"""Convert the given JSON structure returned by Kentik into a dictionary
keyed by timestamp. For each timestamp, we store a nested dict of:
traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
"""Convert the given JSON structure returned by Kentik into a dictionary keyed by timestamp.
For each timestamp, we store a nested dict of: traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
"""
# We'll gather all flows in the JSON
data_entries = json_data["results"][0]["data"]
......
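# Illustrative read of the returned structure (a sketch; the node list and the example
# site names "AMS" and "LON" come from the topology module):
#
#     matrices = kentik_to_traffic_matrices(fetch_kentik_traffic_matrix(), nettopo.NODES)
#     first_ts = next(iter(matrices))
#     rate_mbps = matrices[first_ts].get("AMS", {}).get("LON", 0)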
"""Utilities for capacity-planner."""
# (A) Define each core link:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] )
# where:
# linkID: network-wide unique numeric ID (e.g. 1001)
# nodeA, nodeB: core link endpoints
# igp_metric: IGP cost/distance
# capacity: full-duplex link capacity in Gbps
# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty)
# normal_threshold: fraction for normal usage. If omitted default is used
# failure_threshold: fraction for usage under failure. If omitted default is used
"""Define each core link in a topology.
Each link is a tuple shaped like:
( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] )
where:
linkID: network-wide unique numeric ID (e.g. 1001)
nodeA, nodeB: core link endpoints
igp_metric: IGP cost/distance
capacity: full-duplex link capacity in Gbps
srlg_list: list of Shared Risk Link Group (SRLG) names (or empty)
normal_threshold: fraction for normal usage. If omitted default is used
failure_threshold: fraction for usage under failure. If omitted default is used
"""
CORELINKS = [
(1, "AMS", "FRA", 2016, 800, []),
(2, "AMS", "LON", 1428, 800, []),
......@@ -43,22 +48,12 @@ CORELINKS = [
(32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]),
(33, "ATH2", "THE", 8200, 100, []),
(34, "SOF", "THE", 5800, 100, []),
# ("COP", "HAM", 480, 400, []),
# ("COP", "STO", 600, 400, []),
# ("HEL", "STO", 630, 400, []),
(35, "RIG", "TAR", 2900, 100, []),
(36, "KAU", "POZ", 10050, 100, []),
# ("BEL", "SOF", 444, 400, []),
# ("BEL", "ZAG", 528, 400, []),
(37, "ZAG", "SOF", 9720, 200, []),
(38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]),
(39, "COR", "LON2", 7160, 100, ["COR-LON2"]),
# ("HEL", "TAR", 227, 400, []),
(40, "KAU", "RIG", 4500, 100, []),
# ("BRU", "LUX", 380, 400, []),
# ("FRA", "LUX", 312, 400, []),
# ("RIG", "STO", 400, 400, []),
# ("COP", "POZ", 750, 400, []),
(41, "COR", "PAR", 12549, 100, ["COR-LON2"]),
(42, "KIE", "POZ", 50000, 100, []),
(43, "CHI", "BUC", 50000, 40, []),
......@@ -111,25 +106,21 @@ NODES = [
NODE_FAILOVER_RATIOS = {
"AMS": {"LON": 0.6, "FRA": 0.4},
"ATH": {"THE": 0.5, "MAR": 0.5},
# "BEL": {"ZAG": 1.0 },
"BIL": {"MAD": 1.0},
"BRA": {"VIE": 1.0},
"BRU": {"AMS": 1.0},
"BUC": {"VIE": 0.5, "SOF": 0.5},
"BUD": {"ZAG": 1.0},
# "COP": {"HAM": 0.5, "STO": 0.5 },
"COR": {"DUB": 1.0},
"DUB": {"COR": 1.0},
"FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2},
"GEN": {"PAR": 0.6, "MIL2": 0.4},
"HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3},
# "HEL": {"TAR": 0.3, "HAM": 0.7 },
"KAU": {"RIG": 1.0},
"LIS": {"POR": 1.0},
"LJU": {"ZAG": 1.0},
"LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4},
"LON2": {"LON": 1.0},
# "LUX": {"BRU": 1.0 },
"MAD": {"BIL": 1.0},
"MAR": {"MIL2": 0.6, "ATH": 0.4},
"MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3},
......@@ -139,7 +130,6 @@ NODE_FAILOVER_RATIOS = {
"PRA": {"VIE": 1.0},
"RIG": {"KAU": 1.0},
"SOF": {"THE": 0.5, "BUC": 0.5},
# "STO": {"COP": 0.5, "HEL": 0.5 },
"TAR": {"RIG": 1.0},
"THE": {"ATH": 0.5, "SOF": 0.5},
"VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2},
......
"""Run all different what-if scenarios."""
import itertools
import math
import multiprocessing
from datetime import UTC, datetime
from logging import getLogger
from pathlib import Path
from capacity_planner.utils import topology as nettopo
import networkx as nx
import pandas as pd
from capacity_planner.services import kentik
from capacity_planner.utils import topology as nettopo
logger = getLogger(__name__)
###############################################################################
# 1) INPUT DATA SECTION
###############################################################################
# If normal threshold not specified on a link, default to 0.40 (allowing upto 40% link utilization in normal non-failure scenario)
# If normal threshold not specified on a link, default to 40% link utilization in normal non-failure scenario
DEFAULT_NORMAL_THRESHOLD = 0.40
# If failure threshold not specified on a link, default to 0.80 (allowing upto 80% link utilization in worst-case failure scenario)
# If failure threshold not specified on a link, default to 80% link utilization in worst-case failure scenario
DEFAULT_FAILURE_THRESHOLD = 0.80
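# Worked example: with these defaults an 800 Gbps core link is reported as exceeding
# its threshold once usage passes 0.40 * 800 = 320 Gbps in the normal scenario, or
# 0.80 * 800 = 640 Gbps in any failure scenario.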
ENABLED_SCENARIOS = ["normal", "onelinkfail", "twolinkfail", "onenodefail", "nodelinkfail"]
......@@ -28,75 +31,53 @@ SHOULD_UPGRADE_SCENARIOS = ["twolinkfail", "nodelinkfail"]
RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports"
RAW_REPORT_FILE_PREFIX = "raw_capacityreport_"
RAW_REPORT_FILE_SUFFIX = "csv"
# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
ISO8601_FORMAT = "%Y%m%dT%H%MZ"
ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z"
###############################################################################
# 2) Build a local NetworkX multi-directional graph for scenario computations
###############################################################################
def build_graph(nodes, links):
# MultiDiGraph supports multiple edges (ECMP links) between two nodes
# and is directional, allowing modelling of Full-Duplex links and taking
# traffic direction into account
"""Build a graph of nodes and links.
MultiDiGraph supports multiple edges (ECMP links) between two nodes and is directional, allowing modelling of
Full-Duplex links and taking traffic direction into account.
"""
graph = nx.MultiDiGraph()
graph.add_nodes_from(nodes)
for link in links:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link[0]
nodeA = link[1]
nodeB = link[2]
link_id = link[0]
node_a = link[1]
node_b = link[2]
igp = link[3]
# Add Full-Duplex edges in both directions with linkID
# assumes the IGP metric is configured symmetrically
graph.add_edge(nodeA, nodeB, key=lID, igpmetric=igp)
graph.add_edge(nodeB, nodeA, key=lID, igpmetric=igp)
graph.add_edge(node_a, node_b, key=link_id, igpmetric=igp)
graph.add_edge(node_b, node_a, key=link_id, igpmetric=igp)
return graph
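# Illustrative use of the resulting graph (a sketch; "AMS" and "FRA" are example sites,
# and the igpmetric edge attribute set above drives path selection):
#
#     graph = build_graph(nettopo.NODES, nettopo.CORELINKS)
#     path = nx.shortest_path(graph, "AMS", "FRA", weight="igpmetric")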
###############################################################################
# 3) Helper functions to ease lookups for core links and SRLGs
###############################################################################
def build_corelink_map(links):
# Creating a lookup dictionary for fast lookup by linkID
corelink_dict = {link[0]: link for link in links}
return corelink_dict
"""Creating a lookup dictionary for fast lookup by linkID."""
return {link[0]: link for link in links}
def build_srlg_map(links):
# Maps SRLG name to one or more core links (linkIDs)
"""Maps SRLG name to one or more core link IDs."""
srlg_map = {}
for link in links:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link[0]
nodeA = link[1]
nodeB = link[2]
igp = link[3]
if len(link) > 5 and isinstance(link[5], list):
if len(link) > 5 and isinstance(link[5], list): # noqa: PLR2004
for srlg in link[5]:
srlg_map.setdefault(srlg, set())
srlg_map[srlg].add(lID)
srlg_map[srlg].add(link[0])
return srlg_map
###############################################################################
# 4) Node failover logic - redistribute traffic to other core nodes
###############################################################################
def redistribute_local_traffic(t_xx, ratio_map):
"""We treat local traffic T[X,X] as two endpoints at X.
Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j].
=> fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j].
If i=j, it's T[i,i] (local on neighbor i).
......@@ -112,23 +93,20 @@ def redistribute_local_traffic(t_xx, ratio_map):
return local_matrix
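# Illustrative sketch of the redistribution rule above (the committed body is elided
# in this view; the name is hypothetical). With t_xx = 100 Mbps and
# ratio_map = {"LON": 0.6, "FRA": 0.4} it yields 36 / 24 / 24 / 16 Mbps for
# T[LON,LON], T[LON,FRA], T[FRA,LON] and T[FRA,FRA].
def _redistribute_local_traffic_sketch(t_xx, ratio_map):
    local_matrix = {}
    for i, ratio_i in ratio_map.items():
        for j, ratio_j in ratio_map.items():
            local_matrix.setdefault(i, {})[j] = ratio_i * ratio_j * t_xx
    return local_matrix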
def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode):
"""Return scenario-specific traffic matrix in Mbps,
re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode].
"""
def build_traffic_with_node_fail(base_traffic, nodefail_ratios, failing_node):
"""Return scenario-specific traffic matrix in Mbps."""
scenario_traffic = {}
ratio_map = nodefail_ratios.get(failingNode, {})
ratio_map = nodefail_ratios.get(failing_node, {})
for s in baseTraffic:
for s in base_traffic: # noqa: PLR1702 FIXME
scenario_traffic[s] = {}
for d in baseTraffic[s]:
val = baseTraffic[s][d]
for d in base_traffic[s]:
val = base_traffic[s][d]
if val <= 0:
scenario_traffic[s][d] = 0
continue
# traffic is local - T[failing_node,failing_node]
if s == failingNode and d == failingNode:
if s == failing_node and d == failing_node:
scenario_traffic[s][d] = 0
mat = redistribute_local_traffic(val, ratio_map)
for i in mat:
......@@ -139,14 +117,14 @@ def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode):
scenario_traffic[i][j] = scenario_traffic[i].get(j, 0) + portion
# T[failing_node,*]
elif s == failingNode:
elif s == failing_node:
scenario_traffic[s][d] = 0
for nbr, r in ratio_map.items():
scenario_traffic.setdefault(nbr, {})
scenario_traffic[nbr][d] = scenario_traffic[nbr].get(d, 0) + val * r
# T[*,failing_node]
elif d == failingNode:
elif d == failing_node:
scenario_traffic[s][d] = 0
for nbr, r in ratio_map.items():
scenario_traffic[s][nbr] = scenario_traffic[s].get(nbr, 0) + val * r
......@@ -158,45 +136,38 @@ def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode):
return scenario_traffic
###############################################################################
# 5) Remove SRLG or corelink from graph
###############################################################################
def remove_corelink(graph, removeLink):
def remove_corelink(graph, remove_link):
"""Remove a core link from a graph."""
edges_rm = []
for u, v, lID in graph.edges(keys=True):
if lID == removeLink:
edges_rm.append((u, v, lID))
for u, v, link_id in graph.edges(keys=True):
if link_id == remove_link:
edges_rm.append((u, v, link_id))
for e in edges_rm:
graph.remove_edge(*e)
def remove_corelink_or_srlg(graph, failedElement, srlg_map):
etype, data = failedElement
def remove_corelink_or_srlg(graph, failed_element, srlg_map):
"""Remove a core link or SRLG."""
etype, data = failed_element
if etype == "LINK":
linkID = data
remove_corelink(graph, linkID)
link_id = data
remove_corelink(graph, link_id)
elif etype == "SRLG":
if data in srlg_map:
for linkID in srlg_map[data]:
remove_corelink(graph, linkID)
for link_id in srlg_map[data]:
remove_corelink(graph, link_id)
###############################################################################
# 6) Compute scenario load in MultiDiGraph
###############################################################################
def compute_scenario_load(graph, scenario_traffic):
def compute_scenario_load(graph, scenario_traffic): # noqa: PLR0912 FIXME
"""Compute load per scenario."""
# initialize empty usage map
usageMap = {}
for u, v, lID in graph.edges(keys=True):
usageMap[u, v, lID] = 0.0
usage_map = {}
for u, v, link_id in graph.edges(keys=True):
usage_map[u, v, link_id] = 0.0
# walk over [ingress][egress] traffic demands
for s in scenario_traffic:
for s in scenario_traffic: # noqa: PLR1702
for d in scenario_traffic[s]:
demand_mbps = scenario_traffic[s][d]
if demand_mbps <= 0 or s == d or (s not in graph) or (d not in graph):
......@@ -244,170 +215,162 @@ def compute_scenario_load(graph, scenario_traffic):
if igp < best_igp:
best_igp = igp
best_k = k2
usageMap[p, q, best_k] += share
return usageMap
usage_map[p, q, best_k] += share
return usage_map
###############################################################################
# 7) Individual Normal/Failure Scenario Runners
###############################################################################
def run_normal_scenario(graph, trafficMatrix):
def run_normal_scenario(graph, traffic_matrix):
"""No failures => just compute load with the base traffic."""
return compute_scenario_load(graph, trafficMatrix)
return compute_scenario_load(graph, traffic_matrix)
def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail):
def run_onenode_failure(graph, traffic_matrix, nodefail_ratios, node_fail):
"""Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail]."""
if nodeFail in graph:
graph.remove_node(nodeFail)
scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
if node_fail in graph:
graph.remove_node(node_fail)
scenario_traffic = build_traffic_with_node_fail(traffic_matrix, nodefail_ratios, node_fail)
return compute_scenario_load(graph, scenario_traffic)
def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail):
"""LinkFail = ("LINK",(u,v)) or ("SRLG",{(x,y),...}).
def run_one_link_failure(graph, traffic_matrix, srlg_map, link_fail):
"""Run a scenario with a single link failure.
link_fail is ("LINK", link_id) or ("SRLG", srlg_name).
Remove the corresponding core link(s) from a copy of the graph, then compute load with the base traffic.
"""
remove_corelink_or_srlg(graph, linkFail, srlg_map)
return compute_scenario_load(graph, trafficMatrix)
remove_corelink_or_srlg(graph, link_fail, srlg_map)
return compute_scenario_load(graph, traffic_matrix)
def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2):
"""linkFail1, linkFail2 each = ("LINK",(u,v)) or ("SRLG",{(x,y),...}).
def run_two_link_failure(graph, traffic_matrix, srlg_map, link_fail_1, link_fail_2):
"""Run a scenario where two links fail.
link_fail_1 and link_fail_2 are each ("LINK", link_id) or ("SRLG", srlg_name).
Remove both from a copy of the graph, then compute load with the base traffic.
"""
remove_corelink_or_srlg(graph, linkFail1, srlg_map)
remove_corelink_or_srlg(graph, linkFail2, srlg_map)
return compute_scenario_load(graph, trafficMatrix)
remove_corelink_or_srlg(graph, link_fail_1, srlg_map)
remove_corelink_or_srlg(graph, link_fail_2, srlg_map)
return compute_scenario_load(graph, traffic_matrix)
def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail):
def run_node_link_failure(graph, traffic_matrix, srlg_map, node_fail_ratios, node_fail, link_fail): # noqa: PLR0917
"""Run scenario for node and link failure."""
# first remove node and shift traffic
if nodeFail in graph:
graph.remove_node(nodeFail)
scenario_traffic = build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
if node_fail in graph:
graph.remove_node(node_fail)
scenario_traffic = build_traffic_with_node_fail(traffic_matrix, node_fail_ratios, node_fail)
# then remove link
remove_corelink_or_srlg(graph, linkFail, srlg_map)
remove_corelink_or_srlg(graph, link_fail, srlg_map)
return compute_scenario_load(graph, scenario_traffic)
###############################################################################
# 8) Single scenario task runner
###############################################################################
def expand_link_name(corelink_map, linkFail):
def expand_link_name(corelink_map, link_fail):
"""Helper function--return a string describing one scenario element (LINK linkID or SRLG name)."""
etype, data = linkFail
etype, data = link_fail
if etype == "LINK":
# get corelink details by ID
td = corelink_map.get(data)
nodeA = td[1]
nodeB = td[2]
return f"LINK_{data}/{nodeA}-{nodeB}"
node_a = td[1]
node_b = td[2]
return f"LINK_{data}/{node_a}-{node_b}"
if etype == "SRLG":
return f"SRLG_{data}"
return str(linkFail)
return str(link_fail)
def compute_scenario_task(args):
"""Each scenario runs as a single task"""
(graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData) = args
"""Each scenario runs as a single task."""
(graph_orig, corelink_map, srlg_map, node_fail_ratios, ts, traffic_matrix, scenario_type, scenario_data) = args
# 1) first, create a copy of graph so in-memory copy of original is not modified
graph = graph_orig.copy()
# 2) run selected normal or failure scenario
usageMap = {}
scenarioName = ""
if scenarioType == "normal":
usageMap = run_normal_scenario(graph, trafficMatrix)
scenarioName = "normal"
elif scenarioType == "onenodefail":
nodeFail = scenarioData
usageMap = run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail)
scenarioName = f"NODE_{nodeFail}"
elif scenarioType == "onelinkfail":
linkFail = scenarioData
usageMap = run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail)
scenarioName = f"{expand_link_name(corelink_map, linkFail)}"
elif scenarioType == "twolinkfail":
(linkFail1, linkFail2) = scenarioData
usageMap = run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2)
scenarioName = f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}"
elif scenarioType == "nodelinkfail":
(nodeFail, linkFail) = scenarioData
usageMap = run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail)
scenarioName = f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}"
usage_map = {}
scenario_name = ""
if scenario_type == "normal":
usage_map = run_normal_scenario(graph, traffic_matrix)
scenario_name = "normal"
elif scenario_type == "onenodefail":
node_fail = scenario_data
usage_map = run_onenode_failure(graph, traffic_matrix, node_fail_ratios, node_fail)
scenario_name = f"NODE_{node_fail}"
elif scenario_type == "onelinkfail":
link_fail = scenario_data
usage_map = run_one_link_failure(graph, traffic_matrix, srlg_map, link_fail)
scenario_name = f"{expand_link_name(corelink_map, link_fail)}"
elif scenario_type == "twolinkfail":
(link_fail_1, link_fail_2) = scenario_data
usage_map = run_two_link_failure(graph, traffic_matrix, srlg_map, link_fail_1, link_fail_2)
scenario_name = f"{expand_link_name(corelink_map, link_fail_1)} + {expand_link_name(corelink_map, link_fail_2)}"
elif scenario_type == "nodelinkfail":
(node_fail, link_fail) = scenario_data
usage_map = run_node_link_failure(graph, traffic_matrix, srlg_map, node_fail_ratios, node_fail, link_fail)
scenario_name = f"NODE_{node_fail} + {expand_link_name(corelink_map, link_fail)}"
else:
usageMap = {}
scenarioName = "unknown"
return (ts, scenarioType, scenarioName, usageMap)
usage_map = {}
scenario_name = "unknown"
###############################################################################
# 9) Build list of tasks to run in parallel
###############################################################################
return ts, scenario_type, scenario_name, usage_map
def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices):
def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices): # noqa: PLR0917
"""Build parallel tasks for given failures."""
# enumerate link failure scenarios => core link IDs + SRLG names
linkFailElements = []
for link_id, _link_data in corelink_map.items():
linkFailElements.append(("LINK", link_id))
for srlg_name, _srlg_data in srlg_map.items():
linkFailElements.append(("SRLG", srlg_name))
link_fail_elements = []
link_fail_elements.extend([("LINK", link_id) for link_id in corelink_map])
link_fail_elements.extend([("SRLG", srlg_name) for srlg_name in srlg_map])
tasks = []
for ts in traffic_matrices.keys():
trafficMatrix = traffic_matrices[ts]
for ts in traffic_matrices:
traffic_matrix = traffic_matrices[ts]
# add normal non-failure scenario
if "normal" in ENABLED_SCENARIOS:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "normal", None))
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, traffic_matrix, "normal", None))
# add single link fail scenarios
if "onelinkfail" in ENABLED_SCENARIOS:
for linkFail in linkFailElements:
tasks.append((
tasks.extend([
(
graph,
corelink_map,
srlg_map,
nodefail_ratios,
ts,
trafficMatrix,
traffic_matrix,
"onelinkfail",
linkFail,
))
failure,
)
for failure in link_fail_elements
])
# add unique two link fail scenarios
if "twolinkfail" in ENABLED_SCENARIOS:
fe_twofail_combos = list(itertools.combinations(linkFailElements, 2))
for linkFail1, linkFail2 in fe_twofail_combos:
tasks.append((
fe_twofail_combos = list(itertools.combinations(link_fail_elements, 2))
tasks.extend([
(
graph,
corelink_map,
srlg_map,
nodefail_ratios,
ts,
trafficMatrix,
traffic_matrix,
"twolinkfail",
(linkFail1, linkFail2),
))
failure,
)
for failure in fe_twofail_combos
])
for nodeFail in nodes:
for node_fail in nodes:
# add node fail scenarios
if "onenodefail" in ENABLED_SCENARIOS:
tasks.append((
......@@ -416,152 +379,151 @@ def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map,
srlg_map,
nodefail_ratios,
ts,
trafficMatrix,
traffic_matrix,
"onenodefail",
nodeFail,
node_fail,
))
# add node + link fail scenarios
if "nodelinkfail" in ENABLED_SCENARIOS:
for linkFail in linkFailElements:
tasks.append((
tasks.extend([
(
graph,
corelink_map,
srlg_map,
nodefail_ratios,
ts,
trafficMatrix,
traffic_matrix,
"nodelinkfail",
(nodeFail, linkFail),
))
(node_fail, link_fail),
)
for link_fail in link_fail_elements
])
return tasks
###############################################################################
# 10) Report generator
###############################################################################
def link_usage_aggregator(corelink_map, results):
"""Build an aggregate of all link usages."""
# initialize empty link usage aggregator
aggLinkUsage = {}
for link_id, _link_data in corelink_map.items():
aggLinkUsage[link_id] = {}
for scenarioType in ENABLED_SCENARIOS:
aggLinkUsage[link_id][scenarioType] = (0.0, pd.NaT, None)
agg_link_usage = {}
for link_id in corelink_map:
agg_link_usage[link_id] = {}
for scenario_type in ENABLED_SCENARIOS:
agg_link_usage[link_id][scenario_type] = (0.0, pd.NaT, None)
# unify results across all tasks - for each scenario, record worst-case (highest) load on each link
for ts, scenarioType, scenarioName, usageMap in results:
for u, v, lID in usageMap:
usage = usageMap[u, v, lID]
oldUsage, oldTs, oldSc = aggLinkUsage[lID][scenarioType]
if usage > oldUsage:
aggLinkUsage[lID][scenarioType] = (usage, ts, f"{scenarioName}")
for ts, scenario_type, scenario_name, usage_map in results:
for u, v, link_id in usage_map:
usage = usage_map[u, v, link_id]
old_usage, _, _ = agg_link_usage[link_id][scenario_type]
if usage > old_usage:
agg_link_usage[link_id][scenario_type] = (usage, ts, f"{scenario_name}")
# return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName)
return aggLinkUsage
# return agg_link_usage[linkID][scenario_type] => (usage,ts,scenario_name)
return agg_link_usage
def build_raw_report(corelink_map, aggLinkUsage):
def build_raw_report(corelink_map, agg_link_usage):
"""Build report from raw data."""
final_report_raw = []
# Helper functions for building report fields
def exceed(scenario, thr, capacity):
return aggLinkUsage[lID][scenario][0] > thr * capacity if scenario in ENABLED_SCENARIOS else pd.NA
return agg_link_usage[link_id][scenario][0] > thr * capacity if scenario in ENABLED_SCENARIOS else pd.NA
def usagePct(scenario, capacity):
# (usage,ts,scenarioName)
return aggLinkUsage[lID][scenario][0] / capacity if scenario in ENABLED_SCENARIOS else pd.NA
def usage_pct(scenario, capacity):
"""Returns a tuple that contains usage, ts, and scenario name."""
return agg_link_usage[link_id][scenario][0] / capacity if scenario in ENABLED_SCENARIOS else pd.NA
def usageGbps(scenario):
return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA
def usage_gbps(scenario):
return agg_link_usage[link_id][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA
def scenarioTimestamp(scenario):
def scenario_timestamp(scenario):
return (
pd.to_datetime(aggLinkUsage[lID][scenario][1], unit="ms", errors="coerce", utc=True)
pd.to_datetime(agg_link_usage[link_id][scenario][1], unit="ms", errors="coerce", utc=True)
if scenario in ENABLED_SCENARIOS
else pd.NaT
)
def failScenarioName(scenario):
return aggLinkUsage[lID][scenario][2] if scenario in ENABLED_SCENARIOS else None
def fail_scenario_name(scenario):
return agg_link_usage[link_id][scenario][2] if scenario in ENABLED_SCENARIOS else None
def scenarioEnabled(scenario):
def scenario_enabled(scenario):
return scenario in ENABLED_SCENARIOS
# Iterate over each core link
for link_id, link_data in corelink_map.items():
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link_data[0]
nodeA = link_data[1]
nodeB = link_data[2]
for link_data in corelink_map.values():
# Formatted as: ( link ID, node_a, node_b, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
link_id = link_data[0]
node_a = link_data[1]
node_b = link_data[2]
capacity = link_data[4]
# parse thresholds if present, else use defaults
norm_thr = (
link_data[6] if len(link_data) > 6 and isinstance(link_data[6], (float, int)) else DEFAULT_NORMAL_THRESHOLD
link_data[6] if len(link_data) > 6 and isinstance(link_data[6], (float, int)) else DEFAULT_NORMAL_THRESHOLD # noqa: PLR2004
)
fail_thr = (
link_data[7] if len(link_data) > 7 and isinstance(link_data[7], (float, int)) else DEFAULT_FAILURE_THRESHOLD
link_data[7] if len(link_data) > 7 and isinstance(link_data[7], (float, int)) else DEFAULT_FAILURE_THRESHOLD # noqa: PLR2004
)
# threshold check
mustUpgrade = False
shouldUpgrade = False
for scenarioType in ENABLED_SCENARIOS:
(usage, ts, scenarioName) = aggLinkUsage[lID][scenarioType]
must_upgrade = False
should_upgrade = False
for scenario_type in ENABLED_SCENARIOS:
(usage, _, _) = agg_link_usage[link_id][scenario_type]
thr = 1
if scenarioType == "normal" and norm_thr > 0:
if scenario_type == "normal" and norm_thr > 0:
thr = norm_thr
elif "fail" in scenarioType and fail_thr > 0:
elif "fail" in scenario_type and fail_thr > 0:
thr = fail_thr
if usage > thr * capacity and scenarioType in MUST_UPGRADE_SCENARIOS:
mustUpgrade = True
elif usage > thr * capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS:
shouldUpgrade = True
if usage > thr * capacity and scenario_type in MUST_UPGRADE_SCENARIOS:
must_upgrade = True
elif usage > thr * capacity and scenario_type in SHOULD_UPGRADE_SCENARIOS:
should_upgrade = True
# print one row per core link
final_report_raw.append({
"ID": lID,
"NodeA": nodeA,
"NodeB": nodeB,
"ID": link_id,
"NodeA": node_a,
"NodeB": node_b,
"CapacityGbps": capacity,
"ConfNormalThrPct": norm_thr,
"ConfNormalThrGbps": norm_thr * capacity,
"ConfFailThrPct": fail_thr,
"ConfFailThrGbps": fail_thr * capacity,
"NormalEnabled": scenarioEnabled("normal"),
"NormalUsagePct": usagePct("normal", capacity),
"NormalUsageGbps": usageGbps("normal"),
"NormalEnabled": scenario_enabled("normal"),
"NormalUsagePct": usage_pct("normal", capacity),
"NormalUsageGbps": usage_gbps("normal"),
"NormalUsageExceed": exceed("normal", norm_thr, capacity),
"NormalDateTime": scenarioTimestamp("normal"),
"1LinkFailEnabled": scenarioEnabled("onelinkfail"),
"1LinkFailScenario": failScenarioName("onelinkfail"),
"1LinkFailUsagePct": usagePct("onelinkfail", capacity),
"1LinkFailUsageGbps": usageGbps("onelinkfail"),
"NormalDateTime": scenario_timestamp("normal"),
"1LinkFailEnabled": scenario_enabled("onelinkfail"),
"1LinkFailScenario": fail_scenario_name("onelinkfail"),
"1LinkFailUsagePct": usage_pct("onelinkfail", capacity),
"1LinkFailUsageGbps": usage_gbps("onelinkfail"),
"1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity),
"1LinkFailUsageTime": scenarioTimestamp("onelinkfail"),
"2LinkFailEnabled": scenarioEnabled("twolinkfail"),
"2LinkFailScenario": failScenarioName("twolinkfail"),
"2LinkFailUsagePct": usagePct("twolinkfail", capacity),
"2LinkFailUsageGbps": usageGbps("twolinkfail"),
"1LinkFailUsageTime": scenario_timestamp("onelinkfail"),
"2LinkFailEnabled": scenario_enabled("twolinkfail"),
"2LinkFailScenario": fail_scenario_name("twolinkfail"),
"2LinkFailUsagePct": usage_pct("twolinkfail", capacity),
"2LinkFailUsageGbps": usage_gbps("twolinkfail"),
"2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity),
"2LinkFailUsageTime": scenarioTimestamp("twolinkfail"),
"1NodeFailEnabled": scenarioEnabled("onenodefail"),
"1NodeFailScenario": failScenarioName("onenodefail"),
"1NodeFailUsagePct": usagePct("onenodefail", capacity),
"1NodeFailUsageGbps": usageGbps("onenodefail"),
"2LinkFailUsageTime": scenario_timestamp("twolinkfail"),
"1NodeFailEnabled": scenario_enabled("onenodefail"),
"1NodeFailScenario": fail_scenario_name("onenodefail"),
"1NodeFailUsagePct": usage_pct("onenodefail", capacity),
"1NodeFailUsageGbps": usage_gbps("onenodefail"),
"1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity),
"1NodeFailUsageTime": scenarioTimestamp("onenodefail"),
"Node+1LinkFailEnabled": scenarioEnabled("nodelinkfail"),
"Node+1LinkFailScenario": failScenarioName("nodelinkfail"),
"Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity),
"Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"),
"1NodeFailUsageTime": scenario_timestamp("onenodefail"),
"Node+1LinkFailEnabled": scenario_enabled("nodelinkfail"),
"Node+1LinkFailScenario": fail_scenario_name("nodelinkfail"),
"Node+1LinkFailUsagePct": usage_pct("nodelinkfail", capacity),
"Node+1LinkFailUsageGbps": usage_gbps("nodelinkfail"),
"Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity),
"Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"),
"MustUpgrade": mustUpgrade,
"ShouldUpgrade": shouldUpgrade,
"Node+1LinkFailUsageTime": scenario_timestamp("nodelinkfail"),
"MustUpgrade": must_upgrade,
"ShouldUpgrade": should_upgrade,
})
df_raw = pd.DataFrame(
......@@ -614,6 +576,7 @@ def build_raw_report(corelink_map, aggLinkUsage):
def store_raw_report(df_raw):
"""Store raw report data."""
directory = Path(RAW_REPORT_DIRECTORY)
directory.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist
......@@ -625,21 +588,16 @@ def store_raw_report(df_raw):
df_raw.to_csv(filepath, sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True)
###############################################################################
# MAIN
###############################################################################
def main():
# Record and display the start time of the script
"""Run all what-if scenarios."""
start_time = datetime.now(UTC)
print("===============================================================")
print("Execution started at:", start_time.strftime(ISO8601_FORMAT))
msg = f"Execution started at: {start_time.strftime(ISO8601_FORMAT)}"
logger.info(msg)
# 1) query Kentik to build traffic demand matrices
print("Querying Kentik API for traffic demand matrix...")
logger.debug("Querying Kentik API for traffic demand matrix...")
kentik_json = kentik.fetch_kentik_traffic_matrix()
print("Processing traffic demand matrices from Kentik data...")
logger.debug("Processing traffic demand matrices from Kentik data...")
traffic_matrices = kentik.kentik_to_traffic_matrices(
kentik_json, nettopo.NODES
) # returns traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
......@@ -653,28 +611,31 @@ def main():
tasks = build_parallel_tasks(
graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices
)
print(
f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks) / len(traffic_matrices):.0f} scenarios each"
msg = (
f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with "
f"{len(tasks) / len(traffic_matrices):.0f} scenarios each."
)
logger.info(msg)
# 4) now run all tasks in a single global pool
print("Executing parallel traffic simulation tasks...")
logger.debug("Executing parallel traffic simulation tasks...")
with multiprocessing.Pool() as pool:
tasks_results = pool.map(compute_scenario_task, tasks)
# 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link
aggLinkUsage = link_usage_aggregator(corelink_map, tasks_results)
agg_link_usage = link_usage_aggregator(corelink_map, tasks_results)
# 6) create and store final report
raw_report = build_raw_report(corelink_map, aggLinkUsage)
raw_report = build_raw_report(corelink_map, agg_link_usage)
store_raw_report(raw_report)
# Display the end time, and total execution duration
end_time = datetime.now(UTC)
duration = end_time - start_time
print("Execution ended at:", end_time.strftime(ISO8601_FORMAT))
print("Total duration in seconds:", duration.total_seconds())
print("===============================================================")
msg = f"Execution ended at: {end_time.strftime(ISO8601_FORMAT)}"
logger.info(msg)
msg = f"Total duration in seconds: {duration.total_seconds()}"
logger.info(msg)
if __name__ == "__main__":
......
......@@ -25,6 +25,7 @@ enable_error_code = "ignore-without-code"
extend-exclude = [
"htmlcov",
"docs",
"capacity_planner/alembic"
]
target-version = "py312"
line-length = 120
......