Compare revisions for karel.vanklink/daniel: source main merged into target main.
Commits on Source (4), showing 1657 additions and 1016 deletions.
venv/
__pycache__/
*.egg-info
.coverage*
coverage.xml
.vscode
venv
oss-params.json
.mypy_cache
.pytest_cache
.ruff_cache
.tox
build/
.idea
.venv
.env
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    # Ruff version.
    rev: v0.1.15
    hooks:
      # Run the linter.
      - id: ruff
        args:
          - --fix
          - --preview
          - --ignore=PLR0917,PLR0914
          - --extend-exclude=test/*
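# Typical usage (standard pre-commit CLI, not specific to this repo):
#   pre-commit install          # run the ruff hook automatically on every commit
#   pre-commit run --all-files  # lint/fix the whole tree once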
# Changelog
## [0.1] - 2025-03-26
- Initial version
FROM python:3.12.7-alpine
WORKDIR /app
# Set environment variables for predictable Python behavior and UTF-8 encoding
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8
ARG ARTIFACT_VERSION
RUN apk add --no-cache gcc libc-dev libffi-dev && \
addgroup -S appgroup && adduser -S appuser -G appgroup -h /app
RUN pip install --no-cache-dir \
--pre \
--trusted-host 150.254.211.2 \
--extra-index-url https://150.254.211.2/artifactory/api/pypi/geant-swd-pypi/simple \
--target /app \
geant-capacity-planner==${ARTIFACT_VERSION}
# Copy the shell scripts and ensure scripts do not have Windows line endings and make them executable
COPY start-app.sh start-worker.sh start-scheduler.sh /app/
RUN sed -i 's/\r$//' start-app.sh start-worker.sh start-scheduler.sh && \
chmod +x start-app.sh start-worker.sh start-scheduler.sh
RUN chown -R appuser:appgroup /app
USER appuser
EXPOSE 8080
ENTRYPOINT ["/app/start-app.sh"]
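# Example build and run (illustrative; the image tag is arbitrary and
# ARTIFACT_VERSION must match a published geant-capacity-planner release):
#   docker build --build-arg ARTIFACT_VERSION=0.1 -t capacity-planner .
#   docker run -p 8080:8080 capacity-planner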
MIT License
Copyright (c) 2025 GÉANT Vereniging
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# (A) Define each core link:
# ( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] )
# where:
# linkID: network-wide unique numeric ID (e.g. 1001)
# nodeA, nodeB: core link endpoints
# igp_metric: IGP cost/distance
# capacity: full-duplex link capacity in Gbps
# srlg_list: list of Shared Risk Link Group (SRLG) names (or empty)
# normal_threshold: fraction for normal usage. If omitted default is used
# failure_threshold: fraction for usage under failure. If omitted default is used
CORELINKS = [
( 1, "AMS", "FRA", 2016, 800, []),
( 2, "AMS", "LON", 1428, 800, []),
( 3, "FRA", "GEN", 2478, 800, []),
( 4, "GEN", "PAR", 2421, 800, []),
( 5, "LON", "LON2", 180, 800, []),
( 6, "LON2", "PAR", 1866, 800, []),
( 7, "FRA", "PRA", 3315, 300, []),
( 8, "GEN", "MIL2", 4280, 300, []),
( 9, "GEN", "MAR", 2915, 400, []),
(10, "HAM", "POZ", 3435, 300, []),
(11, "MAR", "MIL2", 3850, 300, []),
(12, "MIL2", "VIE", 5400, 400, []),
(13, "POZ", "PRA", 3215, 300, []),
(14, "PRA", "VIE", 2255, 300, []),
(15, "AMS", "HAM", 3270, 300, []),
(16, "BRU", "PAR", 50000, 100, []),
(17, "AMS", "BRU", 50000, 100, []),
(18, "BIL", "PAR", 5600, 300, []),
(19, "BUD", "ZAG", 2370, 200, []),
(20, "COR", "DUB", 5500, 100, []),
(21, "DUB", "LON", 8850, 100, []),
(22, "MAD", "MAR", 6805, 300, []),
(23, "BUC", "SOF", 5680, 200, []),
(24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]),
(25, "LJU", "MIL2", 3330, 300, []),
(26, "LJU", "ZAG", 985, 200, []),
(27, "BUC", "BUD", 11850, 200, []),
(28, "LIS", "MAD", 8970, 200, []),
(29, "BIL", "POR", 9250, 200, []),
(30, "LIS", "POR", 3660, 200, []),
(31, "BIL", "MAD", 4000, 200, []),
(32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]),
(33, "ATH2", "THE", 8200, 100, []),
(34, "SOF", "THE", 5800, 100, []),
# ("COP", "HAM", 480, 400, []),
# ("COP", "STO", 600, 400, []),
# ("HEL", "STO", 630, 400, []),
(35, "RIG", "TAR", 2900, 100, []),
(36, "KAU", "POZ", 10050, 100, []),
# ("BEL", "SOF", 444, 400, []),
# ("BEL", "ZAG", 528, 400, []),
(37, "ZAG", "SOF", 9720, 200, []),
(38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]),
(39, "COR", "LON2", 7160, 100, ["COR-LON2"]),
# ("HEL", "TAR", 227, 400, []),
(40, "KAU", "RIG", 4500, 100, []),
# ("BRU", "LUX", 380, 400, []),
# ("FRA", "LUX", 312, 400, []),
# ("RIG", "STO", 400, 400, []),
# ("COP", "POZ", 750, 400, []),
(41, "COR", "PAR", 12549, 100, ["COR-LON2"]),
(42, "KIE", "POZ", 50000, 100, []),
(43, "CHI", "BUC", 50000, 40, []),
(44, "CHI", "KIE", 10000, 100, []),
(45, "HAM", "TAR", 12490, 100, []),
(46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]),
(47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]),
(48, "AMS", "LON", 50000, 100, []), # backup link via SURF
(49, "BUC", "FRA", 50000, 100, []), # backup link via TTI
]
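# Illustration only: the comment above says the last two tuple fields are optional and fall back to
# defaults. The planner's own parsing code is not part of this diff; this is a minimal sketch with
# hypothetical DEFAULT_NORMAL_THRESHOLD / DEFAULT_FAILURE_THRESHOLD values.
DEFAULT_NORMAL_THRESHOLD = 0.40   # assumed value, for illustration only
DEFAULT_FAILURE_THRESHOLD = 0.80  # assumed value, for illustration only

def corelink_thresholds(corelink):
    """Return (normal_threshold, failure_threshold) for a CORELINKS tuple, applying defaults."""
    normal = corelink[6] if len(corelink) > 6 else DEFAULT_NORMAL_THRESHOLD
    failure = corelink[7] if len(corelink) > 7 else DEFAULT_FAILURE_THRESHOLD
    return normal, failure

# corelink_thresholds((1, "AMS", "FRA", 2016, 800, [])) -> (0.40, 0.80)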
# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik)
# to be added: "BEL", "COP","HEL","LUX","STO"
NODES = [
"AMS","ATH2","BIL","BRA","BRU","BUC","BUD","CHI","COR","DUB","FRA",
"GEN","HAM","KAU","KIE","LIS","LJU","LON","LON2","MAD","MAR","MIL2",
"PAR","POR","POZ","PRA","RIG","SOF","TAR","THE","VIE","ZAG"
]
# Node failover ratio map for single-node fails
NODE_FAILOVER_RATIOS = {
"AMS": {"LON": 0.6, "FRA": 0.4 },
"ATH": {"THE": 0.5, "MAR": 0.5 },
# "BEL": {"ZAG": 1.0 },
"BIL": {"MAD": 1.0 },
"BRA": {"VIE": 1.0 },
"BRU": {"AMS": 1.0 },
"BUC": {"VIE": 0.5, "SOF": 0.5 },
"BUD": {"ZAG": 1.0 },
# "COP": {"HAM": 0.5, "STO": 0.5 },
"COR": {"DUB": 1.0 },
"DUB": {"COR": 1.0 },
"FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2 },
"GEN": {"PAR": 0.6, "MIL2": 0.4 },
"HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3 },
# "HEL": {"TAR": 0.3, "HAM": 0.7 },
"KAU": {"RIG": 1.0 },
"LIS": {"POR": 1.0 },
"LJU": {"ZAG": 1.0 },
"LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4},
"LON2": {"LON": 1.0 },
# "LUX": {"BRU": 1.0 },
"MAD": {"BIL": 1.0 },
"MAR": {"MIL2": 0.6, "ATH": 0.4 },
"MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3 },
"PAR": {"GEN": 1.0 },
"POR": {"LIS": 1.0 },
"POZ": {"HAM": 1.0 },
"PRA": {"VIE": 1.0 },
"RIG": {"KAU": 1.0 },
"SOF": {"THE": 0.5, "BUC": 0.5 },
# "STO": {"COP": 0.5, "HEL": 0.5 },
"TAR": {"RIG": 1.0 },
"THE": {"ATH": 0.5, "SOF": 0.5 },
"VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2 },
"ZAG": {"BUD": 0.5, "LJU": 0.5 },
}
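# Illustration only: a minimal sketch of how a failover ratio map like the one above can be applied.
# When a node fails, traffic that normally enters the network at that node is re-homed to its
# neighbours according to the configured fractions. This helper is hypothetical (not part of this
# diff) and assumes a simple {ingress: {egress: Mbps}} traffic matrix.
def redistribute_failed_node(traffic_matrix, failed_node, ratios=NODE_FAILOVER_RATIOS):
    """Return a copy of `traffic_matrix` with the failed node's ingress traffic shifted to its failover nodes."""
    shifted = {src: dict(dsts) for src, dsts in traffic_matrix.items() if src != failed_node}
    for egress, mbps in traffic_matrix.get(failed_node, {}).items():
        for target, fraction in ratios.get(failed_node, {}).items():
            shifted.setdefault(target, {})
            shifted[target][egress] = shifted[target].get(egress, 0) + mbps * fraction
    return shifted

# Example: redistribute_failed_node({"AMS": {"FRA": 100}}, "AMS") moves 60 Mbps of the AMS->FRA
# demand to LON and 40 Mbps to FRA, following the 0.6/0.4 split configured above.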
import os
import re
import argparse
import sys
import pandas as pd  # https://pandas.pydata.org
from datetime import datetime, timezone, timedelta
from pathlib import Path
import numpy as np

###############################################################################
# INPUT DATA SECTION
###############################################################################

# make sure this matches with the What-If Scenario runner script
RAW_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/rawreports"
RAW_REPORT_FILE_PREFIX="raw_capacityreport_"
RAW_REPORT_FILE_SUFFIX="csv"

CONSOLIDATED_REPORT_DIRECTORY="/Users/daniel.verlouw/Desktop/consolidatedreports"
CONSOLIDATED_REPORT_FILE_PREFIX="consolidated_capacityreport_"
CONSOLIDATED_REPORT_FILE_SUFFIX_RAW="csv"
CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN="txt"

# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded
MUST_UPGRADE_SCENARIOS=["normal","onelinkfail","onenodefail"]
# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded
SHOULD_UPGRADE_SCENARIOS=["twolinkfail","nodelinkfail"]

# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
ISO8601_FORMAT = '%Y%m%dT%H%MZ'
ISO8601_REGEXP = r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z'

###############################################################################
# RAW CONSOLIDATED REPORT
###############################################################################

# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns ---
def get_max_usage_row(group, usage_col):
    """
    Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`.
    If `usage_col` does not exist or is entirely NaN, returns None.
    """
    # If the column doesn't exist or has all null values, return None
    if usage_col not in group.columns or group[usage_col].dropna().empty:
        return None

    max_val = group[usage_col].max()
    # Filter to rows where usage_col equals the maximum value.
    max_rows = group[group[usage_col] == max_val]
    # Return only the first row among those with the maximum value.
    return max_rows.iloc[0]


def extract_usage_details(group):
    """
    For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps)
    and extract the relevant columns.
    Booleans are set to True if at least one row in the group is True.
    """
    # We'll create a dict to hold the final data for this ID.
    out = {}

    # Basic info columns (these are assumed to be consistent within the group)
    first_row = group.iloc[0]
    out['ID'] = first_row.get('ID', None)
    out['NodeA'] = first_row.get('NodeA', None)
    out['NodeB'] = first_row.get('NodeB', None)
    out['CapacityGbps'] = first_row.get('CapacityGbps', None)
    out['ConfNormalThrPct'] = first_row.get('ConfNormalThrPct', None)
    out['ConfNormalThrGbps'] = first_row.get('ConfNormalThrGbps', None)
    out['ConfFailThrPct'] = first_row.get('ConfFailThrPct', None)
    out['ConfFailThrGbps'] = first_row.get('ConfFailThrGbps', None)

    # Normal usage
    normal_row = get_max_usage_row(group, 'NormalUsageGbps')
    out['NormalEnabled'] = bool(group['NormalEnabled'].any()) if 'NormalEnabled' in group.columns else False
    out['NormalUsagePct'] = normal_row.get('NormalUsagePct', np.nan) if normal_row is not None else np.nan
    out['NormalUsageGbps'] = normal_row.get('NormalUsageGbps', np.nan) if normal_row is not None else np.nan
    out['NormalUsageExceed'] = bool(group['NormalUsageExceed'].any()) if 'NormalUsageExceed' in group.columns else False
    out['NormalDateTime'] = normal_row.get('NormalDateTime', pd.NaT) if normal_row is not None else pd.NaT

    # 1 Link Fail usage
    one_link_row = get_max_usage_row(group, '1LinkFailUsageGbps')
    out['1LinkFailEnabled'] = bool(group['1LinkFailEnabled'].any()) if '1LinkFailEnabled' in group.columns else False
    out['1LinkFailScenario'] = one_link_row.get('1LinkFailScenario', None) if one_link_row is not None else None
    out['1LinkFailUsagePct'] = one_link_row.get('1LinkFailUsagePct', np.nan) if one_link_row is not None else np.nan
    out['1LinkFailUsageGbps'] = one_link_row.get('1LinkFailUsageGbps', np.nan) if one_link_row is not None else np.nan
    out['1LinkFailUsageExceed'] = bool(group['1LinkFailUsageExceed'].any()) if '1LinkFailUsageExceed' in group.columns else False
    out['1LinkFailUsageTime'] = one_link_row.get('1LinkFailUsageTime', pd.NaT) if one_link_row is not None else pd.NaT

    # 2 Link Fail usage
    two_link_row = get_max_usage_row(group, '2LinkFailUsageGbps')
    out['2LinkFailEnabled'] = bool(group['2LinkFailEnabled'].any()) if '2LinkFailEnabled' in group.columns else False
    out['2LinkFailScenario'] = two_link_row.get('2LinkFailScenario', None) if two_link_row is not None else None
    out['2LinkFailUsagePct'] = two_link_row.get('2LinkFailUsagePct', np.nan) if two_link_row is not None else np.nan
    out['2LinkFailUsageGbps'] = two_link_row.get('2LinkFailUsageGbps', np.nan) if two_link_row is not None else np.nan
    out['2LinkFailUsageExceed'] = bool(group['2LinkFailUsageExceed'].any()) if '2LinkFailUsageExceed' in group.columns else False
    out['2LinkFailUsageTime'] = two_link_row.get('2LinkFailUsageTime', pd.NaT) if two_link_row is not None else pd.NaT

    # 1 Node Fail usage
    one_node_row = get_max_usage_row(group, '1NodeFailUsageGbps')
    out['1NodeFailEnabled'] = bool(group['1NodeFailEnabled'].any()) if '1NodeFailEnabled' in group.columns else False
    out['1NodeFailScenario'] = one_node_row.get('1NodeFailScenario', None) if one_node_row is not None else None
    out['1NodeFailUsagePct'] = one_node_row.get('1NodeFailUsagePct', np.nan) if one_node_row is not None else np.nan
    out['1NodeFailUsageGbps'] = one_node_row.get('1NodeFailUsageGbps', np.nan) if one_node_row is not None else np.nan
    out['1NodeFailUsageExceed'] = bool(group['1NodeFailUsageExceed'].any()) if '1NodeFailUsageExceed' in group.columns else False
    out['1NodeFailUsageTime'] = one_node_row.get('1NodeFailUsageTime', pd.NaT) if one_node_row is not None else pd.NaT

    # Node+1 Link Fail usage
    node_plus_link_row = get_max_usage_row(group, 'Node+1LinkFailUsageGbps')
    out['Node+1LinkFailEnabled'] = bool(group['Node+1LinkFailEnabled'].any()) if 'Node+1LinkFailEnabled' in group.columns else False
    out['Node+1LinkFailScenario'] = node_plus_link_row.get('Node+1LinkFailScenario', None) if node_plus_link_row is not None else None
    out['Node+1LinkFailUsagePct'] = node_plus_link_row.get('Node+1LinkFailUsagePct', np.nan) if node_plus_link_row is not None else np.nan
    out['Node+1LinkFailUsageGbps'] = node_plus_link_row.get('Node+1LinkFailUsageGbps', np.nan) if node_plus_link_row is not None else np.nan
    out['Node+1LinkFailUsageExceed'] = bool(group['Node+1LinkFailUsageExceed'].any()) if 'Node+1LinkFailUsageExceed' in group.columns else False
    out['Node+1LinkFailUsageTime'] = node_plus_link_row.get('Node+1LinkFailUsageTime', pd.NaT) if node_plus_link_row is not None else pd.NaT

    # Finally, consolidated upgrade flags
    out['MustUpgrade'] = bool(group['MustUpgrade'].any()) if 'MustUpgrade' in group.columns else False
    out['ShouldUpgrade'] = bool(group['ShouldUpgrade'].any()) if 'ShouldUpgrade' in group.columns else False

    return pd.Series(out)

###############################################################################
# HUMAN READABLE CONSOLIDATED REPORT
###############################################################################

def build_human_report(df_raw):
    df_human= df_raw.copy()

    # Helper formatting functions
    def mark_upgrade(scenario, exceed):
        if scenario in MUST_UPGRADE_SCENARIOS and exceed:
            return f" (**)"
        elif scenario in SHOULD_UPGRADE_SCENARIOS and exceed:
            return f" (*)"
        else:
            return ""

    def normal_thr(row):
        return f"{row['ConfNormalThrPct']*100:.0f}% / {row['ConfNormalThrGbps']:.0f}G"

    def fail_thr(row):
        return f"{row['ConfFailThrPct']*100:.0f}% / {row['ConfFailThrGbps']:.0f}G"

    def normal_usage(row):
        return f"{row['NormalUsagePct']*100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade("normal", row['NormalUsageExceed'])

    def onelinkfail_usage(row):
        return f"{row['1LinkFailUsagePct']*100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade("onelinkfail", row['1LinkFailUsageExceed'])

    def twolinkfail_usage(row):
        return f"{row['2LinkFailUsagePct']*100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade("twolinkfail", row['2LinkFailUsageExceed'])

    def onenodefail_usage(row):
        return f"{row['1NodeFailUsagePct']*100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade("onenodefail", row['1NodeFailUsageExceed'])

    def nodelinkfail_usage(row):
        return f"{row['Node+1LinkFailUsagePct']*100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade("nodelinkfail", row['Node+1LinkFailUsageExceed'])

    def upgrade(row):
        return "MUST (**)" if row['MustUpgrade'] else ("SHOULD (*)" if row['ShouldUpgrade'] else "NO")

    df_human['ConfNormalThr'] = df_human.apply(normal_thr, axis=1)
    df_human['ConfFailThr'] = df_human.apply(fail_thr, axis=1)
    df_human['NormalUsage'] = df_human.apply(normal_usage, axis=1)
    df_human['1LinkFailUsage'] = df_human.apply(onelinkfail_usage, axis=1)
    df_human['2LinkFailUsage'] = df_human.apply(twolinkfail_usage, axis=1)
    df_human['1NodeFailUsage'] = df_human.apply(onenodefail_usage, axis=1)
    df_human['Node+1LinkFailUsage'] = df_human.apply(nodelinkfail_usage, axis=1)
    df_human['Upgrade'] = df_human.apply(upgrade, axis=1)

    # Drop unused columns
    df_human.drop(['ConfNormalThrPct','ConfNormalThrGbps'],axis=1, inplace=True)
    df_human.drop(['ConfFailThrPct','ConfFailThrGbps'],axis=1, inplace=True)
    df_human.drop(['NormalUsagePct','NormalUsageGbps','NormalUsageExceed'],axis=1, inplace=True)
    df_human.drop(['1LinkFailUsagePct','1LinkFailUsageGbps','1LinkFailUsageExceed','1LinkFailUsageTime'],axis=1, inplace=True)
    df_human.drop(['2LinkFailUsagePct','2LinkFailUsageGbps','2LinkFailUsageExceed','2LinkFailUsageTime'],axis=1, inplace=True)
    df_human.drop(['1NodeFailUsagePct','1NodeFailUsageGbps','1NodeFailUsageExceed','1NodeFailUsageTime'],axis=1, inplace=True)
    df_human.drop(['Node+1LinkFailUsagePct','Node+1LinkFailUsageGbps','Node+1LinkFailUsageExceed','Node+1LinkFailUsageTime'],axis=1, inplace=True)
    df_human.drop(['MustUpgrade','ShouldUpgrade'],axis=1, inplace=True)

    # Replace NaN and NaT values with "N/A"
    df_human['NormalDateTime'] = df_human['NormalDateTime'].astype(object)
    df_human.fillna("N/A", inplace=True)

    return df_human[['NodeA', 'NodeB', 'CapacityGbps', 'ConfNormalThr', 'ConfFailThr',
                     'NormalUsage', 'NormalDateTime',
                     '1LinkFailScenario', '1LinkFailUsage',
                     '2LinkFailScenario', '2LinkFailUsage',
                     '1NodeFailScenario', '1NodeFailUsage',
                     'Node+1LinkFailScenario', 'Node+1LinkFailUsage',
                     'Upgrade']]

###############################################################################
# FILE FUNCTIONS
###############################################################################

def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime):
    # List all raw reports in directory
    all_raw_reports = [
        file for file in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, file))
        and file.startswith(prefix)
        and file.endswith(suffix)
        and re.search(ISO8601_REGEXP, file)
    ]

    # Filter to files that match the timestamp pattern within the specified datetime range
    matching_files = []
    for file in all_raw_reports:
        match = re.search(ISO8601_REGEXP, file)
        file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=timezone.utc)
        if start_datetime <= file_date <= end_datetime:
            matching_files.append(os.path.join(directory, file))

    return matching_files


def store_consolidated(df_consolidated, directory, prefix, suffix):
    path = Path(directory)
    path.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist

    # Create a ISO8601 basic format UTC timestamped filename
    timestamp = datetime.now(timezone.utc).strftime(ISO8601_FORMAT)
    filename = f'{prefix}{timestamp}.{suffix}'

    if suffix == "csv":
        df_consolidated.to_csv(os.path.join(path, filename), sep=',', encoding='utf-8', date_format=ISO8601_FORMAT, header=True)
    elif suffix == "txt":
        markdown = df_consolidated.to_markdown(headers='keys', tablefmt='psql')
        # Write the markdown string to a file
        with open(os.path.join(path, filename), "w") as file:
            file.write(markdown)

###############################################################################
# MAIN
###############################################################################

def main():
    # Parse commandline arguments
    parser = argparse.ArgumentParser(description='Script usage:')
    parser.add_argument('--daily', action='store_true', help='Create daily report (past day)')
    parser.add_argument('--weekly', action='store_true', help='Create weekly report (past week)')
    parser.add_argument('--monthly', action='store_true', help='Create monthly report (past month)')
    parser.add_argument('--quarterly', action='store_true', help='Create quarterly report (past quarter)')
    parser.add_argument('--yearly', action='store_true', help='Create yearly report (past year)')

    if len(sys.argv) == 1:
        # No arguments were provided; print help message.
        parser.print_help()
        sys.exit(1)

    args = parser.parse_args()

    today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    first_day_of_current_month = today.replace(day=1)

    if args.daily:
        start_datetime= today - timedelta(days=1)
        end_datetime = today
    elif args.weekly:
        start_datetime= today - timedelta(weeks=1)
        end_datetime = today
    elif args.monthly:
        # First day of last month
        start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1)
        # First day of current month
        end_datetime = first_day_of_current_month
    elif args.quarterly:
        # Approximates the quarter as 90 days before the start of the current month.
        start_datetime= (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta(days=(first_day_of_current_month.month - 1) % 3 * 30)
        end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1)
    elif args.yearly:
        # First day of the previous year
        start_datetime= today.replace(year=today.year - 1, month=1, day=1)
        end_datetime = today.replace(year=today.year, month=1, day=1)

    matching_files= find_files_by_timeframe(RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime)

    if len(matching_files) > 0:
        print(f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} through {end_datetime}")

        # List of columns that should be parsed as dates from CSV
        date_columns = ['NormalDateTime', '1LinkFailUsageTime', '2LinkFailUsageTime', '1NodeFailUsageTime', 'Node+1LinkFailUsageTime']

        # Read and concat CSVs
        dataframes= [pd.read_csv(file, sep=',', encoding='utf-8', parse_dates=date_columns, date_format=ISO8601_FORMAT, float_precision='round_trip') for file in matching_files]
        concat_df = pd.concat(dataframes)

        # Walk over the results for each link and extract and store the highest usage for each scenario
        results = []
        for id_val, group in concat_df.groupby('ID'):
            details = extract_usage_details(group)
            # Overwrite ID with the group key to be sure
            details['ID'] = id_val
            results.append(details)

        consolidated_raw = pd.DataFrame(results)
        consolidated_raw.set_index("ID", inplace=True)
        store_consolidated(consolidated_raw, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_RAW)

        consolidated_human= build_human_report(consolidated_raw)
        store_consolidated(consolidated_human, CONSOLIDATED_REPORT_DIRECTORY, CONSOLIDATED_REPORT_FILE_PREFIX, CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN)

    else:
        print(f"No raw files found for timeframe {start_datetime} through {end_datetime}")


if __name__=="__main__":
    main()
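# Worked example (illustrative): a raw report written at 2025-03-26 12:00 UTC is expected to be named
# like "raw_capacityreport_20250326T1200Z.csv" (prefix + ISO 8601 basic timestamp + suffix). The
# ISO8601_REGEXP above extracts the timestamp and ISO8601_FORMAT parses it back for the timeframe filter:
#
#   import re
#   from datetime import datetime, timezone
#   name = "raw_capacityreport_20250326T1200Z.csv"
#   stamp = re.search(r'\d{4}\d{2}\d{2}T\d{2}\d{2}Z', name).group()         # "20250326T1200Z"
#   datetime.strptime(stamp, '%Y%m%dT%H%MZ').replace(tzinfo=timezone.utc)   # 2025-03-26 12:00+00:00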
"""GÉANT Capacity Planner."""
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
hooks = ruff
ruff.type = exec
ruff.executable = %(here)s/venv/bin/ruff
ruff.options = check --fix REVISION_SCRIPT_FILENAME
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
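# For reference, the post_write_hook above is roughly equivalent to running
#   venv/bin/ruff check --fix <revision_script.py>
# against each newly generated revision file.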
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config, pool

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
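# Typical invocations (standard Alembic CLI, not specific to this repo):
#   alembic upgrade head        # online mode: connects via sqlalchemy.url and applies migrations
#   alembic upgrade head --sql  # offline mode: emits SQL to stdout, no database connection needed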
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
    """Upgrade schema."""
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    """Downgrade schema."""
    ${downgrades if downgrades else "pass"}
"""Generate a capacity report."""
import argparse
import re
import sys
from datetime import UTC, datetime, timedelta
from logging import getLogger
from pathlib import Path

import numpy as np
import pandas as pd

logger = getLogger(__name__)

# make sure this matches with the What-If Scenario runner script
RAW_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/rawreports"
RAW_REPORT_FILE_PREFIX = "raw_capacityreport_"
RAW_REPORT_FILE_SUFFIX = "csv"

CONSOLIDATED_REPORT_DIRECTORY = "/Users/daniel.verlouw/Desktop/consolidatedreports"
CONSOLIDATED_REPORT_FILE_PREFIX = "consolidated_capacityreport_"
CONSOLIDATED_REPORT_FILE_SUFFIX_RAW = "csv"
CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN = "txt"

# Scenarios triggering a 'MUST UPGRADE' if threshold is exceeded
MUST_UPGRADE_SCENARIOS = ["normal", "onelinkfail", "onenodefail"]
# Scenarios triggering a 'SHOULD UPGRADE' if threshold is exceeded
SHOULD_UPGRADE_SCENARIOS = ["twolinkfail", "nodelinkfail"]

# ISO 8601 basic timestamp format (YYYYMMDDTHHMMZ)
ISO8601_FORMAT = "%Y%m%dT%H%MZ"
ISO8601_REGEXP = r"\d{4}\d{2}\d{2}T\d{2}\d{2}Z"


# --- Helper function to get the row of the max usage for a given column, handling empty/missing columns ---
def get_max_usage_row(group, usage_col):
    """Given a list of rows, return the row with the highest usage.

    Returns a single row (as a Series) within `group` that has the maximum value in `usage_col`.
    If `usage_col` does not exist or is entirely NaN, returns None.
    """
    # If the column doesn't exist or has all null values, return None
    if usage_col not in group.columns or group[usage_col].dropna().empty:
        return None

    max_val = group[usage_col].max()
    # Filter to rows where usage_col equals the maximum value.
    max_rows = group[group[usage_col] == max_val]
    # Return only the first row among those with the maximum value.
    return max_rows.iloc[0]


def extract_usage_details(group):
    """Extract usage details.

    For a single group of rows (all links with the same ID), find the row with the max usage for each usage field (Gbps)
    and extract the relevant columns. Booleans are set to True if at least one row in the group is True.
    """
    # We'll create a dict to hold the final data for this ID.
    out = {}

    # Basic info columns (these are assumed to be consistent within the group)
    first_row = group.iloc[0]
    out["ID"] = first_row.get("ID", None)
    out["NodeA"] = first_row.get("NodeA", None)
    out["NodeB"] = first_row.get("NodeB", None)
    out["CapacityGbps"] = first_row.get("CapacityGbps", None)
    out["ConfNormalThrPct"] = first_row.get("ConfNormalThrPct", None)
    out["ConfNormalThrGbps"] = first_row.get("ConfNormalThrGbps", None)
    out["ConfFailThrPct"] = first_row.get("ConfFailThrPct", None)
    out["ConfFailThrGbps"] = first_row.get("ConfFailThrGbps", None)

    # Normal usage
    normal_row = get_max_usage_row(group, "NormalUsageGbps")
    out["NormalEnabled"] = bool(group["NormalEnabled"].any()) if "NormalEnabled" in group.columns else False
    out["NormalUsagePct"] = normal_row.get("NormalUsagePct", np.nan) if normal_row is not None else np.nan
    out["NormalUsageGbps"] = normal_row.get("NormalUsageGbps", np.nan) if normal_row is not None else np.nan
    out["NormalUsageExceed"] = bool(group["NormalUsageExceed"].any()) if "NormalUsageExceed" in group.columns else False
    out["NormalDateTime"] = normal_row.get("NormalDateTime", pd.NaT) if normal_row is not None else pd.NaT

    # 1 Link Fail usage
    one_link_row = get_max_usage_row(group, "1LinkFailUsageGbps")
    out["1LinkFailEnabled"] = bool(group["1LinkFailEnabled"].any()) if "1LinkFailEnabled" in group.columns else False
    out["1LinkFailScenario"] = one_link_row.get("1LinkFailScenario", None) if one_link_row is not None else None
    out["1LinkFailUsagePct"] = one_link_row.get("1LinkFailUsagePct", np.nan) if one_link_row is not None else np.nan
    out["1LinkFailUsageGbps"] = one_link_row.get("1LinkFailUsageGbps", np.nan) if one_link_row is not None else np.nan
    out["1LinkFailUsageExceed"] = (
        bool(group["1LinkFailUsageExceed"].any()) if "1LinkFailUsageExceed" in group.columns else False
    )
    out["1LinkFailUsageTime"] = one_link_row.get("1LinkFailUsageTime", pd.NaT) if one_link_row is not None else pd.NaT

    # 2 Link Fail usage
    two_link_row = get_max_usage_row(group, "2LinkFailUsageGbps")
    out["2LinkFailEnabled"] = bool(group["2LinkFailEnabled"].any()) if "2LinkFailEnabled" in group.columns else False
    out["2LinkFailScenario"] = two_link_row.get("2LinkFailScenario", None) if two_link_row is not None else None
    out["2LinkFailUsagePct"] = two_link_row.get("2LinkFailUsagePct", np.nan) if two_link_row is not None else np.nan
    out["2LinkFailUsageGbps"] = two_link_row.get("2LinkFailUsageGbps", np.nan) if two_link_row is not None else np.nan
    out["2LinkFailUsageExceed"] = (
        bool(group["2LinkFailUsageExceed"].any()) if "2LinkFailUsageExceed" in group.columns else False
    )
    out["2LinkFailUsageTime"] = two_link_row.get("2LinkFailUsageTime", pd.NaT) if two_link_row is not None else pd.NaT

    # 1 Node Fail usage
    one_node_row = get_max_usage_row(group, "1NodeFailUsageGbps")
    out["1NodeFailEnabled"] = bool(group["1NodeFailEnabled"].any()) if "1NodeFailEnabled" in group.columns else False
    out["1NodeFailScenario"] = one_node_row.get("1NodeFailScenario", None) if one_node_row is not None else None
    out["1NodeFailUsagePct"] = one_node_row.get("1NodeFailUsagePct", np.nan) if one_node_row is not None else np.nan
    out["1NodeFailUsageGbps"] = one_node_row.get("1NodeFailUsageGbps", np.nan) if one_node_row is not None else np.nan
    out["1NodeFailUsageExceed"] = (
        bool(group["1NodeFailUsageExceed"].any()) if "1NodeFailUsageExceed" in group.columns else False
    )
    out["1NodeFailUsageTime"] = one_node_row.get("1NodeFailUsageTime", pd.NaT) if one_node_row is not None else pd.NaT

    # Node+1 Link Fail usage
    node_plus_link_row = get_max_usage_row(group, "Node+1LinkFailUsageGbps")
    out["Node+1LinkFailEnabled"] = (
        bool(group["Node+1LinkFailEnabled"].any()) if "Node+1LinkFailEnabled" in group.columns else False
    )
    out["Node+1LinkFailScenario"] = (
        node_plus_link_row.get("Node+1LinkFailScenario", None) if node_plus_link_row is not None else None
    )
    out["Node+1LinkFailUsagePct"] = (
        node_plus_link_row.get("Node+1LinkFailUsagePct", np.nan) if node_plus_link_row is not None else np.nan
    )
    out["Node+1LinkFailUsageGbps"] = (
        node_plus_link_row.get("Node+1LinkFailUsageGbps", np.nan) if node_plus_link_row is not None else np.nan
    )
    out["Node+1LinkFailUsageExceed"] = (
        bool(group["Node+1LinkFailUsageExceed"].any()) if "Node+1LinkFailUsageExceed" in group.columns else False
    )
    out["Node+1LinkFailUsageTime"] = (
        node_plus_link_row.get("Node+1LinkFailUsageTime", pd.NaT) if node_plus_link_row is not None else pd.NaT
    )

    # Finally, consolidated upgrade flags
    out["MustUpgrade"] = bool(group["MustUpgrade"].any()) if "MustUpgrade" in group.columns else False
    out["ShouldUpgrade"] = bool(group["ShouldUpgrade"].any()) if "ShouldUpgrade" in group.columns else False

    return pd.Series(out)


def build_human_report(df_raw):
    """Build a human-readable report."""
    df_human = df_raw.copy()

    # Helper formatting functions
    def mark_upgrade(scenario, exceed):
        if scenario in MUST_UPGRADE_SCENARIOS and exceed:
            return " (**)"
        if scenario in SHOULD_UPGRADE_SCENARIOS and exceed:
            return " (*)"
        return ""

    def normal_thr(row):
        return f"{row['ConfNormalThrPct'] * 100:.0f}% / {row['ConfNormalThrGbps']:.0f}G"

    def fail_thr(row):
        return f"{row['ConfFailThrPct'] * 100:.0f}% / {row['ConfFailThrGbps']:.0f}G"

    def normal_usage(row):
        return f"{row['NormalUsagePct'] * 100:.0f}% / {row['NormalUsageGbps']:.1f}G" + mark_upgrade(
            "normal", row["NormalUsageExceed"]
        )

    def onelinkfail_usage(row):
        return f"{row['1LinkFailUsagePct'] * 100:.0f}% / {row['1LinkFailUsageGbps']:.1f}G" + mark_upgrade(
            "onelinkfail", row["1LinkFailUsageExceed"]
        )

    def twolinkfail_usage(row):
        return f"{row['2LinkFailUsagePct'] * 100:.0f}% / {row['2LinkFailUsageGbps']:.1f}G" + mark_upgrade(
            "twolinkfail", row["2LinkFailUsageExceed"]
        )

    def onenodefail_usage(row):
        return f"{row['1NodeFailUsagePct'] * 100:.0f}% / {row['1NodeFailUsageGbps']:.1f}G" + mark_upgrade(
            "onenodefail", row["1NodeFailUsageExceed"]
        )

    def nodelinkfail_usage(row):
        return f"{row['Node+1LinkFailUsagePct'] * 100:.0f}% / {row['Node+1LinkFailUsageGbps']:.1f}G" + mark_upgrade(
            "nodelinkfail", row["Node+1LinkFailUsageExceed"]
        )

    def upgrade(row):
        return "MUST (**)" if row["MustUpgrade"] else ("SHOULD (*)" if row["ShouldUpgrade"] else "NO")

    df_human["ConfNormalThr"] = df_human.apply(normal_thr, axis=1)
    df_human["ConfFailThr"] = df_human.apply(fail_thr, axis=1)
    df_human["NormalUsage"] = df_human.apply(normal_usage, axis=1)
    df_human["1LinkFailUsage"] = df_human.apply(onelinkfail_usage, axis=1)
    df_human["2LinkFailUsage"] = df_human.apply(twolinkfail_usage, axis=1)
    df_human["1NodeFailUsage"] = df_human.apply(onenodefail_usage, axis=1)
    df_human["Node+1LinkFailUsage"] = df_human.apply(nodelinkfail_usage, axis=1)
    df_human["Upgrade"] = df_human.apply(upgrade, axis=1)

    # Drop unused columns
    df_human.drop(["ConfNormalThrPct", "ConfNormalThrGbps"], axis=1, inplace=True)
    df_human.drop(["ConfFailThrPct", "ConfFailThrGbps"], axis=1, inplace=True)
    df_human.drop(["NormalUsagePct", "NormalUsageGbps", "NormalUsageExceed"], axis=1, inplace=True)
    df_human.drop(
        ["1LinkFailUsagePct", "1LinkFailUsageGbps", "1LinkFailUsageExceed", "1LinkFailUsageTime"], axis=1, inplace=True
    )
    df_human.drop(
        ["2LinkFailUsagePct", "2LinkFailUsageGbps", "2LinkFailUsageExceed", "2LinkFailUsageTime"], axis=1, inplace=True
    )
    df_human.drop(
        ["1NodeFailUsagePct", "1NodeFailUsageGbps", "1NodeFailUsageExceed", "1NodeFailUsageTime"], axis=1, inplace=True
    )
    df_human.drop(
        ["Node+1LinkFailUsagePct", "Node+1LinkFailUsageGbps", "Node+1LinkFailUsageExceed", "Node+1LinkFailUsageTime"],
        axis=1,
        inplace=True,
    )
    df_human.drop(["MustUpgrade", "ShouldUpgrade"], axis=1, inplace=True)

    # Replace NaN and NaT values with "N/A"
    df_human["NormalDateTime"] = df_human["NormalDateTime"].astype(object)
    df_human.fillna("N/A", inplace=True)

    return df_human[
        [
            "NodeA",
            "NodeB",
            "CapacityGbps",
            "ConfNormalThr",
            "ConfFailThr",
            "NormalUsage",
            "NormalDateTime",
            "1LinkFailScenario",
            "1LinkFailUsage",
            "2LinkFailScenario",
            "2LinkFailUsage",
            "1NodeFailScenario",
            "1NodeFailUsage",
            "Node+1LinkFailScenario",
            "Node+1LinkFailUsage",
            "Upgrade",
        ]
    ]


def find_files_by_timeframe(directory, prefix, suffix, start_datetime, end_datetime):
    """Find all files that fall within a given timeframe."""
    # List all raw reports in directory
    all_raw_reports = [
        file
        for file in Path(directory).iterdir()
        if Path(directory / file).is_file()
        and file.name.startswith(prefix)
        and file.name.endswith(suffix)
        and re.search(ISO8601_REGEXP, file.name)
    ]

    # Filter to files that match the timestamp pattern within the specified datetime range
    matching_files = []
    for file in all_raw_reports:
        match = re.search(ISO8601_REGEXP, file.name)
        file_date = datetime.strptime(match.group(), ISO8601_FORMAT).replace(tzinfo=UTC)
        if start_datetime <= file_date <= end_datetime:
            matching_files.append(Path(directory / file))

    return matching_files


def store_consolidated(df_consolidated, directory, prefix, suffix):
    """Store consolidated results in a file."""
    path = Path(directory)
    path.mkdir(parents=True, exist_ok=True)  # Create directory if it doesn't exist

    # Create a ISO8601 basic format UTC timestamped filename
    timestamp = datetime.now(UTC).strftime(ISO8601_FORMAT)
    filename = f"{prefix}{timestamp}.{suffix}"

    if suffix == "csv":
        df_consolidated.to_csv(
            Path(path / filename), sep=",", encoding="utf-8", date_format=ISO8601_FORMAT, header=True
        )
    elif suffix == "txt":
        markdown = df_consolidated.to_markdown(headers="keys", tablefmt="psql")
        # Write the markdown string to a file
        Path(path / filename).write_text(markdown)


def main():
    """Main method for running the capacity planner."""
    # Parse commandline arguments
    parser = argparse.ArgumentParser(description="Script usage:")
    parser.add_argument("--daily", action="store_true", help="Create daily report (past day)")
    parser.add_argument("--weekly", action="store_true", help="Create weekly report (past week)")
    parser.add_argument("--monthly", action="store_true", help="Create monthly report (past month)")
    parser.add_argument("--quarterly", action="store_true", help="Create quarterly report (past quarter)")
    parser.add_argument("--yearly", action="store_true", help="Create yearly report (past year)")

    if len(sys.argv) == 1:
        # No arguments were provided; print help message.
        parser.print_help()
        sys.exit(1)

    args = parser.parse_args()

    today = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
    first_day_of_current_month = today.replace(day=1)

    if args.daily:
        start_datetime = today - timedelta(days=1)
        end_datetime = today
    elif args.weekly:
        start_datetime = today - timedelta(weeks=1)
        end_datetime = today
    elif args.monthly:
        # First day of last month
        start_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1)
        # First day of current month
        end_datetime = first_day_of_current_month
    elif args.quarterly:
        # Approximates the quarter as 90 days before the start of the current month.
        start_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1) - timedelta(
            days=(first_day_of_current_month.month - 1) % 3 * 30
        )
        end_datetime = (first_day_of_current_month - timedelta(days=1)).replace(day=1)
    elif args.yearly:
        # First day of the previous year
        start_datetime = today.replace(year=today.year - 1, month=1, day=1)
        end_datetime = today.replace(year=today.year, month=1, day=1)

    matching_files = find_files_by_timeframe(
        RAW_REPORT_DIRECTORY, RAW_REPORT_FILE_PREFIX, RAW_REPORT_FILE_SUFFIX, start_datetime, end_datetime
    )

    if len(matching_files) > 0:
        msg = (
            f"Generating consolidated report for {len(matching_files)} raw reports for timeframe {start_datetime} "
            f"through {end_datetime}"
        )
        logger.info(msg)

        # List of columns that should be parsed as dates from CSV
        date_columns = [
            "NormalDateTime",
            "1LinkFailUsageTime",
            "2LinkFailUsageTime",
            "1NodeFailUsageTime",
            "Node+1LinkFailUsageTime",
        ]

        # Read and concat CSVs
        dataframes = [
            pd.read_csv(
                file,
                sep=",",
                encoding="utf-8",
                parse_dates=date_columns,
                date_format=ISO8601_FORMAT,
                float_precision="round_trip",
            )
            for file in matching_files
        ]
        concat_df = pd.concat(dataframes)

        # Walk over the results for each link and extract and store the highest usage for each scenario
        results = []
        for id_val, group in concat_df.groupby("ID"):
            details = extract_usage_details(group)
            # Overwrite ID with the group key to be sure
            details["ID"] = id_val
            results.append(details)

        consolidated_raw = pd.DataFrame(results)
        consolidated_raw.set_index("ID", inplace=True)
        store_consolidated(
            consolidated_raw,
            CONSOLIDATED_REPORT_DIRECTORY,
            CONSOLIDATED_REPORT_FILE_PREFIX,
            CONSOLIDATED_REPORT_FILE_SUFFIX_RAW,
        )

        consolidated_human = build_human_report(consolidated_raw)
        store_consolidated(
            consolidated_human,
            CONSOLIDATED_REPORT_DIRECTORY,
            CONSOLIDATED_REPORT_FILE_PREFIX,
            CONSOLIDATED_REPORT_FILE_SUFFIX_HUMAN,
        )

    else:
        msg = f"No raw files found for timeframe {start_datetime} through {end_datetime}"
        logger.warning(msg)


if __name__ == "__main__":
    main()
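# Worked example for the --quarterly window above (illustrative): if the script runs on 2025-05-10,
# `first_day_of_current_month` is 2025-05-01, so the end of the window is 2025-04-01 and the start is
# 2025-04-01 minus (5 - 1) % 3 * 30 = 30 days, i.e. 2025-03-02. The "quarter" is therefore an
# approximation based on 30-day months rather than a calendar quarter.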
"""Different services that the capacity planner interacts with."""
"""Interactions with Kentik through their API."""
import json
import os

import requests

# Extract Kentik API settings from environment variables
API_URL = os.environ["KENTIK_API_URL"]
API_EMAIL = os.environ["KENTIK_API_EMAIL"]
API_TOKEN = os.environ["KENTIK_API_TOKEN"]

# Kentik report period (in minutes)
KENTIK_REPORTING_PERIOD = 60
# Kentik flow aggregation window (in minutes -- lower values improve accuracy at the expense of processing time)
KENTIK_FLOW_AGGR_WINDOW = 5

# aggregate ingress (source) and egress (destination) on the Kentik 'Site'-level
INGRESS_DIMENSION = "i_device_site_name"
EGRESS_DIMENSION = "i_ult_exit_site"


###############################################################################
# Fetch traffic matrix from the Kentik API
###############################################################################
def _api_query(payload):
    # Headers for authentication
    headers = {"Content-Type": "application/json", "X-CH-Auth-Email": API_EMAIL, "X-CH-Auth-API-Token": API_TOKEN}

    response = requests.post(API_URL, headers=headers, json=payload, timeout=120)
    response.raise_for_status()
    return response.json()


# Example in Kentik dashboard: https://portal.kentik.eu/v4/core/explorer/5787b62a7e8ae1ef7d399e08cdea2800
def fetch_kentik_traffic_matrix():
    """Fetch a traffic matrix from Kentik."""
    # JSON query payload
    payload = {
        "version": 4,
        "queries": [
            {
                "bucket": "Table",
                "isOverlay": False,
                "query": {
                    "all_devices": True,
                    "aggregateTypes": ["max_in_bits_per_sec"],
                    "depth": 350,
                    "topx": 350,  # 350=max supported by Kentik
                    "device_name": [],
                    "fastData": "Auto",
                    "lookback_seconds": 60 * KENTIK_REPORTING_PERIOD,
                    "matrixBy": [],
                    "metric": ["in_bytes"],
                    "minsPolling": KENTIK_FLOW_AGGR_WINDOW,
                    "forceMinsPolling": True,
                    "outsort": "max_in_bits_per_sec",
                    "viz_type": "table",
                    "aggregates": [
                        {
                            "value": "max_in_bits_per_sec",
                            "column": "f_sum_in_bytes",
                            "fn": "max",
                            "label": "Bits/s Sampled at Ingress Max",
                            "unit": "in_bytes",
                            "parentUnit": "bytes",
                            "group": "Bits/s Sampled at Ingress",
                            "origLabel": "Max",
                            "sample_rate": 1,
                            "raw": True,
                            "name": "max_in_bits_per_sec",
                        }
                    ],
                    "filters": {
                        "connector": "All",
                        "filterGroups": [
                            {
                                "connector": "All",
                                "filters": [
                                    {
                                        "filterField": EGRESS_DIMENSION,
                                        "metric": "",
                                        "aggregate": "",
                                        "operator": "<>",
                                        "filterValue": "",  # Filter unassigned/unknown destinations
                                        "rightFilterField": "",
                                    }
                                ],
                                "filterGroups": [],
                            }
                        ],
                    },
                    "dimension": [INGRESS_DIMENSION, EGRESS_DIMENSION],
                },
            }
        ],
    }

    return _api_query(payload)


###############################################################################
# Transform Kentik data to traffic matrices (one matrix per timestamp)
###############################################################################
def kentik_to_traffic_matrices(json_data, nodes):
    """Convert the given JSON structure returned by Kentik into a dictionary keyed by timestamp.

    For each timestamp, we store a nested dict of: traffic_matrices[timestamp][ingress][egress] = traffic_rate_Mbps
    """
    # We'll gather all flows in the JSON
    data_entries = json_data["results"][0]["data"]

@@ -128,7 +114,7 @@ def kentik_to_traffic_matrices(json_data, nodes):
    traffic_matrices = {}
    for entry in data_entries:
        ingress = entry[INGRESS_DIMENSION]
        egress = entry[EGRESS_DIMENSION]

        # skip unknown ingress and/or egress nodes
@@ -139,9 +125,9 @@ def kentik_to_traffic_matrices(json_data, nodes):
        # timeSeries -> in_bits_per_sec -> flow => list of [timestamp, in_bits_per_sec, ignored_interval]
        flow_list = entry["timeSeries"]["in_bits_per_sec"]["flow"]
        for timestamp, bits_per_sec, _ignored_interval in flow_list:
            traffic_matrices.setdefault(timestamp, {})
            traffic_matrices[timestamp].setdefault(ingress, {})
            traffic_matrices[timestamp][ingress][egress] = int(bits_per_sec / 1_000_000)  # convert bps to Mbps

    return traffic_matrices
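# Illustration only: the nested structure returned above can be consumed per timestamp. A hypothetical
# helper (not part of this diff) that sums the total ingress traffic, in Mbps, seen at each site for
# one timestamp of the Kentik-derived matrix:
def total_ingress_per_site(traffic_matrices, timestamp):
    """Sum egress demands per ingress site for a single timestamp."""
    matrix = traffic_matrices.get(timestamp, {})
    return {ingress: sum(egress_mbps.values()) for ingress, egress_mbps in matrix.items()}

# e.g. total_ingress_per_site({1711368000: {"AMS": {"LON": 120, "FRA": 80}}}, 1711368000) -> {"AMS": 200}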
"""Utilities for capacity-planner."""
"""Define each core link in a topology.
Each link is a tuple shaped like:
( linkID, nodeA, nodeB, igp_metric, capacity, srlg_list, [normal_threshold], [failure_threshold] )
where:
linkID: network-wide unique numeric ID (e.g. 1001)
nodeA, nodeB: core link endpoints
igp_metric: IGP cost/distance
capacity: full-duplex link capacity in Gbps
srlg_list: list of Shared Risk Link Group (SRLG) names (or empty)
normal_threshold: fraction for normal usage. If omitted default is used
failure_threshold: fraction for usage under failure. If omitted default is used
"""
CORELINKS = [
(1, "AMS", "FRA", 2016, 800, []),
(2, "AMS", "LON", 1428, 800, []),
(3, "FRA", "GEN", 2478, 800, []),
(4, "GEN", "PAR", 2421, 800, []),
(5, "LON", "LON2", 180, 800, []),
(6, "LON2", "PAR", 1866, 800, []),
(7, "FRA", "PRA", 3315, 300, []),
(8, "GEN", "MIL2", 4280, 300, []),
(9, "GEN", "MAR", 2915, 400, []),
(10, "HAM", "POZ", 3435, 300, []),
(11, "MAR", "MIL2", 3850, 300, []),
(12, "MIL2", "VIE", 5400, 400, []),
(13, "POZ", "PRA", 3215, 300, []),
(14, "PRA", "VIE", 2255, 300, []),
(15, "AMS", "HAM", 3270, 300, []),
(16, "BRU", "PAR", 50000, 100, []),
(17, "AMS", "BRU", 50000, 100, []),
(18, "BIL", "PAR", 5600, 300, []),
(19, "BUD", "ZAG", 2370, 200, []),
(20, "COR", "DUB", 5500, 100, []),
(21, "DUB", "LON", 8850, 100, []),
(22, "MAD", "MAR", 6805, 300, []),
(23, "BUC", "SOF", 5680, 200, []),
(24, "BRA", "VIE", 50000, 100, ["BRA-VIE"]),
(25, "LJU", "MIL2", 3330, 300, []),
(26, "LJU", "ZAG", 985, 200, []),
(27, "BUC", "BUD", 11850, 200, []),
(28, "LIS", "MAD", 8970, 200, []),
(29, "BIL", "POR", 9250, 200, []),
(30, "LIS", "POR", 3660, 200, []),
(31, "BIL", "MAD", 4000, 200, []),
(32, "ATH2", "MIL2", 25840, 100, ["MIL2-PRE"]),
(33, "ATH2", "THE", 8200, 100, []),
(34, "SOF", "THE", 5800, 100, []),
(35, "RIG", "TAR", 2900, 100, []),
(36, "KAU", "POZ", 10050, 100, []),
(37, "ZAG", "SOF", 9720, 200, []),
(38, "BRA", "BUD", 50000, 100, ["BRA-BUD"]),
(39, "COR", "LON2", 7160, 100, ["COR-LON2"]),
(40, "KAU", "RIG", 4500, 100, []),
(41, "COR", "PAR", 12549, 100, ["COR-LON2"]),
(42, "KIE", "POZ", 50000, 100, []),
(43, "CHI", "BUC", 50000, 40, []),
(44, "CHI", "KIE", 10000, 100, []),
(45, "HAM", "TAR", 12490, 100, []),
(46, "BUD", "VIE", 1725, 200, ["BRA-BUD", "BRA-VIE"]),
(47, "MIL2", "THE", 29200, 100, ["MIL2-PRE"]),
(48, "AMS", "LON", 50000, 100, []), # backup link via SURF
(49, "BUC", "FRA", 50000, 100, []), # backup link via TTI
]
# (B) List of nodes (must match the node names in CORELINKS and node/site names retrieved from Kentik)
# to be added: "BEL", "COP","HEL","LUX","STO"
NODES = [
"AMS",
"ATH2",
"BIL",
"BRA",
"BRU",
"BUC",
"BUD",
"CHI",
"COR",
"DUB",
"FRA",
"GEN",
"HAM",
"KAU",
"KIE",
"LIS",
"LJU",
"LON",
"LON2",
"MAD",
"MAR",
"MIL2",
"PAR",
"POR",
"POZ",
"PRA",
"RIG",
"SOF",
"TAR",
"THE",
"VIE",
"ZAG",
]
# Node failover ratio map for single-node fails
NODE_FAILOVER_RATIOS = {
"AMS": {"LON": 0.6, "FRA": 0.4},
"ATH": {"THE": 0.5, "MAR": 0.5},
"BIL": {"MAD": 1.0},
"BRA": {"VIE": 1.0},
"BRU": {"AMS": 1.0},
"BUC": {"VIE": 0.5, "SOF": 0.5},
"BUD": {"ZAG": 1.0},
"COR": {"DUB": 1.0},
"DUB": {"COR": 1.0},
"FRA": {"HAM": 0.4, "AMS": 0.4, "LON": 0.2},
"GEN": {"PAR": 0.6, "MIL2": 0.4},
"HAM": {"FRA": 0.5, "POZ": 0.2, "LON": 0.3},
"KAU": {"RIG": 1.0},
"LIS": {"POR": 1.0},
"LJU": {"ZAG": 1.0},
"LON": {"AMS": 0.4, "HAM": 0.2, "FRA": 0.4},
"LON2": {"LON": 1.0},
"MAD": {"BIL": 1.0},
"MAR": {"MIL2": 0.6, "ATH": 0.4},
"MIL2": {"GEN": 0.3, "MAR": 0.3, "VIE": 0.3},
"PAR": {"GEN": 1.0},
"POR": {"LIS": 1.0},
"POZ": {"HAM": 1.0},
"PRA": {"VIE": 1.0},
"RIG": {"KAU": 1.0},
"SOF": {"THE": 0.5, "BUC": 0.5},
"TAR": {"RIG": 1.0},
"THE": {"ATH": 0.5, "SOF": 0.5},
"VIE": {"MIL2": 0.6, "PRA": 0.2, "BUC": 0.2},
"ZAG": {"BUD": 0.5, "LJU": 0.5},
}
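# Illustration only: a quick sanity check one could run on the map above, assuming each node's
# failover fractions are intended to sum to roughly 1.0. This helper is hypothetical and not part
# of this diff.
def check_failover_ratios(ratios=NODE_FAILOVER_RATIOS, tolerance=0.01):
    """Return the nodes whose failover fractions do not sum to 1.0 within `tolerance`."""
    return {node: sum(split.values()) for node, split in ratios.items() if abs(sum(split.values()) - 1.0) > tolerance}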
[tool.mypy]
exclude = ["venv"]
ignore_missing_imports = true
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
strict_optional = true
namespace_packages = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_no_return = true
warn_unreachable = true
implicit_reexport = false
strict_equality = true
show_error_codes = true
show_column_numbers = true
# Suppress "note: By default the bodies of untyped functions are not checked"
disable_error_code = "annotation-unchecked"
# Forbid the use of a generic "type: ignore" without specifying the exact error that is ignored
enable_error_code = "ignore-without-code"
[tool.ruff]
extend-exclude = [
"htmlcov",
"docs",
"capacity_planner/alembic"
]
target-version = "py312"
line-length = 120
[tool.ruff.lint]
ignore = [
"C901",
"COM812",
"D203",
"D213",
"ISC001",
"N805",
"PLC2801",
"PLR0913",
"PLR0904",
"PLW1514",
]
select = [
"A",
"ARG",
"B",
"BLE",
"C",
"COM",
"C4",
"C90",
"D",
"DTZ",
"E",
"EM",
"ERA",
"F",
"FA",
"FBT",
"FLY",
"FURB",
"G",
"I",
"ICN",
"INP",
"ISC",
"LOG",
"N",
"PERF",
"PGH",
"PIE",
"PL",
"PT",
"PTH",
"PYI",
"Q",
"R",
"RET",
"RSE",
"RUF",
"S",
"SIM",
"SLF",
"T",
"T20",
"TID",
"TRY",
"UP",
"W",
"YTT"
]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.ruff.lint.flake8-tidy-imports]
ban-relative-imports = "all"
[tool.ruff.lint.per-file-ignores]
"setup.py" = ["D100"]
[tool.ruff.lint.isort]
known-third-party = ["pydantic"]
known-first-party = ["test", "docs"]
[tool.pytest.ini_options]
markers = [
"noautofixt"
]
filterwarnings = [
"ignore",
"default:::capacity-planner",
]
asyncio_default_fixture_loop_scope = "function"