Skip to content
Snippets Groups Projects
inventory.py 5.41 KiB
import concurrent.futures
import logging
import time
from threading import Event
from typing import Any

import jsonschema
import requests

from . import cache

logger = logging.getLogger(__name__)

REPORTING_SCID_CURRENT_CACHE_FILENAME = 'reporting-scid-current.json'
INPROV_EQUIPMENT_CACHE_FILENAME = 'inprov-equipment.json'


REPORTING_SCID_CURRENT_CACHE_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {

        'interface': {
            'type': 'object',
            'properties': {
                'hostname': {'type': 'string'},
                'interface': {'type': 'string'},
                # 'addresses': {
                #     'type': 'array',
                #     'items': {'type': 'string'}
                # }
            },
            'required': ['hostname', 'interface']
        },

        'lambda_interface': {
            'type': 'object',
            'properties': {
                'equipment': {'type': 'string'},
                'port': {'type': 'string'},
            },
            'required': ['equipment', 'port']
        },

        'service': {
            'type': 'object',
            'properties': {
                'scid': {'type': 'string'},
                'sid': {'type': 'string'},
                'name': {'type': 'string'},
                'speed': {'type': 'integer'},
                'status': {'type': 'string'},
                # 'monitored': {'type': 'boolean'},
                'service_type': {'type': ['string', 'null']},
                # 'imsid': {'type': 'integer'},
                # 'customers': {
                #     'type': 'array',
                #     'items': {'type': 'string'}
                # },
                'endpoints': {
                    'type': 'array',
                    'items': {
                        'anyOf': [
                            {'$ref': '#/definitions/interface'},
                            {'$ref': '#/definitions/lambda_interface'},
                        ]
                    }
                }
            },
            'required': ['scid', 'sid', 'name', 'speed', 'status', 'service_type', 'endpoints'],
            # 'required': ['scid', 'sid', 'name',
            #              'speed', 'status', 'monitored',
            #              'service_type', 'imsid', 'customers', 'endpoints'],
            # 'additionalProperties': False
        },
    },

    'type': 'array',
    'items': {'$ref': '#/definitions/service'}
}


INPROV_EQUIPMENT_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',

    'definitions': {
        'equipment': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'pop': {'type': 'string'},
                'status': {'type': 'string'},
            },
            'required': ['name', 'pop', 'status'],
            # 'additionalProperties': False
        },
    },

    'type': 'array',
    'items': {'$ref': '#/definitions/equipment'}
}


def _load_and_cache_json(
        key: str,
        url: str,
        cache_filename: str,
        schema: dict[str, Any] | None = None) -> dict[str, Any]:
    """
    Load the JSON from the URL, return and cache it.

    :param key: key to return with the loaded data for distinguishing futures
    :param url: URL to load the data from
    :param cache_filename: filename to cache the data under
    """
    # TODO: proper error handling
    # TODO: maybe try to return cached data if http fails?
    rv = requests.get(url)
    rv.raise_for_status()

    rsp_object = rv.json()
    if schema:
        jsonschema.validate(instance=rsp_object, schema=schema)

    cache.set(cache_filename, rsp_object)
    return {
        'key': key,
        'value': rsp_object
    }


def _load_all_inventory(inventory_base_uri: str, reporting_base_uri: str) -> dict[str, Any]:
    """
    Load and process service info from inventory+reporting provider.

    :param inventory_base_uri: base URI of the inventory provider
    :param reporting_base_uri: base URI of the reporting provider
    :return: list of services
    """

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = [
            executor.submit(
                _load_and_cache_json,
                key='map-services',
                url=f'{inventory_base_uri}/map/equipment',
                cache_filename=INPROV_EQUIPMENT_CACHE_FILENAME,
                schema=INPROV_EQUIPMENT_LIST_SCHEMA),
            executor.submit(
                _load_and_cache_json,
                key='scid-current',
                url=f'{reporting_base_uri}/scid/current',
                cache_filename=REPORTING_SCID_CURRENT_CACHE_FILENAME,
                schema=REPORTING_SCID_CURRENT_CACHE_SCHEMA),
        ]
        responses = {}
        for _f in concurrent.futures.as_completed(futures):
            responses[_f.result()['key']] = _f.result()['value']

    return responses


def worker_proc(
        inventory_base_uri: str,
        reporting_base_uri: str,
        refresh_frequency_seconds: int,
        stop_event: Event | None = None) -> None:

    def _should_stop() -> bool:
        return stop_event.is_set() if stop_event else False

    while not _should_stop():

        _load_all_inventory(inventory_base_uri, reporting_base_uri)

        # wait and the restart the loop
        if stop_event:
            stop_event.wait(timeout=refresh_frequency_seconds)
        else:
            time.sleep(refresh_frequency_seconds)