Select Git revision
services.py 4.46 KiB
import functools
import logging
import re
from collections.abc import Generator
from typing import Any
from pydantic import BaseModel
from . import brian, cache, correlator, inventory
logger = logging.getLogger(__name__)
class BitRates(BaseModel):
ingress: float | None
egress: float | None
class Overlays(BaseModel):
speed: int
up: bool
latest: BitRates
mean: BitRates
max: BitRates
class Service(BaseModel):
sid: str
scid: str
name: str
type: str
pops: list[str]
# TODO: temporarily removed for simplicity (first map POC is only POP-based)
# equipment: list[str]
overlays: Overlays
class ServiceList(BaseModel):
services: list[Service]
def endpoint_equipment(endpoint: dict[str, Any]) -> str:
"""
convert the correlator router hostname or optical equipment name
to the inventory equipment format
"""
def _hostname_to_equipment(_hn: str) -> str:
m = re.match(r'^(.+)\.geant\.net$', _hn)
if not m:
logger.error(f'unexpected hostname pattern: {_hn}')
return '?'
return m.group(1).upper()
if 'hostname' in endpoint:
return _hostname_to_equipment(endpoint['hostname'])
elif 'equipment' in endpoint:
return endpoint['equipment']
# should already be validated
raise AssertionError(f'no equipment or hostname in endpoint: {endpoint}')
def _services(service_type: str | None = None) -> Generator[Service]:
"""
load the cached backend data and yield map service records
only return operational services that match the service type, if provided
"""
try:
scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
brian_rates = cache.get(brian.CACHED_BRIAN_SCID_RATES_FILENAME)
equipment_list = cache.get(inventory.INPROV_EQUIPMENT_CACHE_FILENAME)
except FileNotFoundError:
logger.exception('not enough data available to build the service list')
return
def _get_down_correlator_services() -> Generator[str]:
for _e in correlator_state['endpoints']:
if _e['up']:
continue
for _s in _e['services']:
if 'sid' in _s:
yield _s['sid']
down_service_sids = set(_get_down_correlator_services())
brian_scid_rates = {r['scid']: r['values'] for r in brian_rates}
equipment_dict = {_x['name']: _x for _x in equipment_list}
def _get_equipment_pop(equipment_name: str) -> str:
if equipment_name not in equipment_dict:
# TODO: is this really possible if all data is read from IMS at the same time?
logger.error(f'unknown endpoint equipment: {equipment_name}')
return '?'
_pop_name = equipment_dict[equipment_name]['pop']
assert isinstance(_pop_name, str) # mypy noise
return _pop_name
for _s in scid_current:
if _s['status'] != 'operational':
continue
if service_type and _s['service_type'] != service_type:
continue
equipment = sorted(set(map(endpoint_equipment, _s['endpoints'])))
pops = sorted(set(map(_get_equipment_pop, equipment)))
rates = brian_scid_rates.get(_s['scid'], {})
overlays = Overlays(
speed = _s['speed'],
up = _s['sid'] not in down_service_sids,
latest = BitRates(
egress = rates.get('latest', {}).get('egress'),
ingress = rates.get('latest', {}).get('ingress'),
),
mean = BitRates(
egress = rates.get('mean', {}).get('egress'),
ingress = rates.get('mean', {}).get('ingress'),
),
max = BitRates(
egress = rates.get('max', {}).get('egress'),
ingress = rates.get('max', {}).get('ingress'),
),
)
yield Service(
sid = _s['sid'],
scid = _s['scid'],
name = _s['name'],
type = _s['service_type'],
pops = pops,
# TODO: temporarily removed for simplicity (first map POC is only POP-based)
# equipment = equipment,
overlays = overlays,
)
def build_service_info_list(service_type: str | None = None) -> ServiceList:
"""
return a list of mappable info about all operational services
"""
return ServiceList(services=list(_services(service_type)))