# inventory.py
import concurrent.futures
import logging
import time
from threading import Event
from typing import Any
import jsonschema
import requests
from . import cache
# module-level logger named after this module, per logging convention
logger = logging.getLogger(__name__)
# cache keys (filenames) under which the fetched JSON payloads are stored
REPORTING_SCID_CURRENT_CACHE_FILENAME = 'reporting-scid-current.json'
INPROV_EQUIPMENT_CACHE_FILENAME = 'inprov-equipment.json'
# jsonschema document for the reporting provider's /scid/current response:
# an array of service records whose endpoints are either router interfaces
# (hostname/interface) or lambda endpoints (equipment/port).
# NOTE(review): the commented-out properties below are presumably fields
# present in the upstream payload that are deliberately left unvalidated
# here — confirm before removing or enabling them.
REPORTING_SCID_CURRENT_CACHE_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {
        # a router interface endpoint
        'interface': {
            'type': 'object',
            'properties': {
                'hostname': {'type': 'string'},
                'interface': {'type': 'string'},
                # 'addresses': {
                #     'type': 'array',
                #     'items': {'type': 'string'}
                # }
            },
            'required': ['hostname', 'interface']
        },
        # an optical/lambda endpoint, identified by equipment + port
        'lambda_interface': {
            'type': 'object',
            'properties': {
                'equipment': {'type': 'string'},
                'port': {'type': 'string'},
            },
            'required': ['equipment', 'port']
        },
        # one service record; endpoints may mix both endpoint shapes
        'service': {
            'type': 'object',
            'properties': {
                'scid': {'type': 'string'},
                'sid': {'type': 'string'},
                'name': {'type': 'string'},
                'speed': {'type': 'integer'},
                'status': {'type': 'string'},
                # 'monitored': {'type': 'boolean'},
                'service_type': {'type': ['string', 'null']},
                # 'imsid': {'type': 'integer'},
                # 'customers': {
                #     'type': 'array',
                #     'items': {'type': 'string'}
                # },
                'endpoints': {
                    'type': 'array',
                    'items': {
                        'anyOf': [
                            {'$ref': '#/definitions/interface'},
                            {'$ref': '#/definitions/lambda_interface'},
                        ]
                    }
                }
            },
            'required': ['scid', 'sid', 'name', 'speed', 'status', 'service_type', 'endpoints'],
            # 'required': ['scid', 'sid', 'name',
            #              'speed', 'status', 'monitored',
            #              'service_type', 'imsid', 'customers', 'endpoints'],
            # 'additionalProperties': False
        },
    },
    'type': 'array',
    'items': {'$ref': '#/definitions/service'}
}
# jsonschema document for the inventory provider's /map/equipment response:
# an array of equipment records
INPROV_EQUIPMENT_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
    'definitions': {
        'equipment': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'pop': {'type': 'string'},
                'status': {'type': 'string'},
            },
            'required': ['name', 'pop', 'status'],
            # 'additionalProperties': False
        },
    },
    'type': 'array',
    'items': {'$ref': '#/definitions/equipment'}
}
def _load_and_cache_json(
        key: str,
        url: str,
        cache_filename: str,
        schema: dict[str, Any] | None = None) -> dict[str, Any]:
    """
    Load JSON from a URL, optionally validate it, then cache and return it.

    :param key: key to return with the loaded data for distinguishing futures
    :param url: URL to load the data from
    :param cache_filename: filename to cache the data under
    :param schema: optional jsonschema document; when given, the response
        body is validated against it before caching
    :return: dict with 'key' (the supplied key) and 'value' (the parsed
        JSON response)
    :raises requests.HTTPError: on an HTTP error status
    :raises jsonschema.ValidationError: if the response fails validation
    """
    # TODO: proper error handling
    # TODO: maybe try to return cached data if http fails?
    # an explicit timeout is required: requests.get without one can block
    # forever if the remote server stops responding, wedging the worker loop
    rv = requests.get(url, timeout=30)
    rv.raise_for_status()
    rsp_object = rv.json()
    if schema:
        jsonschema.validate(instance=rsp_object, schema=schema)
    # cache only after successful validation so the cache never holds
    # known-bad data
    cache.set(cache_filename, rsp_object)
    return {
        'key': key,
        'value': rsp_object
    }
def _load_all_inventory(inventory_base_uri: str, reporting_base_uri: str) -> dict[str, Any]:
    """
    Load and process service info from inventory+reporting provider.

    Both endpoints are fetched concurrently and cached to disk by
    _load_and_cache_json.

    :param inventory_base_uri: base URI of the inventory provider
    :param reporting_base_uri: base URI of the reporting provider
    :return: mapping of result key ('map-services', 'scid-current') to the
        loaded JSON payload
    """
    # max_workers must cover all submitted tasks, otherwise the two HTTP
    # requests run serially (max_workers=1 would defeat the thread pool)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(
                _load_and_cache_json,
                key='map-services',
                url=f'{inventory_base_uri}/map/equipment',
                cache_filename=INPROV_EQUIPMENT_CACHE_FILENAME,
                schema=INPROV_EQUIPMENT_LIST_SCHEMA),
            executor.submit(
                _load_and_cache_json,
                key='scid-current',
                url=f'{reporting_base_uri}/scid/current',
                cache_filename=REPORTING_SCID_CURRENT_CACHE_FILENAME,
                schema=REPORTING_SCID_CURRENT_CACHE_SCHEMA),
        ]
        responses = {}
        for _f in concurrent.futures.as_completed(futures):
            # call result() once per future; it re-raises any exception
            # raised inside the worker
            result = _f.result()
            responses[result['key']] = result['value']
        return responses
def worker_proc(
inventory_base_uri: str,
reporting_base_uri: str,
refresh_frequency_seconds: int,
stop_event: Event | None = None) -> None:
def _should_stop() -> bool:
return stop_event.is_set() if stop_event else False
while not _should_stop():
_load_all_inventory(inventory_base_uri, reporting_base_uri)
# wait and the restart the loop
if stop_event:
stop_event.wait(timeout=refresh_frequency_seconds)
else:
time.sleep(refresh_frequency_seconds)