Verified commit fdcaf77b authored by Daniël Verlouw, committed by Karel van Klink

Initial commit

parent 24f7dc7b
KENTIK_API_URL=https://api.kentik.eu/api/next/v5/query/topxdata
KENTIK_API_EMAIL=<email>
KENTIK_API_TOKEN=<api key>
# run what-if analysis hourly
0 * * * * source <path>/bin/activate && python3 <path>/bin/whatifscenarios.py >> <path>/whatifscenario.log 2>&1
# daily consolidated report
45 0 * * * source <path>/bin/activate && python3 <path>/bin/capacityreport.py --daily >> <path>/whatifscenario.log 2>&1
# monthly consolidated report
50 0 1 * * source <path>/bin/activate && python3 <path>/bin/capacityreport.py --monthly >> <path>/whatifscenario.log 2>&1
import os
import requests
import json
# Extract Kentik API settings from environment variables
API_URL = os.environ["KENTIK_API_URL"]
API_EMAIL= os.environ["KENTIK_API_EMAIL"]
API_TOKEN= os.environ["KENTIK_API_TOKEN"]
# Kentik report period (in minutes)
KENTIK_REPORTING_PERIOD=60
# Kentik flow aggregation window (in minutes -- lower values improve accuracy at the expense of processing time)
KENTIK_FLOW_AGGR_WINDOW=5
# aggregate ingress (source) and egress (destination) on the Kentik 'Site'-level
INGRESS_DIMENSION="i_device_site_name"
EGRESS_DIMENSION ="i_ult_exit_site"
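# Example (hypothetical flow): a flow sampled at a device on site "AMS" whose
# ultimate exit site is "FRA" contributes to the AMS -> FRA traffic demand.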
###############################################################################
# Fetch traffic matrix from the Kentik API
###############################################################################
def kentik_api_query(payload):
# Headers for authentication
headers = {
"Content-Type": "application/json",
"X-CH-Auth-Email": API_EMAIL,
"X-CH-Auth-API-Token": API_TOKEN
}
response = requests.post(API_URL, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
print(f"Error fetching data from Kentik API: {response.status_code} - {response.text}")
return None
# Example in Kentik dashboard: https://portal.kentik.eu/v4/core/explorer/5787b62a7e8ae1ef7d399e08cdea2800
def fetch_kentik_traffic_matrix():
# JSON query payload
payload = {
"version": 4,
"queries": [
{
"bucket": "Table",
"isOverlay": False,
"query": {
"all_devices": True,
"aggregateTypes": [
"max_in_bits_per_sec"
],
"depth": 350,
"topx": 350, # 350=max supported by Kentik
"device_name": [],
"fastData": "Auto",
"lookback_seconds": 60 * KENTIK_REPORTING_PERIOD,
# "starting_time": null,
# "ending_time": null,
"matrixBy": [],
"metric": [
"in_bytes"
],
"minsPolling": KENTIK_FLOW_AGGR_WINDOW,
"forceMinsPolling": True,
"outsort": "max_in_bits_per_sec",
"viz_type": "table",
"aggregates": [
{
"value": "max_in_bits_per_sec",
"column": "f_sum_in_bytes",
"fn": "max",
"label": "Bits/s Sampled at Ingress Max",
"unit": "in_bytes",
"parentUnit": "bytes",
"group": "Bits/s Sampled at Ingress",
"origLabel": "Max",
"sample_rate": 1,
"raw": True,
"name": "max_in_bits_per_sec"
}
],
"filters": {
"connector": "All",
"filterGroups": [
{
"connector": "All",
"filters": [
{
"filterField": EGRESS_DIMENSION,
"metric": "",
"aggregate": "",
"operator": "<>",
"filterValue": "", # Filter unassigned/unknown destinations
"rightFilterField": ""
}
],
"filterGroups": []
}
]
},
"dimension": [
INGRESS_DIMENSION,
EGRESS_DIMENSION
]
}
}
]
}
response = kentik_api_query(payload)
return response
###############################################################################
# Transform Kentik data to traffic matrices (one matrix per timestamp)
###############################################################################
def kentik_to_traffic_matrices(json_data, nodes):
"""
Convert the given JSON structure returned by Kentik into a dictionary
keyed by timestamp. For each timestamp, we store a nested dict of:
traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
"""
# We'll gather all flows in the JSON
data_entries = json_data["results"][0]["data"]
# This will be our main lookup: { ts -> {ingress -> { egress -> rate}} }
traffic_matrices = {}
for entry in data_entries:
ingress= entry[INGRESS_DIMENSION]
egress = entry[EGRESS_DIMENSION]
# skip unknown ingress and/or egress nodes
if ingress not in nodes:
continue
if egress not in nodes:
continue
# timeSeries -> in_bits_per_sec -> flow => list of [timestamp, in_bits_per_sec, ignored_interval]
flow_list = entry["timeSeries"]["in_bits_per_sec"]["flow"]
for (timestamp, bits_per_sec, _ignored_interval) in flow_list:
traffic_matrices.setdefault(timestamp, {})
traffic_matrices[timestamp].setdefault(ingress, {})
traffic_matrices[timestamp][ingress][egress] = int(bits_per_sec / 1_000_000) # convert bps to Mbps
return traffic_matrices
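# Illustrative (hypothetical) return value for two sites and one timestamp:
#   {1738368000000: {"AMS": {"FRA": 120, "AMS": 15}, "FRA": {"AMS": 95}}}
# i.e. at that epoch-millisecond timestamp, 120 Mbps flowed from AMS to FRA,
# 15 Mbps stayed local to AMS, and 95 Mbps flowed from FRA to AMS.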
# (A) Define each core link:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] )
# where:
# linkID: network-wide unique numeric ID (e.g. 1001)
# nodeA, nodeB: core link endpoints
# igp_metric: IGP cost/distance
# capacity: full-duplex link capacity in Gbps
# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty)
# normal_threshold: fraction of capacity allowed under normal (non-failure) conditions; if omitted, the default is used
# failure_threshold: fraction of capacity allowed under failure conditions; if omitted, the default is used
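# Example (hypothetical) entry with explicit per-link thresholds, overriding the
# defaults used by the what-if scenario runner (40% normal / 80% failure):
#   (99, "AMS", "PAR", 1000, 800, [], 0.25, 0.70),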
CORELINKS = [
( 1, "AMS", "FRA", 2016, 800, []),
( 2, "AMS", "LON", 1428, 800, []),
( 3, "FRA", "GEN", 2478, 800, []),
( 4, "GEN", "PAR", 2421, 800, []),
( 5, "LON", "LON2", 180, 800, []),
( 6, "LON2", "PAR", 1866, 800, []),
( 7, "FRA", "PRA", 3315, 300, []),
( 8, "GEN", "MIL2", 4280, 300, []),
( 9, "GEN", "MAR", 2915, 400, []),
(10, "HAM", "POZ", 3435, 300, []),
(11, "MAR", "MIL2", 3850, 300, []),
(12, "MIL2", "VIE", 5400, 400, []),
(13, "POZ", "PRA", 3215, 300, []),
(14, "PRA", "VIE", 2255, 300, []),
(15, "AMS", "HAM", 3270, 300, []),
(16, "BRU", "PAR", 50000, 100, []),
(17, "AMS", "BRU", 50000, 100, []),
(18, "BIL", "PAR", 5600, 300, []),
(19, "BUD", "ZAG", 2370, 200, []),
(20, "COR", "DUB", 5500, 100, []),
(21, "DUB", "LON", 8850, 100, []),
(22, "MAD", "MAR", 6805, 300, []),
(23, "BUC", "SOF", 5680, 200, []),
(24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]),
(25, "LJU", "MIL2", 3330, 300, []),
(26, "LJU", "ZAG", 985, 200, []),
(27, "BUC", "BUD", 11850, 200, []),
(28, "LIS", "MAD", 8970, 200, []),
(29, "BIL", "POR", 9250, 200, []),
(30, "LIS", "POR", 3660, 200, []),
(31, "BIL", "MAD", 4000, 200, []),
(32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]),
(33, "ATH2", "THE", 8200, 100, []),
(34, "SOF", "THE", 5800, 100, []),
# ("COP", "HAM", 480, 400, []),
# ("COP", "STO", 600, 400, []),
# ("HEL", "STO", 630, 400, []),
(35, "RIG", "TAR", 2900, 100, []),
(36, "KAU", "POZ", 10050, 100, []),
# ("BEL", "SOF", 444, 400, []),
# ("BEL", "ZAG", 528, 400, []),
(37, "ZAG", "SOF", 9720, 200, []),
(38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]),
(39, "COR", "LON2", 7160, 100, ["COR-LON2"]),
# ("HEL", "TAR", 227, 400, []),
(40, "KAU", "RIG", 4500, 100, []),
# ("BRU", "LUX", 380, 400, []),
# ("FRA", "LUX", 312, 400, []),
# ("RIG", "STO", 400, 400, []),
# ("COP", "POZ", 750, 400, []),
(41, "COR", "PAR", 12549, 100, ["COR-LON2"]),
(42, "KIE", "POZ", 50000, 100, []),
(43, "CHI", "BUC", 50000, 40, []),
(44, "CHI", "KIE", 10000, 100, []),
(45, "HAM", "TAR", 12490, 100, []),
(46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]),
(47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]),
(48, "AMS", "LON", 50000, 100, []), # backup link via SURF
(49, "BUC", "FRA", 50000, 100, []), # backup link via TTI
]
# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik)
# to be added: "BEL", "COP","HEL","LUX","STO"
NODES = [
"AMS","ATH2","BIL","BRA","BRU","BUC","BUD","CHI","COR","DUB","FRA",
"GEN","HAM","KAU","KIE","LIS","LJU","LON","LON2","MAD","MAR","MIL2",
"PAR","POR","POZ","PRA","RIG","SOF","TAR","THE","VIE","ZAG"
]
# Node failover ratio map for single-node fails
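# Example: with "AMS": {"LON": 0.6, "FRA": 0.4}, a failure of AMS re-homes 60% of
# traffic sourced at or destined to AMS onto LON and 40% onto FRA; AMS-local
# traffic is split pairwise across both neighbours (see redistribute_local_traffic
# in the what-if scenario runner).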
NODE_FAILOVER_RATIOS = {
"AMS": {"LON": 0.6, "FRA": 0.4 },
"ATH2": {"THE": 0.5, "MAR": 0.5 },
# "BEL": {"ZAG": 1.0 },
"BIL": {"MAD": 1.0 },
"BRA": {"VIE": 1.0 },
"BRU": {"AMS": 1.0 },
"BUC": {"VIE": 0.5, "SOF": 0.5 },
"BUD": {"ZAG": 1.0 },
# "COP": {"HAM": 0.5, "STO": 0.5 },
"COR": {"DUB": 1.0 },
"DUB": {"COR": 1.0 },
"FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2 },
"GEN": {"PAR": 0.6, "MIL2": 0.4 },
"HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3 },
# "HEL": {"TAR": 0.3, "HAM": 0.7 },
"KAU": {"RIG": 1.0 },
"LIS": {"POR": 1.0 },
"LJU": {"ZAG": 1.0 },
"LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4},
"LON2": {"LON": 1.0 },
# "LUX": {"BRU": 1.0 },
"MAD": {"BIL": 1.0 },
"MAR": {"MIL2": 0.6, "ATH2": 0.4 },
"MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3 },
"PAR": {"GEN": 1.0 },
"POR": {"LIS": 1.0 },
"POZ": {"HAM": 1.0 },
"PRA": {"VIE": 1.0 },
"RIG": {"KAU": 1.0 },
"SOF": {"THE": 0.5, "BUC": 0.5 },
# "STO": {"COP": 0.5, "HEL": 0.5 },
"TAR": {"RIG": 1.0 },
"THE": {"ATH2": 0.5, "SOF": 0.5 },
"VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2 },
"ZAG": {"BUD": 0.5, "LJU": 0.5 },
}
import os
import re
import argparse
import sys
import pandas as pd # https://pandas.pydata.org
from datetime import datetime, timezone, timedelta
from pathlib import Path
import numpy as np
###############################################################################
# INPUT DATA SECTION
###############################################################################
# make sure this matches with the What-If Scenario runner script
RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports"
RAW_REPORT_FILE_PREFIX="raw_capacityreport_"
RAW_REPORT_FILE_SUFFIX="csv"
CONSOLIDATED_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/consolidatedreports"
CONSOLIDATED_REPORT_FILE_PREFIX="consolidated_capacityreport_"
CONSOLIDATED_REPORT_FILE_SUFFIX_RAW="csv"
CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN="txt"
# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded
MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"]
# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded
SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"]
# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
ISO8601_FORMAT = '%Y%m%dT%H%MZ'
ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z'
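# Example: a raw report written at 09:05 UTC on 31 January 2025 is named
# raw_capacityreport_20250131T0905Z.csv and matches the pattern above.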
###############################################################################
# RAW CONSOLIDATED REPORT
###############################################################################
# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns ---
def get_max_usage_row(group, usage_col):
"""
Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`.
If `usage_col` does not exist or is entirely NaN, returns None.
"""
# If the column doesn't exist or has all null values, return None
if usage_col not in group.columns or group[usage_col].dropna().empty:
return None
max_val = group[usage_col].max()
# Filter to rows where usage_col equals the maximum value.
max_rows = group[group[usage_col] == max_val]
# Return only the first row among those with the maximum value.
return max_rows.iloc[0]
def extract_usage_details(group):
"""
For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps)
and extract the relevant columns.
Booleans are set to True if at least one row in the group is True.
"""
# We'll create a dict to hold the final data for this ID.
out = {}
# Basic info columns (these are assumed to be consistent within the group)
first_row = group.iloc[0]
out['ID'] = first_row.get('ID', None)
out['NodeA'] = first_row.get('NodeA', None)
out['NodeB'] = first_row.get('NodeB', None)
out['CapacityGbps'] = first_row.get('CapacityGbps', None)
out['ConfNormalThrPct'] = first_row.get('ConfNormalThrPct', None)
out['ConfNormalThrGbps'] = first_row.get('ConfNormalThrGbps', None)
out['ConfFailThrPct'] = first_row.get('ConfFailThrPct', None)
out['ConfFailThrGbps'] = first_row.get('ConfFailThrGbps', None)
# Normal usage
normal_row = get_max_usage_row(group, 'NormalUsageGbps')
out['NormalEnabled'] = bool(group['NormalEnabled'].any()) if 'NormalEnabled' in group.columns else False
out['NormalUsagePct'] = normal_row.get('NormalUsagePct', np.nan) if normal_row is not None else np.nan
out['NormalUsageGbps'] = normal_row.get('NormalUsageGbps', np.nan) if normal_row is not None else np.nan
out['NormalUsageExceed'] = bool(group['NormalUsageExceed'].any()) if 'NormalUsageExceed' in group.columns else False
out['NormalDateTime'] = normal_row.get('NormalDateTime', pd.NaT) if normal_row is not None else pd.NaT
# 1 Link Fail usage
one_link_row = get_max_usage_row(group, '1LinkFailUsageGbps')
out['1LinkFailEnabled'] = bool(group['1LinkFailEnabled'].any()) if '1LinkFailEnabled' in group.columns else False
out['1LinkFailScenario'] = one_link_row.get('1LinkFailScenario', None) if one_link_row is not None else None
out['1LinkFailUsagePct'] = one_link_row.get('1LinkFailUsagePct', np.nan) if one_link_row is not None else np.nan
out['1LinkFailUsageGbps'] = one_link_row.get('1LinkFailUsageGbps', np.nan) if one_link_row is not None else np.nan
out['1LinkFailUsageExceed'] = bool(group['1LinkFailUsageExceed'].any()) if '1LinkFailUsageExceed' in group.columns else False
out['1LinkFailUsageTime'] = one_link_row.get('1LinkFailUsageTime', pd.NaT) if one_link_row is not None else pd.NaT
# 2 Link Fail usage
two_link_row = get_max_usage_row(group, '2LinkFailUsageGbps')
out['2LinkFailEnabled'] = bool(group['2LinkFailEnabled'].any()) if '2LinkFailEnabled' in group.columns else False
out['2LinkFailScenario'] = two_link_row.get('2LinkFailScenario', None) if two_link_row is not None else None
out['2LinkFailUsagePct'] = two_link_row.get('2LinkFailUsagePct', np.nan) if two_link_row is not None else np.nan
out['2LinkFailUsageGbps'] = two_link_row.get('2LinkFailUsageGbps', np.nan) if two_link_row is not None else np.nan
out['2LinkFailUsageExceed'] = bool(group['2LinkFailUsageExceed'].any()) if '2LinkFailUsageExceed' in group.columns else False
out['2LinkFailUsageTime'] = two_link_row.get('2LinkFailUsageTime', pd.NaT) if two_link_row is not None else pd.NaT
# 1 Node Fail usage
one_node_row = get_max_usage_row(group, '1NodeFailUsageGbps')
out['1NodeFailEnabled'] = bool(group['1NodeFailEnabled'].any()) if '1NodeFailEnabled' in group.columns else False
out['1NodeFailScenario'] = one_node_row.get('1NodeFailScenario', None) if one_node_row is not None else None
out['1NodeFailUsagePct'] = one_node_row.get('1NodeFailUsagePct', np.nan) if one_node_row is not None else np.nan
out['1NodeFailUsageGbps'] = one_node_row.get('1NodeFailUsageGbps', np.nan) if one_node_row is not None else np.nan
out['1NodeFailUsageExceed'] = bool(group['1NodeFailUsageExceed'].any()) if '1NodeFailUsageExceed' in group.columns else False
out['1NodeFailUsageTime'] = one_node_row.get('1NodeFailUsageTime', pd.NaT) if one_node_row is not None else pd.NaT
# Node+1 Link Fail usage
node_plus_link_row = get_max_usage_row(group, 'Node+1LinkFailUsageGbps')
out['Node+1LinkFailEnabled'] = bool(group['Node+1LinkFailEnabled'].any()) if 'Node+1LinkFailEnabled' in group.columns else False
out['Node+1LinkFailScenario'] = node_plus_link_row.get('Node+1LinkFailScenario', None) if node_plus_link_row is not None else None
out['Node+1LinkFailUsagePct'] = node_plus_link_row.get('Node+1LinkFailUsagePct', np.nan) if node_plus_link_row is not None else np.nan
out['Node+1LinkFailUsageGbps'] = node_plus_link_row.get('Node+1LinkFailUsageGbps', np.nan) if node_plus_link_row is not None else np.nan
out['Node+1LinkFailUsageExceed'] = bool(group['Node+1LinkFailUsageExceed'].any()) if 'Node+1LinkFailUsageExceed' in group.columns else False
out['Node+1LinkFailUsageTime'] = node_plus_link_row.get('Node+1LinkFailUsageTime', pd.NaT) if node_plus_link_row is not None else pd.NaT
# Finally, consolidated upgrade flags
out['MustUpgrade'] = bool(group['MustUpgrade'].any()) if 'MustUpgrade' in group.columns else False
out['ShouldUpgrade'] = bool(group['ShouldUpgrade'].any()) if 'ShouldUpgrade' in group.columns else False
return pd.Series(out)
###############################################################################
# HUMAN READABLE CONSOLIDATED REPORT
###############################################################################
def build_human_report(df_raw):
df_human= df_raw.copy()
# Helper formatting functions
def mark_upgrade(scenario, exceed):
if scenario in MUST_UPGRADE_SCENARIOS and exceed:
return f" (**)"
elif scenario in SHOULD_UPGRADE_SCENARIOS and exceed:
return f" (*)"
else:
return ""
def normal_thr(row):
return f"{row['ConfNormalThrPct']*100:.0f}% / {row['ConfNormalThrGbps']:.0f}G"
def fail_thr(row):
return f"{row['ConfFailThrPct']*100:.0f}% / {row['ConfFailThrGbps']:.0f}G"
def normal_usage(row):
return f"{row['NormalUsagePct']*100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade("normal", row['NormalUsageExceed'])
def onelinkfail_usage(row):
return f"{row['1LinkFailUsagePct']*100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade("onelinkfail", row['1LinkFailUsageExceed'])
def twolinkfail_usage(row):
return f"{row['2LinkFailUsagePct']*100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade("twolinkfail", row['2LinkFailUsageExceed'])
def onenodefail_usage(row):
return f"{row['1NodeFailUsagePct']*100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade("onenodefail", row['1NodeFailUsageExceed'])
def nodelinkfail_usage(row):
return f"{row['Node+1LinkFailUsagePct']*100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade("nodelinkfail", row['Node+1LinkFailUsageExceed'])
def upgrade(row):
return "MUST (**)" if row['MustUpgrade'] else ("SHOULD (*)" if row['ShouldUpgrade'] else "NO")
df_human['ConfNormalThr'] = df_human.apply(normal_thr, axis=1)
df_human['ConfFailThr'] = df_human.apply(fail_thr, axis=1)
df_human['NormalUsage'] = df_human.apply(normal_usage, axis=1)
df_human['1LinkFailUsage'] = df_human.apply(onelinkfail_usage, axis=1)
df_human['2LinkFailUsage'] = df_human.apply(twolinkfail_usage, axis=1)
df_human['1NodeFailUsage'] = df_human.apply(onenodefail_usage, axis=1)
df_human['Node+1LinkFailUsage'] = df_human.apply(nodelinkfail_usage, axis=1)
df_human['Upgrade'] = df_human.apply(upgrade, axis=1)
# Drop unused columns
df_human.drop(['ConfNormalThrPct','ConfNormalThrGbps'],axis=1, inplace=True)
df_human.drop(['ConfFailThrPct','ConfFailThrGbps'],axis=1, inplace=True)
df_human.drop(['NormalUsagePct','NormalUsageGbps','NormalUsageExceed'],axis=1, inplace=True)
df_human.drop(['1LinkFailUsagePct','1LinkFailUsageGbps','1LinkFailUsageExceed','1LinkFailUsageTime'],axis=1, inplace=True)
df_human.drop(['2LinkFailUsagePct','2LinkFailUsageGbps','2LinkFailUsageExceed','2LinkFailUsageTime'],axis=1, inplace=True)
df_human.drop(['1NodeFailUsagePct','1NodeFailUsageGbps','1NodeFailUsageExceed','1NodeFailUsageTime'],axis=1, inplace=True)
df_human.drop(['Node+1LinkFailUsagePct','Node+1LinkFailUsageGbps','Node+1LinkFailUsageExceed','Node+1LinkFailUsageTime'],axis=1, inplace=True)
df_human.drop(['MustUpgrade','ShouldUpgrade'],axis=1, inplace=True)
# Replace NaN and NaT values with "N/A"
df_human['NormalDateTime'] = df_human['NormalDateTime'].astype(object)
df_human.fillna("N/A", inplace=True)
return df_human[['NodeA', 'NodeB', 'CapacityGbps', 'ConfNormalThr', 'ConfFailThr',
'NormalUsage', 'NormalDateTime',
'1LinkFailScenario', '1LinkFailUsage',
'2LinkFailScenario', '2LinkFailUsage',
'1NodeFailScenario', '1NodeFailUsage',
'Node+1LinkFailScenario', 'Node+1LinkFailUsage',
'Upgrade']]
###############################################################################
# FILE FUNCTIONS
###############################################################################
def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime):
# List all raw reports in directory
all_raw_reports = [
file for file in os.listdir(directory)
if os.path.isfile(os.path.join(directory, file))
and file.startswith(prefix)
and file.endswith(suffix)
and re.search(ISO8601_REGEXP, file)
]
# Filter to files that match the timestamp pattern within the specified datetime range
matching_files = []
for file in all_raw_reports:
match = re.search(ISO8601_REGEXP, file)
file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=timezone.utc)
if start_datetime <= file_date <= end_datetime:
matching_files.append(os.path.join(directory, file))
return matching_files
def store_consolidated(df_consolidated, directory, prefix, suffix):
path = Path(directory)
path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist
# Create an ISO 8601 basic format UTC timestamped filename
timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT)
filename = f'{prefix}{timestamp}.{suffix}'
if suffix == "csv":
df_consolidated.to_csv(os.path.join(path, filename), sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True)
elif suffix == "txt":
markdown = df_consolidated.to_markdown(headers='keys', tablefmt='psql')
# Write the markdown string to a file
with open(os.path.join(path, filename), "w") as file:
file.write(markdown)
###############################################################################
# MAIN
###############################################################################
def main():
# Parse commandline arguments
parser = argparse.ArgumentParser(description='Script usage:')
parser.add_argument('--daily', action='store_true', help='Create daily report (past day)')
parser.add_argument('--weekly', action='store_true', help='Create weekly report (past week)')
parser.add_argument('--monthly', action='store_true', help='Create monthly report (past month)')
parser.add_argument('--quarterly', action='store_true', help='Create quarterly report (past quarter)')
parser.add_argument('--yearly', action='store_true', help='Create yearly report (past year)')
if len(sys.argv) == 1:
# No arguments were provided; print help message.
parser.print_help()
sys.exit(1)
args = parser.parse_args()
today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
first_day_of_current_month = today.replace(day=1)
if args.daily:
start_datetime= today - timedelta(days=1)
end_datetime = today
elif args.weekly:
start_datetime= today - timedelta(weeks=1)
end_datetime = today
elif args.monthly:
# First day of last month
start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1)
# First day of current month
end_datetime = first_day_of_current_month
elif args.quarterly:
# Approximate the previous quarter (using 30-day months), ending at the first day of the previous month.
start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta(days=(first_day_of_current_month.month - 1) % 3 * 30)
end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1)
elif args.yearly:
# First day of the previous year
start_datetime= today.replace(year=today.year - 1, month=1, day=1)
end_datetime = today.replace(year=today.year, month=1, day=1)
matching_files= find_files_by_timeframe(RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime)
if len(matching_files) > 0:
print(f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}")
# List of columns that should be parsed as dates from CSV
date_columns = ['NormalDateTime', '1LinkFailUsageTime', '2LinkFailUsageTime', '1NodeFailUsageTime', 'Node+1LinkFailUsageTime']
# Read and concat CSVs
dataframes= [pd.read_csv(file, sep=',', encoding='utf-8', parse_dates=date_columns, date_format=ISO8601_FORMAT, float_precision='round_trip') for file in matching_files]
concat_df = pd.concat(dataframes)
# Walk over the results for each link and extract and store the highest usage for each scenario
results = []
for id_val, group in concat_df.groupby('ID'):
details = extract_usage_details(group)
# Overwrite ID with the group key to be sure
details['ID'] = id_val
results.append(details)
consolidated_raw = pd.DataFrame(results)
consolidated_raw.set_index("ID", inplace=True)
store_consolidated(consolidated_raw, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_RAW)
consolidated_human= build_human_report(consolidated_raw)
store_consolidated(consolidated_human, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN)
else:
print(f"No raw files found for timeframe {start_datetime} through {end_datetime}")
if __name__=="__main__":
main()
import math
import itertools
import networkx as nx # https://networkx.org
import pandas as pd # https://pandas.pydata.org
import multiprocessing
from datetime import datetime, timezone
from pathlib import Path
import KentikTrafficMatrix as ktm
import NetworkTopology as nettopo
###############################################################################
# 1) INPUT DATA SECTION
###############################################################################
# If no normal threshold is specified on a link, default to 0.40 (allowing up to 40% link utilization in the normal, non-failure scenario)
DEFAULT_NORMAL_THRESHOLD = 0.40
# If no failure threshold is specified on a link, default to 0.80 (allowing up to 80% link utilization in the worst-case failure scenario)
DEFAULT_FAILURE_THRESHOLD= 0.80
ENABLED_SCENARIOS=["normal","onelinkfail","twolinkfail","onenodefail","nodelinkfail"]
# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded
MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"]
# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded
SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"]
RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports"
RAW_REPORT_FILE_PREFIX="raw_capacityreport_"
RAW_REPORT_FILE_SUFFIX="csv"
# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
ISO8601_FORMAT = '%Y%m%dT%H%MZ'
ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z'
###############################################################################
# 2) Build a local NetworkX directed multigraph (MultiDiGraph) for scenario computations
###############################################################################
def build_graph(nodes, links):
# MultiDiGraph supports multiple edges (ECMP links) between two nodes
# and is directional, allowing modelling of Full-Duplex links and taking
# traffic direction into account
graph= nx.MultiDiGraph()
graph.add_nodes_from(nodes)
for link in links:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link[0]
nodeA= link[1]
nodeB= link[2]
igp = link[3]
# Add Full-Duplex edges in both directions with linkID
# assumes the IGP metric is configured symmetrically
graph.add_edge(nodeA,nodeB,key=lID, igpmetric=igp)
graph.add_edge(nodeB,nodeA,key=lID, igpmetric=igp)
return graph
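# Illustrative: core link (1, "AMS", "FRA", 2016, 800, []) becomes two directed
# edges, graph["AMS"]["FRA"][1] and graph["FRA"]["AMS"][1], each with
# igpmetric=2016, so load is tracked per traffic direction.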
###############################################################################
# 3) Helper functions to ease lookups for core links and SRLGs
###############################################################################
def build_corelink_map(links):
# Creating a lookup dictionary for fast lookup by linkID
corelink_dict = {link[0]: link for link in links}
return corelink_dict
def build_srlg_map(links):
# Maps SRLG name to one or more core links (linkIDs)
srlg_map={}
for link in links:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link[0]
nodeA= link[1]
nodeB= link[2]
igp = link[3]
if len(link)>5 and isinstance(link[5], list):
for srlg in link[5]:
srlg_map.setdefault(srlg, set())
srlg_map[srlg].add(lID)
return srlg_map
###############################################################################
# 4) Node failover logic - redistribute traffic to other core nodes
###############################################################################
def redistribute_local_traffic(t_xx, ratio_map):
"""
We treat local traffic T[X,X] as two endpoints at X.
Each endpoint re-homes to neighbor i or j with probability ratio_map[i], ratio_map[j].
=> fraction = ratio_map[i]*ratio_map[j]*t_xx => T[i,j].
If i=j, it's T[i,i] (local on neighbor i).
Returns local_matrix[i][j] = fraction (Mbps).
"""
local_matrix={}
nbrs= list(ratio_map.keys())
for i in nbrs:
local_matrix.setdefault(i,{})
for j in nbrs:
local_matrix[i][j]= ratio_map[i]* ratio_map[j]* t_xx
return local_matrix
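# Worked example using the configured AMS ratios:
#   redistribute_local_traffic(100, {"LON": 0.6, "FRA": 0.4})
# returns (up to floating-point rounding)
#   {"LON": {"LON": 36.0, "FRA": 24.0}, "FRA": {"LON": 24.0, "FRA": 16.0}}
# i.e. 100 Mbps of AMS-local traffic becomes 36 Mbps LON-local, 16 Mbps FRA-local
# and 24 Mbps in each direction between LON and FRA.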
def build_traffic_with_node_fail(baseTraffic, nodefail_ratios, failingNode):
"""
Return scenario-specific traffic matrix in Mbps,
re-homing T[failingNode,*], T[*,failingNode], T[failingNode,failingNode].
"""
scenario_traffic={}
ratio_map= nodefail_ratios.get(failingNode,{})
for s in baseTraffic:
scenario_traffic[s]={}
for d in baseTraffic[s]:
val= baseTraffic[s][d]
if val<=0:
scenario_traffic[s][d]=0
continue
# traffic is local - T[failingNode,failingNode]
if s==failingNode and d==failingNode:
scenario_traffic[s][d]=0
mat= redistribute_local_traffic(val, ratio_map)
for i in mat:
for j in mat[i]:
portion= mat[i][j]
if portion>0:
scenario_traffic.setdefault(i,{})
scenario_traffic[i][j]= scenario_traffic[i].get(j,0)+ portion
# T[failingNode,*]
elif s==failingNode:
scenario_traffic[s][d]=0
for nbr,r in ratio_map.items():
scenario_traffic.setdefault(nbr,{})
scenario_traffic[nbr][d]= scenario_traffic[nbr].get(d,0)+ val*r
# T[*,failingNode]
elif d==failingNode:
scenario_traffic[s][d]=0
for nbr,r in ratio_map.items():
scenario_traffic[s][nbr]= scenario_traffic[s].get(nbr,0)+ val*r
# Unaffected
else:
scenario_traffic[s][d]= val
return scenario_traffic
###############################################################################
# 5) Remove SRLG or corelink from graph
###############################################################################
def remove_corelink(graph, removeLink):
edges_rm=[]
for (u,v,lID) in graph.edges(keys=True):
if lID==removeLink:
edges_rm.append((u,v,lID))
for e in edges_rm:
graph.remove_edge(*e)
def remove_corelink_or_srlg(graph, failedElement, srlg_map):
etype, data= failedElement
if etype=="LINK":
linkID= data
remove_corelink(graph, linkID)
elif etype=="SRLG":
if data in srlg_map:
for linkID in srlg_map[data]:
remove_corelink(graph, linkID)
###############################################################################
# 6) Compute scenario load in MultiDiGraph
###############################################################################
def compute_scenario_load(graph, scenario_traffic):
# initialize empty usage map
usageMap={}
for(u,v,lID) in graph.edges(keys=True):
usageMap[(u,v,lID)] = 0.0
# walk over [ingress][egress] traffic demands
for s in scenario_traffic:
for d in scenario_traffic[s]:
demand_mbps= scenario_traffic[s][d]
if demand_mbps<=0 or s==d or (s not in graph) or (d not in graph):
continue
demand_gbps= demand_mbps/1000.0 # convert Mbps to Gbps
# Run Dijkstra algorithm
try:
best_dist= nx.dijkstra_path_length(graph, s, d, weight='igpmetric')
except nx.NetworkXNoPath:
continue
# Enumerate all shortest paths from (s)ource to (d)estination
all_paths= list(nx.all_shortest_paths(graph, s, d, weight='igpmetric'))
valid=[]
for path in all_paths:
dist_sum=0
for i in range(len(path)-1):
p= path[i]
q= path[i+1]
# pick minimal igp among parallel edges p->q
best_igp= math.inf
for k2 in graph[p][q]:
igp= graph[p][q][k2]['igpmetric']
if igp<best_igp:
best_igp= igp
dist_sum+= best_igp
if math.isclose(dist_sum,best_dist):
valid.append(path)
if not valid:
continue
# Now equally divide traffic demand across all shortest paths (ECMP)
share= demand_gbps/ len(valid)
for path in valid:
for i in range(len(path)-1):
p= path[i]
q= path[i+1]
# pick minimal link p->q
best_igp= math.inf
best_k= None
for k2 in graph[p][q]:
igp= graph[p][q][k2]['igpmetric']
if igp<best_igp:
best_igp= igp
best_k= k2
usageMap[(p,q,best_k)]+= share
return usageMap
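# Illustrative: a 3000 Mbps demand between two sites with two equal-cost shortest
# paths is split ECMP-style into 1.5 Gbps per path; every directed edge on each
# path accumulates that share in usageMap, keyed by (fromNode, toNode, linkID).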
###############################################################################
# 7) Individual Normal/Failure Scenario Runners
###############################################################################
def run_normal_scenario(graph, trafficMatrix):
"""
No failures => just compute load with the base traffic.
"""
return compute_scenario_load(graph, trafficMatrix)
def run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail):
"""
Remove nodeFail from graph, shift its traffic T[nodeFail,*], T[*,nodeFail], T[nodeFail,nodeFail].
"""
if nodeFail in graph:
graph.remove_node(nodeFail)
scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
return compute_scenario_load(graph, scenario_traffic)
def run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail):
"""
linkFail = ("LINK", linkID) or ("SRLG", srlgName).
Remove the failed link(s) from the (already copied) graph, then compute load with the base traffic.
"""
remove_corelink_or_srlg(graph, linkFail, srlg_map)
return compute_scenario_load(graph, trafficMatrix)
def run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2):
"""
linkFail1, linkFail2 each = ("LINK", linkID) or ("SRLG", srlgName).
Remove both failed elements from the (already copied) graph, then compute load with the base traffic.
"""
remove_corelink_or_srlg(graph, linkFail1, srlg_map)
remove_corelink_or_srlg(graph, linkFail2, srlg_map)
return compute_scenario_load(graph, trafficMatrix)
def run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail):
# first remove node and shift traffic
if nodeFail in graph:
graph.remove_node(nodeFail)
scenario_traffic= build_traffic_with_node_fail(trafficMatrix, nodefail_ratios, nodeFail)
# then remove link
remove_corelink_or_srlg(graph, linkFail, srlg_map)
return compute_scenario_load(graph, scenario_traffic)
###############################################################################
# 8) Single scenario task runner
###############################################################################
def expand_link_name(corelink_map, linkFail):
"""Helper function--return a string describing one scenario element (LINK linkID or SRLG name)."""
etype,data= linkFail
if etype=="LINK":
# get corelink details by ID
td= corelink_map.get(data)
nodeA= td[1]
nodeB= td[2]
return f"LINK_{data}/{nodeA}-{nodeB}"
elif etype=="SRLG":
return f"SRLG_{data}"
else:
return str(linkFail)
def compute_scenario_task(args):
"""
Each scenario runs as a single task
"""
(graph_orig, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, scenarioType, scenarioData)= args
# 1) first, create a copy of graph so in-memory copy of original is not modified
graph=graph_orig.copy()
# 2) run selected normal or failure scenario
usageMap={}
scenarioName=""
if scenarioType=="normal":
usageMap= run_normal_scenario(graph, trafficMatrix)
scenarioName="normal"
elif scenarioType=="onenodefail":
nodeFail= scenarioData
usageMap= run_onenode_failure(graph, trafficMatrix, nodefail_ratios, nodeFail)
scenarioName= f"NODE_{nodeFail}"
elif scenarioType=="onelinkfail":
linkFail= scenarioData
usageMap= run_onelink_failure(graph, trafficMatrix, srlg_map, linkFail)
scenarioName= f"{expand_link_name(corelink_map, linkFail)}"
elif scenarioType=="twolinkfail":
(linkFail1,linkFail2)= scenarioData
usageMap= run_twolink_failure(graph, trafficMatrix, srlg_map, linkFail1, linkFail2)
scenarioName= f"{expand_link_name(corelink_map, linkFail1)} + {expand_link_name(corelink_map, linkFail2)}"
elif scenarioType=="nodelinkfail":
(nodeFail,linkFail)= scenarioData
usageMap= run_nodelink_failure(graph, trafficMatrix, srlg_map, nodefail_ratios, nodeFail, linkFail)
scenarioName= f"NODE_{nodeFail} + {expand_link_name(corelink_map, linkFail)}"
else:
usageMap={}
scenarioName="unknown"
return (ts, scenarioType, scenarioName, usageMap)
###############################################################################
# 9) Build list of tasks to run in parallel
###############################################################################
def build_parallel_tasks(graph, nodes, nodefail_ratios, corelink_map, srlg_map, traffic_matrices):
# enumerate link failure elements: individual core links (by linkID) plus SRLGs (by name)
linkFailElements=[]
for link_id, _link_data in corelink_map.items():
linkFailElements.append(("LINK", link_id))
for srlg_name, _srlg_data in srlg_map.items():
linkFailElements.append(("SRLG", srlg_name))
tasks=[]
for ts in traffic_matrices.keys():
trafficMatrix= traffic_matrices[ts]
# add normal non-failure scenario
if "normal" in ENABLED_SCENARIOS:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "normal", None))
# add single link fail scenarios
if "onelinkfail" in ENABLED_SCENARIOS:
for linkFail in linkFailElements:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onelinkfail", linkFail))
# add unique two link fail scenarios
if "twolinkfail" in ENABLED_SCENARIOS:
fe_twofail_combos= list(itertools.combinations(linkFailElements,2))
for linkFail1,linkFail2 in fe_twofail_combos:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "twolinkfail", (linkFail1,linkFail2)))
for nodeFail in nodes:
# add node fail scenarios
if "onenodefail" in ENABLED_SCENARIOS:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "onenodefail", nodeFail))
# add node + link fail scenarios
if "nodelinkfail" in ENABLED_SCENARIOS:
for linkFail in linkFailElements:
tasks.append((graph, corelink_map, srlg_map, nodefail_ratios, ts, trafficMatrix, "nodelinkfail", (nodeFail,linkFail)))
return tasks
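# Sanity check on task count: with L link-failure elements (core links + SRLGs)
# and N nodes, each timestamp yields 1 + L + C(L,2) + N + N*L tasks when all
# scenarios are enabled; for the topology in NetworkTopology.py
# (49 links + 4 SRLGs = 53 elements, 32 nodes) that is 3,160 scenarios per timestamp.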
###############################################################################
# 10) Report generator
###############################################################################
def link_usage_aggregator(corelink_map, results):
# initialize empty link usage aggregator
aggLinkUsage={}
for link_id, _link_data in corelink_map.items():
aggLinkUsage[link_id]= {}
for scenarioType in ENABLED_SCENARIOS:
aggLinkUsage[link_id][scenarioType] = (0.0,pd.NaT,None)
# unify results across all tasks - for each scenario, record worst-case (highest) load on each link
for (ts, scenarioType, scenarioName, usageMap) in results:
for (u,v,lID) in usageMap:
usage= usageMap[(u,v,lID)]
oldUsage, oldTs, oldSc= aggLinkUsage[lID][scenarioType]
if usage> oldUsage:
aggLinkUsage[lID][scenarioType]= (usage, ts, f"{scenarioName}")
# return aggLinkUsage[linkID][scenarioType] => (usage,ts,scenarioName)
return aggLinkUsage
def build_raw_report(corelink_map, aggLinkUsage):
final_report_raw=[]
# Helper lookup functions for the aggregated per-scenario usage
def exceed(scenario, thr, capacity):
return aggLinkUsage[lID][scenario][0]>thr*capacity if scenario in ENABLED_SCENARIOS else pd.NA
def usagePct(scenario, capacity):
# (usage,ts,scenarioName)
return aggLinkUsage[lID][scenario][0]/capacity if scenario in ENABLED_SCENARIOS else pd.NA
def usageGbps(scenario):
return aggLinkUsage[lID][scenario][0] if scenario in ENABLED_SCENARIOS else pd.NA
def scenarioTimestamp(scenario):
return pd.to_datetime(aggLinkUsage[lID][scenario][1], unit='ms', errors='coerce', utc=True) if scenario in ENABLED_SCENARIOS else pd.NaT
def failScenarioName(scenario):
return aggLinkUsage[lID][scenario][2] if scenario in ENABLED_SCENARIOS else None
def scenarioEnabled(scenario):
return scenario in ENABLED_SCENARIOS
# Iterate over each core link
for link_id, link_data in corelink_map.items():
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [norm_thr], [fail_thr] )
lID = link_data[0]
nodeA = link_data[1]
nodeB = link_data[2]
capacity= link_data[4]
# parse thresholds if present, else use defaults
norm_thr= link_data[6] if len(link_data)>6 and isinstance(link_data[6], (float,int)) else DEFAULT_NORMAL_THRESHOLD
fail_thr= link_data[7] if len(link_data)>7 and isinstance(link_data[7], (float,int)) else DEFAULT_FAILURE_THRESHOLD
# threshold check
mustUpgrade=False
shouldUpgrade=False
for scenarioType in ENABLED_SCENARIOS:
(usage,ts,scenarioName) = aggLinkUsage[lID][scenarioType]
thr= 1
if scenarioType=="normal" and norm_thr>0:
thr=norm_thr
elif "fail" in scenarioType and fail_thr>0:
thr=fail_thr
if usage> thr*capacity and scenarioType in MUST_UPGRADE_SCENARIOS:
mustUpgrade=True
elif usage> thr*capacity and scenarioType in SHOULD_UPGRADE_SCENARIOS:
shouldUpgrade=True
# print one row per core link
final_report_raw.append({
"ID": lID,
"NodeA": nodeA,
"NodeB": nodeB,
"CapacityGbps": capacity,
"ConfNormalThrPct": norm_thr,
"ConfNormalThrGbps": norm_thr*capacity,
"ConfFailThrPct": fail_thr,
"ConfFailThrGbps": fail_thr*capacity,
"NormalEnabled": scenarioEnabled("normal"),
"NormalUsagePct": usagePct("normal", capacity),
"NormalUsageGbps": usageGbps("normal"),
"NormalUsageExceed": exceed("normal", norm_thr, capacity),
"NormalDateTime": scenarioTimestamp("normal"),
"1LinkFailEnabled": scenarioEnabled("onelinkfail"),
"1LinkFailScenario": failScenarioName("onelinkfail"),
"1LinkFailUsagePct": usagePct("onelinkfail", capacity),
"1LinkFailUsageGbps": usageGbps("onelinkfail"),
"1LinkFailUsageExceed": exceed("onelinkfail", fail_thr, capacity),
"1LinkFailUsageTime": scenarioTimestamp("onelinkfail"),
"2LinkFailEnabled": scenarioEnabled("twolinkfail"),
"2LinkFailScenario": failScenarioName("twolinkfail"),
"2LinkFailUsagePct": usagePct("twolinkfail", capacity),
"2LinkFailUsageGbps": usageGbps("twolinkfail"),
"2LinkFailUsageExceed": exceed("twolinkfail", fail_thr, capacity),
"2LinkFailUsageTime": scenarioTimestamp("twolinkfail"),
"1NodeFailEnabled": scenarioEnabled("onenodefail"),
"1NodeFailScenario": failScenarioName("onenodefail"),
"1NodeFailUsagePct": usagePct("onenodefail", capacity),
"1NodeFailUsageGbps": usageGbps("onenodefail"),
"1NodeFailUsageExceed": exceed("onenodefail", fail_thr, capacity),
"1NodeFailUsageTime": scenarioTimestamp("onenodefail"),
"Node+1LinkFailEnabled": scenarioEnabled("nodelinkfail"),
"Node+1LinkFailScenario": failScenarioName("nodelinkfail"),
"Node+1LinkFailUsagePct": usagePct("nodelinkfail", capacity),
"Node+1LinkFailUsageGbps": usageGbps("nodelinkfail"),
"Node+1LinkFailUsageExceed": exceed("nodelinkfail", fail_thr, capacity),
"Node+1LinkFailUsageTime": scenarioTimestamp("nodelinkfail"),
"MustUpgrade": mustUpgrade,
"ShouldUpgrade": shouldUpgrade
})
df_raw= pd.DataFrame(final_report_raw, columns=["ID","NodeA","NodeB","CapacityGbps",
"ConfNormalThrPct","ConfNormalThrGbps",
"ConfFailThrPct","ConfFailThrGbps",
"NormalEnabled","NormalUsagePct","NormalUsageGbps","NormalUsageExceed","NormalDateTime",
"1LinkFailEnabled","1LinkFailScenario","1LinkFailUsagePct","1LinkFailUsageGbps","1LinkFailUsageExceed","1LinkFailUsageTime",
"2LinkFailEnabled","2LinkFailScenario","2LinkFailUsagePct","2LinkFailUsageGbps","2LinkFailUsageExceed","2LinkFailUsageTime",
"1NodeFailEnabled","1NodeFailScenario","1NodeFailUsagePct","1NodeFailUsageGbps","1NodeFailUsageExceed","1NodeFailUsageTime",
"Node+1LinkFailEnabled","Node+1LinkFailScenario","Node+1LinkFailUsagePct","Node+1LinkFailUsageGbps","Node+1LinkFailUsageExceed","Node+1LinkFailUsageTime",
"MustUpgrade","ShouldUpgrade",])
df_raw.set_index("ID", inplace=True)
return df_raw
def store_raw_report(df_raw):
directory = Path(RAW_REPORT_DIRECTORY)
directory.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist
# Create an ISO 8601 basic format UTC timestamped filename
timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT)
filename = f'{RAW_REPORT_FILE_PREFIX}{timestamp}.{RAW_REPORT_FILE_SUFFIX}'
filepath = directory / filename
df_raw.to_csv(filepath, sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True)
###############################################################################
# MAIN
###############################################################################
def main():
# Record and display the start time of the script
start_time = datetime.now(timezone.utc)
print(f"===============================================================")
print("Execution started at:", start_time.strftime(ISO8601_FORMAT))
# 1) query Kentik to build traffic demand matrices
print(f"Querying Kentik API for traffic demand matrix...")
kentik_json = ktm.fetch_kentik_traffic_matrix()
print(f"Processing traffic demand matrices from Kentik data...")
traffic_matrices= ktm.kentik_to_traffic_matrices(kentik_json, nettopo.NODES) # returns traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
# 2) build graph and lookup data structures
graph= build_graph(nettopo.NODES, nettopo.CORELINKS)
corelink_map= build_corelink_map(nettopo.CORELINKS)
srlg_map= build_srlg_map(nettopo.CORELINKS)
# 3) define tasks to run in parallel
tasks= build_parallel_tasks(graph, nettopo.NODES, nettopo.NODE_FAILOVER_RATIOS, corelink_map, srlg_map, traffic_matrices)
print(f"Number of traffic simulation tasks: {len(tasks)} across {len(traffic_matrices)} timeslots with {len(tasks)/len(traffic_matrices):.0f} scenarios each")
# 4) now run all tasks in a single global pool
print(f"Executing parallel traffic simulation tasks...")
with multiprocessing.Pool() as pool:
tasks_results= pool.map(compute_scenario_task, tasks)
# 5) aggregate link usage across all tasks, recording the worst-case (highest) load on each link
aggLinkUsage= link_usage_aggregator(corelink_map, tasks_results)
# 6) create and store final report
raw_report= build_raw_report(corelink_map, aggLinkUsage)
store_raw_report(raw_report)
# Display the end time, and total execution duration
end_time = datetime.now(timezone.utc)
duration = end_time - start_time
print("Execution ended at:", end_time.strftime(ISO8601_FORMAT))
print("Total duration in seconds:", duration.total_seconds())
print(f"===============================================================")
if __name__=="__main__":
main()
certifi==2024.12.14
charset-normalizer==3.4.1
idna==3.10
networkx==3.4.2
numpy==2.2.2
pandas==2.2.3
python-dateutil==2.9.0.post0
pytz==2025.1
requests==2.32.3
setuptools==75.8.0
six==1.17.0
tabulate==0.9.0
tzdata==2025.1
urllib3==2.3.0