-
Pelle Koster authoredPelle Koster authored
inventory.py 5.38 KiB
import logging
import random
import jsonschema
import requests
logger = logging.getLogger(__name__)
# minimal inventory response schema for our purposes
INVENTORY_VERSION_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'latch': {
'type': 'object',
'properties': {
'timestamp': {'type': 'number'}
},
'required': ['timestamp'],
'additionalProperties': True
}
},
'type': 'object',
'properties': {
'latch': {'$ref': '#/definitions/latch'}
},
'required': ['latch'],
'additionalProperties': True
}
# minimal inventory response schema for our purposes
INVENTORY_INTERFACES_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'interface': {
'type': 'object',
'properties': {
'router': {'type': 'string'},
'name': {'type': 'string'},
},
'required': ['router', 'name'],
}
},
'type': 'array',
'items': {'$ref': '#/definitions/interface'}
}
# minimal inventory response schema for our purposes
GWS_DIRECT_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'interface-counters': {
'type': 'object',
'properties': {
'nren': {'type': 'string'},
'isp': {'type': 'string'},
'hostname': {'type': 'string'},
'tag': {'type': 'string'},
},
'required': ['nren', 'isp', 'hostname', 'tag']
}
},
'type': 'array',
'items': {'$ref': '#/definitions/interface-counters'}
}
GWS_INDIRECT_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'service': {
'type': 'object',
'properties': {
# minimal validation for our purposes
'name': {'type': 'string'},
'hostname': {'type': 'string'}
},
'required': ['name', 'hostname']
}
},
'type': 'array',
'items': {'$ref': '#/definitions/service'}
}
# much less strict version of the actual schema
MULTICAST_SUBSCRIPTION_LIST_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema',
'definitions': {
'subscription': {
'type': 'object',
'properties': {
# we really only use this field
# don't depend strictly on unused data
'router': {'type': 'string'}
},
'required': ['router']
}
},
'type': 'array',
'items': {'$ref': '#/definitions/subscription'}
}
def _pick_one(haystack):
if not isinstance(haystack, (list, tuple, set)):
haystack = [haystack]
return random.choice(haystack)
def _load_inventory_json(api_route, base_urls, schema):
"""
Load & decode the specified inventory api data
:param api_route: the api-specific handler route
:param base_urls: inventory provider base api url, or a list of them
:param schema: jsonschema to validate the response against
:return: the decoded json reponse
"""
url = _pick_one(base_urls)
logger.debug(f'using inventory base api url: {url}')
rsp = requests.get(
f'{url}/{api_route}',
headers={'Accept': 'application/json'})
rsp.raise_for_status()
result = rsp.json()
jsonschema.validate(result, schema)
return result
def load_interfaces(base_urls):
"""
Load /poller/interfaces from inventory provider
and return a slightly reformatted dict.
:param base_urls: inventory provider base api url, or a list of them
:return: a list (INVENTORY_INTERFACES_SCHEMA)
"""
return _load_inventory_json(
'poller/interfaces', base_urls, INVENTORY_INTERFACES_SCHEMA)
def load_gws_direct_interfaces(base_urls):
"""
Load /poller/gws/direct from inventory provider
:param base_urls: inventory provider base api url, or a list of them
:return: an interable of interface-specific check data
"""
return _load_inventory_json(
'poller/gws/direct', base_urls, GWS_DIRECT_SCHEMA)
def load_gws_indirect_services(base_urls):
"""
Load /poller/gws/indirect from inventory provider
:param base_urls: inventory provider base api url, or a list of them
:return: an iterable of strings (service names)
"""
return _load_inventory_json(
'poller/gws/indirect', base_urls, GWS_INDIRECT_SCHEMA)
def load_eumetsat_multicast_subscriptions(base_urls):
"""
Load /poller/eumetsat-multicast from inventory provider
:param base_urls: inventory provider base api url, or a list of them
:return: a list of dicts, each with a 'router' key
"""
return _load_inventory_json(
'poller/eumetsat-multicast',
base_urls,
MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
def last_update_timestamp(base_urls) -> float:
try:
r = requests.get(
f'{_pick_one(base_urls)}/version',
headers={'Accept': 'application/json'})
r.raise_for_status()
result = r.json()
jsonschema.validate(result, INVENTORY_VERSION_SCHEMA)
return result['latch']['timestamp']
except (IOError, jsonschema.ValidationError, ValueError):
logger.exception('connection error')
return None