Commit 5da5ce3a authored by Erik Reid

Merge branch 'release/0.1'

parents 25c14572 85065efb
Showing 1906 additions and 110 deletions
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
build/
dist/
*.egg-info/
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Virtual environments
venv/
.env
.venv/
# PyInstaller
*.manifest
*.spec
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.cache
.pytest_cache/
coverage.xml
bom.json
# Sphinx documentation
docs/_build/
docs/build
# Editor / OS junk
.DS_Store
Thumbs.db
.idea/
.vscode/
*.swp
*.swo
# drawio tmp files
*.bkp
# Changelog
All notable changes to this project will be documented in this file.
## [0.1] - 2025-06-07
- api prototype/demo
// https://gitlab.geant.net/live-projects/jenkins-pipeline/-/tree/master/vars
library 'SWDPipeline'
// Parameters:
// project_name (must match the name of the project in GitLab/SWD release jenkins)
// extra_recipients (optional, list of email addresses to always receive notifications, in addition to the default in jenkins)
// python_test_versions (list of python versions, resolving to docker tags, to test against)
String name = 'mapping-provider'
List<String> extraRecipients = ['erik.reid@geant.org']
List<String> pythonTestVersions = ['3.11']
SimplePythonBuild(name, extraRecipients, pythonTestVersions)
@@ -7,6 +7,7 @@ from datetime import datetime
import json
import os
import sys
import tempfile
sys.path.insert(
0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
@@ -28,7 +29,7 @@ release = '0.0'
extensions = [
"sphinx_rtd_theme",
# "sphinx.ext.autodoc",
"sphinx.ext.autodoc",
"sphinx.ext.coverage",
"sphinxcontrib.plantuml",
"sphinxcontrib.openapi",
@@ -53,7 +54,30 @@ html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
# we need a minimal/parseable config file in order to
# start the server and dump the schema
with tempfile.NamedTemporaryFile(delete=False) as f:
bogus_config = {
'inventory': 'http://bogus',
'reporting': 'https://bogus',
'correlator_exchange': 'bogus',
'brian': {
'hostname': 'bogus',
'username': 'bogus',
'password': 'bogus',
'database': 'bogus',
'measurement': 'bogus',
}
}
with open(f.name, 'w') as f:
json.dump(bogus_config, f)
f.flush()
os.environ['SETTINGS_FILENAME'] = f.name
api_schema = create_app().openapi()
openapi_filename = os.path.join(os.path.dirname(__file__), "openapi.json")
with open(openapi_filename, 'w') as f:
json.dump(api_schema, f, indent=4)
mapping_provider package
=========================
Submodules
----------
mapping\_provider.main module
-----------------------------
.. automodule:: mapping_provider.main
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: mapping_provider
:members:
:undoc-members:
:show-inheritance:
mapping_provider
================
.. toctree::
:maxdepth: 4
mapping_provider
This diff is collapsed.
@@ -35,6 +35,24 @@ Ideas
- For the Geant-managed heat maps: maybe nice to agree upon a simple grid-locked geo-node/edge approach
Updated High-Level Architecture
-------------------------------
.. only:: drawio
.. drawio-image:: design.drawio
:page-name: ml-arch
Schematic of SVG Element Management
-------------------------------------
.. only:: drawio
.. drawio-image:: design.drawio
:page-name: svg-class-schematic
High-Level Architecture
-------------------------
"""Initializes the FastAPI application."""
"""
Default entry point for the FastAPI application.
"""
import contextlib
import logging
import tempfile
import threading
from collections.abc import AsyncIterator
from functools import partial
from fastapi import FastAPI
from mapping_provider import config, environment
from mapping_provider.api import common, map
from mapping_provider.backends import brian, cache, correlator, inventory
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI, app_config: config.Configuration) -> AsyncIterator[None]:
if not app_config.rmq:
return # don't start any background tasks
with tempfile.TemporaryDirectory() as _tmp_working_dir:
cache.init(_tmp_working_dir)
stop_event = threading.Event()
_correlator_thread = threading.Thread(
target=correlator.consume_status,
daemon=True,
kwargs={
'correlator_state_broadcast_exchange_name': app_config.correlator_exchange,
'rmq_params': app_config.rmq,
'stop_event': stop_event,
})
_correlator_thread.start()
_inventory_thread = threading.Thread(
target=inventory.worker_proc,
daemon=True,
kwargs={
'inventory_base_uri': app_config.inventory,
'reporting_base_uri': app_config.reporting,
'refresh_frequency_seconds': 300,
'stop_event': stop_event,
})
_inventory_thread.start()
_brian_thread = threading.Thread(
target=brian.worker_proc,
daemon=True,
kwargs={
'brian_params': app_config.brian,
'refresh_frequency_seconds': 300,
'stop_event': stop_event,
})
_brian_thread.start()
yield None # wait here until the app is shutting down (mypy needs a value to be yielded)
stop_event.set()
_correlator_thread.join(timeout=10)
_inventory_thread.join(timeout=10)
logger.info("background threads joined")
def create_app() -> FastAPI:
"""Create a FastAPI application."""
"""
Creates the FastAPI application instance, with routers attached.
"""
environment.setup_logging()
app_config = config.load()
if app_config.sentry:
environment.setup_sentry(app_config.sentry)
app = FastAPI(
title="Mapping provider",
description="Mapping provider endpoints for GÉANT maps",
lifespan=partial(lifespan, app_config=app_config),
)
app.include_router(common.router)
app.include_router(map.router, prefix='/map')
return app
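For local experimentation, the application factory above can be served with uvicorn. This is only a sketch, not part of the commit: it assumes uvicorn is installed and that SETTINGS_FILENAME points at a valid configuration file.

# run_local.py -- hypothetical helper, illustration only
import os

import uvicorn

from mapping_provider.main import create_app

os.environ.setdefault('SETTINGS_FILENAME', 'config.json')  # hypothetical path
uvicorn.run(create_app(), host='127.0.0.1', port=8000)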
from importlib.metadata import version
from fastapi import APIRouter
from pydantic import BaseModel
router = APIRouter()
class Version(BaseModel):
module: str
api: str
@router.get("/version")
def get_version() -> Version:
"""
handler for /version
"""
return Version(
module = version('mapping_provider'),
api = '0.1'
)
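The reworked /version handler returns a Version model instead of a bare dict. As a quick sanity check it can be exercised with FastAPI's TestClient; a sketch only, assuming SETTINGS_FILENAME points at a parseable config file (as arranged in docs/conf.py above):

from fastapi.testclient import TestClient

from mapping_provider.main import create_app

client = TestClient(create_app())
rsp = client.get('/version')
assert rsp.status_code == 200
print(rsp.json())  # e.g. {'module': '0.1', 'api': '0.1'}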
from typing import Any
import jsonschema
import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from mapping_provider import config
from mapping_provider.backends import services
router = APIRouter()
class Pop(BaseModel):
latitude: float | None
longitude: float | None
name: str
abbreviation: str
city: str
country: str
class PopList(BaseModel):
pops: list[Pop]
class Equipment(BaseModel):
name: str
pop: str
status: str
class EquipmentList(BaseModel):
equipment: list[Equipment]
INPROV_POP_LIST_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'pop': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'abbreviation': {'type': 'string'},
'city': {'type': 'string'},
'country': {'type': 'string'},
'latitude': {'type': ['number', 'null']},
'longitude': {'type': ['number', 'null']},
},
'required': ['name', 'abbreviation', 'city', 'country', 'latitude', 'longitude'],
'additionalProperties': True,
},
},
'type': 'array',
'items': {'$ref': '#/definitions/pop'}
}
INPROV_EQUIPMENT_LIST_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'router': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'pop': {'type': 'string'},
'status': {'type': 'string'},
},
'required': ['name', 'pop', 'status'],
},
},
'type': 'array',
'items': {'$ref': '#/definitions/router'}
}
@router.get("/pops")
def get_pops() -> PopList:
"""
handler for /pops
"""
# TODO: catch/handle the usual exceptions
app_params = config.load()
rv = requests.get(
f'{app_params.inventory}/map/pops',
headers={'Accept': 'application/json'})
rv.raise_for_status()
pop_list_obj = rv.json()
jsonschema.validate(pop_list_obj, INPROV_POP_LIST_SCHEMA)
def _make_pop(pop_dict: dict[str, Any]) -> Pop:
return Pop(
latitude=pop_dict['latitude'],
longitude=pop_dict['longitude'],
name=pop_dict['name'],
abbreviation=pop_dict['abbreviation'],
city=pop_dict['city'],
country=pop_dict['country'],
)
return PopList(pops=list(map(_make_pop, pop_list_obj)))
@router.get("/equipment")
def get_equipment() -> EquipmentList:
"""
handler for /equipment
"""
# TODO: catch/handle the usual exceptions
app_params = config.load()
rv = requests.get(
f'{app_params.inventory}/map/equipment',
headers={'Accept': 'application/json'})
rv.raise_for_status()
equipment_list_obj = rv.json()
jsonschema.validate(equipment_list_obj, INPROV_EQUIPMENT_LIST_SCHEMA)
def _make_equipment(equipment_dict: dict[str, Any]) -> Equipment:
return Equipment(
name=equipment_dict['name'],
pop=equipment_dict['pop'],
status=equipment_dict['status'],
)
return EquipmentList(equipment=list(map(_make_equipment, equipment_list_obj)))
@router.get("/services")
@router.get("/services/{service_type}")
def get_services(service_type: str | None = None) -> services.ServiceList:
"""
handler for /services and /services/{service_type}
"""
return_value = services.build_service_info_list(service_type=service_type)
if not return_value.services:
raise HTTPException(status_code=404, detail=f'unrecognized service type: {service_type}')
return return_value
@router.get("/trunks")
def get_trunks() -> services.ServiceList:
"""
handler for /trunks, same as /services/IP TRUNK
"""
return get_services(service_type='IP TRUNK')
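Once the service is running and the background workers have populated the cache, the map endpoints can be queried over plain HTTP. A sketch, with a hypothetical base URL:

import requests

base_url = 'http://localhost:8000'  # hypothetical deployment address
pops = requests.get(f'{base_url}/map/pops', headers={'Accept': 'application/json'}).json()
trunks = requests.get(f'{base_url}/map/trunks').json()
print(f"{len(pops['pops'])} pops, {len(trunks['services'])} trunk services")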
import contextlib
import logging
import time
from collections.abc import Generator
from threading import Event
from typing import Any
from influxdb import InfluxDBClient
from mapping_provider import config
from mapping_provider.backends import cache
logger = logging.getLogger(__name__)
# os.environ['SETTINGS_FILENAME'] = os.path.join(os.path.dirname(__file__), 'config.json')
# params = config.load()
CACHED_BRIAN_SCID_RATES_FILENAME = 'brian-scid-rates.json'
CACHED_SCID_RATES_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"definitions": {
"in-out-rates": {
"type": "object",
"properties": {
"ingress": {"type": ["number", "null"]},
"egress": {"type": ["number", "null"]},
},
"required": ["ingress", "egress"],
"additionalProperties": False,
},
"rate-stats": {
"type": "object",
"properties": {
"latest": {"$ref": "#/definitions/in-out-rates"},
"mean": {"$ref": "#/definitions/in-out-rates"},
"max": {"$ref": "#/definitions/in-out-rates"},
},
"required": ["latest", "mean", "max"],
"additionalProperties": False,
},
"service-rate": {
"type": "object",
"properties": {
"scid": {"type": "string"},
"values": {"$ref": "#/definitions/rate-stats"},
},
"required": ["scid", "values"],
"additionalProperties": False,
},
},
"type": "array",
"items": {"$ref": "#/definitions/service-rate"},
}
@contextlib.contextmanager
def influx_client(
influx_params: config.InfluxConnectionParams) -> Generator[InfluxDBClient]:
"""
context manager for creating an influx db connection
:param influx_params: the 'influx' element from config
:return: an InfluxDBClient instance, closed on exit
"""
client = None
try:
client = InfluxDBClient(
host=influx_params.hostname,
port=influx_params.port,
database=influx_params.database,
username=influx_params.username,
password=influx_params.password,
ssl=influx_params.ssl,
verify_ssl=influx_params.ssl)
yield client
finally:
if client:
logger.debug('closing influx session')
client.close()
def _load_scid_rates_rows(
influx_params: config.InfluxConnectionParams,
window: str = '24h') -> Generator[dict[str, str | float | None]]:
"""
Return the latest, mean and max ingress/egress rates per scid,
over the given time window.
:param influx_params: the 'influx' element from config
:param window: influx time filter expression (default: '24h')
:return: generator of per-scid rate rows
"""
with influx_client(influx_params) as influx:
query = ('select'
' last(ingress) as latest_ingress,'
' last(egress) as latest_egress,'
' mean(ingress) as mean_ingress,'
' mean(egress) as mean_egress,'
' max(ingress) as max_ingress,'
' max(egress) as max_egress'
' from scid_rates'
f' where time > now() - {window}'
' group by scid')
logger.debug(query)
result = influx.query(query)
for series in result.raw['series']:
tags = series['tags']
columns = series['columns']
values = series['values']
assert len(values) == 1
for row in values:
rate = {'scid': tags['scid']}
for field_name, field_value in zip(columns, row, strict=False):
rate[field_name] = field_value
yield rate
def load_scid_rates(influx_params: config.InfluxConnectionParams) -> list[dict[str, Any]]:
rates = []
def _bitrate_or_none(row: dict[str, Any], field_name: str) -> float | None:
_rate = row.get(field_name, None)
assert isinstance(_rate, float | None) # mypy noise
if _rate:
return _rate * 8
return _rate # could be 0 or None
for r in _load_scid_rates_rows(influx_params):
values = {
'latest': {
'ingress': _bitrate_or_none(r, 'latest_ingress'),
'egress': _bitrate_or_none(r, 'latest_egress')
},
'mean': {
'ingress': _bitrate_or_none(r, 'mean_ingress'),
'egress': _bitrate_or_none(r, 'mean_egress')
},
'max': {
'ingress': _bitrate_or_none(r, 'max_ingress'),
'egress': _bitrate_or_none(r, 'max_egress')
},
}
rates.append({
'scid': r['scid'],
'values': values
})
cache.set(CACHED_BRIAN_SCID_RATES_FILENAME, rates)
return rates # <-- caller can also retrieve this from the cache
def worker_proc(
brian_params: config.InfluxConnectionParams,
refresh_frequency_seconds: int,
stop_event: Event | None = None) -> None:
def _should_stop() -> bool:
return stop_event.is_set() if stop_event else False
while not _should_stop():
load_scid_rates(brian_params)
# wait and then restart the loop
if stop_event:
stop_event.wait(timeout=refresh_frequency_seconds)
else:
time.sleep(refresh_frequency_seconds)
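For reference, a fabricated example of the structure that load_scid_rates writes to the cache; it validates against CACHED_SCID_RATES_SCHEMA above (all values are illustrative only):

import jsonschema

from mapping_provider.backends import brian

example_rates = [
    {
        'scid': 'abc123',  # fabricated service id
        'values': {
            'latest': {'ingress': 1.2e9, 'egress': 8.0e8},
            'mean': {'ingress': 1.0e9, 'egress': 7.5e8},
            'max': {'ingress': 2.4e9, 'egress': None},
        },
    },
]
jsonschema.validate(example_rates, brian.CACHED_SCID_RATES_SCHEMA)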
"""
Trivial caching module for the mapping provider.
The global variable `cache_dir` is set during FastAPI server startup.
Could be replaced with a proper caching module in the future, or maybe
just a simple dict.
"""
import json
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
_cache_dir: str | None = None
def init(cache_dir: str) -> None:
global _cache_dir
assert _cache_dir is None, "cache_dir has already been initialized"
_cache_dir = cache_dir
logger.debug(f"set cache directory: {_cache_dir}")
def set(filename: str, data: Any) -> None:
"""
data must be JSON-serializable.
"""
assert _cache_dir is not None, "cache_dir hasn't been initialized"
with open(os.path.join(_cache_dir, filename), 'w') as f:
f.write(json.dumps(data))
logger.debug(f"wrote cached data: {filename}")
def get(filename: str) -> Any:
"""
Loads the cached data, parses it as json & returns the object.
"""
assert _cache_dir is not None, "cache_dir hasn't been initialized"
with open(os.path.join(_cache_dir, filename)) as f:
logger.debug(f"reading cached data: {filename}")
return json.load(f)
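A minimal usage sketch of the cache module (illustration only; the filename is arbitrary). The cache is just a directory of JSON files, initialized once at startup and then shared by the backend workers:

import tempfile

from mapping_provider.backends import cache

with tempfile.TemporaryDirectory() as tmp_dir:
    cache.init(tmp_dir)
    cache.set('example.json', {'hello': 'world'})
    assert cache.get('example.json') == {'hello': 'world'}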
import logging
import os
import socket
import threading
import time
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mapping_provider.config import RMQConnectionParams
from pika.exchange_type import ExchangeType
from . import cache
from .rmq.exchange import RabbitMQClient, default_rmq_connection_params
logger = logging.getLogger(__name__)
CACHED_CORRELATOR_STATE_FILENAME = 'correlator-state.json'
ALARM_STATE_MESSAGE_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"definitions": {
"list-of-strings": {
# shorthand for a repeated type ...
"type": "array",
"items": {"type": "string"},
},
"location": {
"type": "object",
"properties": {
"site": {"type": "string"},
"equipment": {"type": "string"}
},
},
"severity": {
"type": "string",
# TODO: gen from AlarmSeverity
"enum": ["HIDE", "WARNING", "MINOR", "MAJOR", "CRITICAL"],
},
"endpoint": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"name": {"type": "string"},
"projects": {"$ref": "#/definitions/list-of-strings"},
"alarm": {"type": "string"},
"init_time": {"type": "number"},
"last_activity_ts": {"type": "number"},
"services": {"type": "array", "items": {"type": "object"}},
"locations": {
"type": "array",
"items": {"$ref": "#/definitions/location"},
},
# not needed - sanity checks only
"event_history": {"$ref": "#/definitions/list-of-strings"},
"up": {"type": "boolean"},
# skipping validation of type-specific params
# such as peer, router, interfaces, snmp, asn, etc
},
"required": [
"uuid",
"type",
"name",
"alarm",
"projects",
"init_time",
"last_activity_ts",
"locations",
"event_history",
"up",
],
},
"alarm": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"db_id": {"type": "integer"},
"phase": {
"type": "string",
"enum": ["PENDING", "FINALIZED", "KILL_ME"],
},
"final_severity": {
# this field is null until phase is FINALIZED
"oneOf": [{"$ref": "#/definitions/severity"}, {"type": "null"}]
},
"severity": {"$ref": "#/definitions/severity"},
"state": {
"type": "string",
"enum": ["OPEN", "CLOSED"], # TODO: gen from AlarmState
},
# note: list-of-strings is in _ENDPOINT_SCHEMA_DEFS
"endpoints": {"$ref": "#/definitions/list-of-strings"},
"description": {"type": "string"},
"devoured": {"type": "array", "items": {"type": "integer"}},
},
"required": [
"uuid",
"db_id",
"phase",
"severity",
"final_severity",
"state",
"endpoints",
"description",
],
},
},
"type": "object",
"properties": {
"alarms": {"type": "array", "items": {"$ref": "#/definitions/alarm"}},
"endpoints": {"type": "array", "items": {"$ref": "#/definitions/endpoint"}},
},
"required": ["alarms", "endpoints"],
}
def handle_correlator_state_broadcast(message: dict[str, Any]) -> None:
cache.set(CACHED_CORRELATOR_STATE_FILENAME, message)
def consume_status(
correlator_state_broadcast_exchange_name: str,
rmq_params: 'RMQConnectionParams',
stop_event: threading.Event | None = None) -> None:
"""
Consume the correlator state broadcast exchange forever.
When a message is received, update the global availability state cache.
:param correlator_state_broadcast_exchange_name:
The name of the correlator state broadcast exchange.
:param rmq_params: RabbitMQ connection parameters.
"""
cp = default_rmq_connection_params(
hostname=rmq_params.brokers,
username=rmq_params.username,
password=rmq_params.password,
vhost=rmq_params.vhost)
queue_name = (
f"mapping-provider-{socket.getfqdn()}"
f"-{os.getpid()}.{threading.get_ident()}.{time.time()}"
)
client = RabbitMQClient(
connection_params=cp,
exchange_name=correlator_state_broadcast_exchange_name,
exchange_type=ExchangeType.fanout,
queue_name=queue_name,
exclusive=True,
reconnect_on_idle_timeout=120,
stop_event=stop_event)
client.consume_forever(
callback=handle_correlator_state_broadcast,
json=True,
schema=ALARM_STATE_MESSAGE_SCHEMA)
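For reference, a minimal fabricated message that satisfies ALARM_STATE_MESSAGE_SCHEMA; consume_status passes the schema to the RabbitMQ client so each broadcast can be validated before handle_correlator_state_broadcast caches it. A sketch only:

import tempfile

from mapping_provider.backends import cache, correlator

cache.init(tempfile.mkdtemp())  # normally done once during app startup
minimal_state = {'alarms': [], 'endpoints': []}  # fabricated, but schema-valid
correlator.handle_correlator_state_broadcast(minimal_state)
print(cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME))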
import concurrent.futures
import logging
import time
from threading import Event
from typing import Any
import jsonschema
import requests
from . import cache
logger = logging.getLogger(__name__)
REPORTING_SCID_CURRENT_CACHE_FILENAME = 'reporting-scid-current.json'
INPROV_EQUIPMENT_CACHE_FILENAME = 'inprov-equipment.json'
REPORTING_SCID_CURRENT_CACHE_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'interface': {
'type': 'object',
'properties': {
'hostname': {'type': 'string'},
'interface': {'type': 'string'},
# 'addresses': {
# 'type': 'array',
# 'items': {'type': 'string'}
# }
},
'required': ['hostname', 'interface']
},
'lambda_interface': {
'type': 'object',
'properties': {
'equipment': {'type': 'string'},
'port': {'type': 'string'},
},
'required': ['equipment', 'port']
},
'service': {
'type': 'object',
'properties': {
'scid': {'type': 'string'},
'sid': {'type': 'string'},
'name': {'type': 'string'},
'speed': {'type': 'integer'},
'status': {'type': 'string'},
# 'monitored': {'type': 'boolean'},
'service_type': {'type': ['string', 'null']},
# 'imsid': {'type': 'integer'},
# 'customers': {
# 'type': 'array',
# 'items': {'type': 'string'}
# },
'endpoints': {
'type': 'array',
'items': {
'anyOf': [
{'$ref': '#/definitions/interface'},
{'$ref': '#/definitions/lambda_interface'},
]
}
}
},
'required': ['scid', 'sid', 'name', 'speed', 'status', 'service_type', 'endpoints'],
# 'required': ['scid', 'sid', 'name',
# 'speed', 'status', 'monitored',
# 'service_type', 'imsid', 'customers', 'endpoints'],
# 'additionalProperties': False
},
},
'type': 'array',
'items': {'$ref': '#/definitions/service'}
}
INPROV_EQUIPMENT_LIST_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'equipment': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'pop': {'type': 'string'},
'status': {'type': 'string'},
},
'required': ['name', 'pop', 'status'],
# 'additionalProperties': False
},
},
'type': 'array',
'items': {'$ref': '#/definitions/equipment'}
}
def _load_and_cache_json(
key: str,
url: str,
cache_filename: str,
schema: dict[str, Any] | None = None) -> dict[str, Any]:
"""
Load the JSON from the URL, return and cache it.
:param key: key to return with the loaded data for distinguishing futures
:param url: URL to load the data from
:param cache_filename: filename to cache the data under
:param schema: optional json schema used to validate the response
"""
# TODO: proper error handling
# TODO: maybe try to return cached data if http fails?
rv = requests.get(url)
rv.raise_for_status()
rsp_object = rv.json()
if schema:
jsonschema.validate(instance=rsp_object, schema=schema)
cache.set(cache_filename, rsp_object)
return {
'key': key,
'value': rsp_object
}
def _load_all_inventory(inventory_base_uri: str, reporting_base_uri: str) -> dict[str, Any]:
"""
Load and process service info from inventory+reporting provider.
:param inventory_base_uri: base URI of the inventory provider
:param reporting_base_uri: base URI of the reporting provider
:return: dict mapping each request's key to its parsed response
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futures = [
executor.submit(
_load_and_cache_json,
key='map-services',
url=f'{inventory_base_uri}/map/equipment',
cache_filename=INPROV_EQUIPMENT_CACHE_FILENAME,
schema=INPROV_EQUIPMENT_LIST_SCHEMA),
executor.submit(
_load_and_cache_json,
key='scid-current',
url=f'{reporting_base_uri}/scid/current',
cache_filename=REPORTING_SCID_CURRENT_CACHE_FILENAME,
schema=REPORTING_SCID_CURRENT_CACHE_SCHEMA),
]
responses = {}
for _f in concurrent.futures.as_completed(futures):
responses[_f.result()['key']] = _f.result()['value']
return responses
def worker_proc(
inventory_base_uri: str,
reporting_base_uri: str,
refresh_frequency_seconds: int,
stop_event: Event | None = None) -> None:
def _should_stop() -> bool:
return stop_event.is_set() if stop_event else False
while not _should_stop():
_load_all_inventory(inventory_base_uri, reporting_base_uri)
# wait and then restart the loop
if stop_event:
stop_event.wait(timeout=refresh_frequency_seconds)
else:
time.sleep(refresh_frequency_seconds)
This diff is collapsed.
"""
copied from: dashboard-v3-python v0.226, dashboard.messaging.queue
updated typehints to satisfy mypy, and linted with changes suggested by ruff
"""
import logging
import os
from collections.abc import Sequence
from typing import Any
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelClosedByBroker
from pika.exchange_type import ExchangeType
DASHBOARD_USE_QUORUM_QUEUE = os.getenv(
"DASHBOARD_USE_QUORUM_QUEUE", ""
).lower() not in {"0", "", "false"}
logger = logging.getLogger(__name__)
def loose_queue_declare(
channel: BlockingChannel,
queue: str = "",
exclusive: bool = False,
single_active_consumer: bool = False,
force_quorum_queue: bool = False,
) -> str | None:
"""declare a queue (either classic or quorum, depending on the USE_QUORUM_QUEUE
global variable and if it is a ``dashboard.*`` queue). In case of a failure that
the queue already exists as a different type, this function fails silently.
:param channel: the channel to use
:param queue: the queue name. If empty string, the broker will create a unique queue
name (default: '')
:param kwargs: additional parameters to pass to ``channel.queue_declare``
"""
durable = False
arguments: dict[str, Any] = {}
if force_quorum_queue or (
DASHBOARD_USE_QUORUM_QUEUE and queue.startswith("dashboard.")
):
durable = True
arguments["x-queue-type"] = "quorum"
if single_active_consumer:
arguments["x-single-active-consumer"] = True
try:
result = channel.queue_declare(
queue, durable=durable, exclusive=exclusive, arguments=arguments or None
)
assert isinstance(result.method.queue, str) # for mypy
return result.method.queue
except ChannelClosedByBroker as e:
if e.reply_code == 406: # PRECONDITION_FAILED due to incompatible queue type
requested_type, existing_type = "classic", "quorum"
if DASHBOARD_USE_QUORUM_QUEUE:
requested_type, existing_type = existing_type, requested_type
logger.warning(
f"Trying to declare {requested_type} queue '{queue}'"
f" but queue already exists as {existing_type} queue"
)
return None
raise
def setup_channel(
connection: BlockingConnection,
exchange_name: str,
exchange_type: ExchangeType = ExchangeType.fanout,
queue_name: str | None = None,
queue_declare: bool = True,
exclusive: bool = False,
single_active_consumer: bool = False,
routing_keys: Sequence[str] = [],
prefetch_count: int | None = None,
force_quorum_queue: bool = False,
) -> tuple[BlockingChannel, str | None]:
"""Setup a channel and declare the exchange and optionally the queue.
:param connection: A ``pika`` ``BlockingConnection``
:param exchange_name: the exchange to declare
:param exchange_type: the exchange type (default: ``fanout``)
:param queue_name: The queue to bind to, if given. Can be set to empty string to
get a temporary queue from the broker. When queue_name=None, the channel can
only be used for publishing (default: None)
:param queue_declare: Whether to declare the queue before binding (default: True)
:param exclusive: Whether this should be declared as an exclusive queue (default:
False)
:param routing_keys: Optional routing keys to bind to the queue.
:returns: A tuple (channel: ``BlockingChannel``, queue_name: Optional[str])
"""
channel = connection.channel()
if prefetch_count is not None:
channel.basic_qos(prefetch_count=prefetch_count)
if exchange_name != "": # cannot declare default exchange
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
if queue_name is None:
return channel, None
if queue_name == "" and not queue_declare:
raise ValueError("must set queue_declare=True when supplying empty queue name")
if queue_name == "" and not exclusive:
raise ValueError("must set exclusive=True for anonymous queues")
if exclusive and single_active_consumer:
raise ValueError("Exclusive queues cannot have single active consumer")
if queue_declare:
if exclusive:
# Short circuit exclusive queues that are never quorum and never
# pre-exist
result = channel.queue_declare(queue_name, exclusive=True)
queue_name = result.method.queue
else:
queue_name = loose_queue_declare(
channel,
queue_name,
exclusive=exclusive,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
# if a queue declare fails, the channel is in an unusable state.
# Start over but skip the declare
if queue_name is None:
return setup_channel(
connection=connection,
exchange_name=exchange_name,
exchange_type=exchange_type,
queue_name=queue_name,
queue_declare=False,
exclusive=exclusive,
routing_keys=routing_keys,
prefetch_count=prefetch_count,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
assert queue_name, "queue name must not be empty here"
if not routing_keys:
# in case no routing keys are provided (as for fanout exchanges),
# ensure the queue is still bound to the exchange
channel.queue_bind(exchange=exchange_name, queue=queue_name)
for rk in routing_keys:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=rk)
return channel, queue_name
import logging
import re
from collections.abc import Generator
from typing import Any
from pydantic import BaseModel
from . import brian, cache, correlator, inventory
logger = logging.getLogger(__name__)
class BitRates(BaseModel):
ingress: float | None
egress: float | None
class Overlays(BaseModel):
speed: int
up: bool
latest: BitRates
mean: BitRates
max: BitRates
class Service(BaseModel):
sid: str
scid: str
name: str
type: str
pops: list[str]
# TODO: temporarily removed for simplicity (first map POC is only POP-based)
# equipment: list[str]
overlays: Overlays
class ServiceList(BaseModel):
services: list[Service]
def endpoint_equipment(endpoint: dict[str, Any]) -> str:
"""
convert the correlator router hostname or optical equipment name
to the inventory equipment format
"""
def _hostname_to_equipment(_hn: str) -> str:
m = re.match(r'^(.+)\.geant\.net$', _hn)
if not m:
logger.error(f'unexpected hostname pattern: {_hn}')
return '?'
return m.group(1).upper()
if 'hostname' in endpoint:
return _hostname_to_equipment(endpoint['hostname'])
elif 'equipment' in endpoint:
_name = endpoint['equipment']
assert isinstance(_name, str) # mypy noise
return _name
# should already be validated
raise AssertionError(f'no equipment or hostname in endpoint: {endpoint}')
def _services(service_type: str | None = None) -> Generator[Service]:
"""
load the cached backend data and yield map service records
only return operational services that match the service type, if provided
"""
try:
scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
brian_rates = cache.get(brian.CACHED_BRIAN_SCID_RATES_FILENAME)
equipment_list = cache.get(inventory.INPROV_EQUIPMENT_CACHE_FILENAME)
except FileNotFoundError:
logger.exception('not enough data available to build the service list')
return
def _get_down_correlator_services() -> Generator[str]:
for _e in correlator_state['endpoints']:
if _e['up']:
continue
for _s in _e['services']:
if 'sid' in _s:
yield _s['sid']
down_service_sids = set(_get_down_correlator_services())
brian_scid_rates = {r['scid']: r['values'] for r in brian_rates}
equipment_dict = {_x['name']: _x for _x in equipment_list}
def _get_equipment_pop(equipment_name: str) -> str:
if equipment_name not in equipment_dict:
# TODO: is this really possible if all data is read from IMS at the same time?
logger.error(f'unknown endpoint equipment: {equipment_name}')
return '?'
_pop_name = equipment_dict[equipment_name]['pop']
assert isinstance(_pop_name, str) # mypy noise
return _pop_name
for _s in scid_current:
if _s['status'] != 'operational':
continue
if service_type and _s['service_type'] != service_type:
continue
equipment = sorted(set(map(endpoint_equipment, _s['endpoints'])))
pops = sorted(set(map(_get_equipment_pop, equipment)))
rates = brian_scid_rates.get(_s['scid'], {})
overlays = Overlays(
speed = _s['speed'],
up = _s['sid'] not in down_service_sids,
latest = BitRates(
egress = rates.get('latest', {}).get('egress'),
ingress = rates.get('latest', {}).get('ingress'),
),
mean = BitRates(
egress = rates.get('mean', {}).get('egress'),
ingress = rates.get('mean', {}).get('ingress'),
),
max = BitRates(
egress = rates.get('max', {}).get('egress'),
ingress = rates.get('max', {}).get('ingress'),
),
)
yield Service(
sid = _s['sid'],
scid = _s['scid'],
name = _s['name'],
type = _s['service_type'],
pops = pops,
# TODO: temporarily removed for simplicity (first map POC is only POP-based)
# equipment = equipment,
overlays = overlays,
)
def build_service_info_list(service_type: str | None = None) -> ServiceList:
"""
return a list of mappable info about all operational services
"""
return ServiceList(services=list(_services(service_type)))