Skip to content
Snippets Groups Projects
Select Git revision
  • 3889a7dbae91cf4b318ec3fe090cc81401fc58d0
  • develop default
  • master protected
  • NGM-30-hover-popup
  • svg-test
  • master-conflicted
  • 0.12
  • 0.11
  • 0.10
  • 0.9
  • 0.8
  • 0.7
  • 0.6
  • 0.5
  • 0.4
  • 0.3
  • 0.2
  • 0.1
18 results

services.py

Blame
  • services.py 4.46 KiB
    import functools
    import logging
    import re
    from collections.abc import Generator
    from typing import Any
    
    from pydantic import BaseModel
    
    from . import brian, cache, correlator, inventory
    
    logger = logging.getLogger(__name__)
    
    
    class BitRates(BaseModel):
        ingress: float | None
        egress: float | None
    
    
    class Overlays(BaseModel):
        speed: int
        up: bool
        latest: BitRates
        mean: BitRates
        max: BitRates
    
    
    class Service(BaseModel):
        sid: str
        scid: str
        name: str
        type: str
        pops: list[str]
        # TODO: temporarily removed for simplicity (first map POC is only POP-based)
        # equipment: list[str]
        overlays: Overlays
    
    
    class ServiceList(BaseModel):
        services: list[Service]
    
    
    def endpoint_equipment(endpoint: dict[str, Any]) -> str:
        """
        convert the correlator router hostname or optical equipment name
        to the inventory equipment format
        """
        def _hostname_to_equipment(_hn: str) -> str:
            m = re.match(r'^(.+)\.geant\.net$', _hn)
            if not m:
                logger.error(f'unexpected hostname pattern: {_hn}')
                return '?'
            return m.group(1).upper()
    
        if 'hostname' in endpoint:
            return _hostname_to_equipment(endpoint['hostname'])
        elif 'equipment' in endpoint:
            return endpoint['equipment']
    
        # should already be validated
        raise AssertionError(f'no equipment or hostname in endpoint: {endpoint}')
    
    
    def _services(service_type: str | None = None) -> Generator[Service]:
        """
        load the cached backend data and yield map service records
    
        only return operational services that match the service type, if provided
        """
        try:
            scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
            correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
            brian_rates = cache.get(brian.CACHED_BRIAN_SCID_RATES_FILENAME)
            equipment_list = cache.get(inventory.INPROV_EQUIPMENT_CACHE_FILENAME)
        except FileNotFoundError:
            logger.exception('not enough data available to build the service list')
            return
    
        def _get_down_correlator_services() -> Generator[str]:
            for _e in correlator_state['endpoints']:
                if _e['up']:
                    continue
                for _s in _e['services']:
                    if 'sid' in _s:
                        yield _s['sid']
    
    
        down_service_sids = set(_get_down_correlator_services())
        brian_scid_rates = {r['scid']: r['values'] for r in brian_rates}
    
        equipment_dict = {_x['name']: _x for _x in equipment_list}
    
        def _get_equipment_pop(equipment_name: str) -> str:
            if equipment_name not in equipment_dict:
                # TODO: is this really possible if all data is read from IMS at the same time?
                logger.error(f'unknown endpoint equipment: {equipment_name}')
                return '?'
            _pop_name = equipment_dict[equipment_name]['pop']
            assert isinstance(_pop_name, str)  # mypy noise
            return _pop_name
    
        for _s in scid_current:
    
            if _s['status'] != 'operational':
                continue
    
            if service_type and _s['service_type'] != service_type:
                continue
    
            equipment = sorted(set(map(endpoint_equipment, _s['endpoints'])))
            pops = sorted(set(map(_get_equipment_pop, equipment)))
            
            rates = brian_scid_rates.get(_s['scid'], {})
            overlays = Overlays(
                speed = _s['speed'],
                up = _s['sid'] not in down_service_sids,
                latest = BitRates(
                    egress = rates.get('latest', {}).get('egress'),
                    ingress = rates.get('latest', {}).get('ingress'),
                ),
                mean = BitRates(
                    egress = rates.get('mean', {}).get('egress'),
                    ingress = rates.get('mean', {}).get('ingress'),
                ),
                max = BitRates(
                    egress = rates.get('max', {}).get('egress'),
                    ingress = rates.get('max', {}).get('ingress'),
                ),
            )
    
            yield Service(
                sid = _s['sid'],
                scid = _s['scid'],
                name = _s['name'],
                type = _s['service_type'],
                pops = pops,
                # TODO: temporarily removed for simplicity (first map POC is only POP-based)
                # equipment = equipment,
                overlays = overlays,
            )
    
    
    def build_service_info_list(service_type: str | None = None) -> ServiceList:
        """
        return a list of mappable info about all operational services
        """  
        return ServiceList(services=list(_services(service_type)))