Skip to content
Snippets Groups Projects
Select Git revision
  • a28a7e9f72f138f387ba0f1c6c94eece41bce47e
  • develop default protected
  • master protected
  • feature/POL1-813-error-report-sensu-check
  • 0.23
  • 0.22
  • 0.21
  • 0.20
  • 0.19
  • 0.18
  • 0.17
  • 0.16
  • 0.15
  • 0.14
  • 0.13
  • 0.12
  • 0.11
  • 0.10
  • 0.9
  • 0.8
  • 0.7
  • 0.6
  • 0.5
  • 0.4
24 results

inventory.py

Blame
  • inventory.py 5.38 KiB
    import logging
    import random
    from typing import Optional
    
    import jsonschema
    import requests
    
    # module-level logger, named after this module
    logger = logging.getLogger(__name__)
    
    # Deliberately loose schema for the inventory provider version response:
    # only the presence of a numeric latch.timestamp is enforced, any other
    # properties are accepted as-is.
    INVENTORY_VERSION_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "latch": {
                "type": "object",
                "properties": {"timestamp": {"type": "number"}},
                "required": ["timestamp"],
                "additionalProperties": True,
            },
        },
        "type": "object",
        "properties": {"latch": {"$ref": "#/definitions/latch"}},
        "required": ["latch"],
        "additionalProperties": True,
    }
    
    # Deliberately loose schema for an inventory provider interface list:
    # an array of objects, each of which must carry string-valued
    # 'router' and 'name' properties.
    INVENTORY_INTERFACES_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "interface": {
                "type": "object",
                "properties": {
                    "router": {"type": "string"},
                    "name": {"type": "string"},
                },
                "required": ["router", "name"],
            },
        },
        "type": "array",
        "items": {"$ref": "#/definitions/interface"},
    }
    
    
    # Deliberately loose schema for an inventory provider gws direct list:
    # an array of objects, each of which must carry string-valued
    # 'nren', 'isp', 'hostname' and 'tag' properties.
    GWS_DIRECT_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "interface-counters": {
                "type": "object",
                "properties": {
                    "nren": {"type": "string"},
                    "isp": {"type": "string"},
                    "hostname": {"type": "string"},
                    "tag": {"type": "string"},
                },
                "required": ["nren", "isp", "hostname", "tag"],
            },
        },
        "type": "array",
        "items": {"$ref": "#/definitions/interface-counters"},
    }
    
    # Loose schema for an inventory provider gws indirect list: an array of
    # service objects, validated only as far as their string-valued
    # 'name' and 'hostname' properties.
    GWS_INDIRECT_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "service": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "hostname": {"type": "string"},
                },
                "required": ["name", "hostname"],
            },
        },
        "type": "array",
        "items": {"$ref": "#/definitions/service"},
    }
    
    
    # Much less strict than the actual schema: only 'router' is validated,
    # since that is the only field used here and there is no reason to
    # depend strictly on data that is never read.
    MULTICAST_SUBSCRIPTION_LIST_SCHEMA = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "definitions": {
            "subscription": {
                "type": "object",
                "properties": {"router": {"type": "string"}},
                "required": ["router"],
            },
        },
        "type": "array",
        "items": {"$ref": "#/definitions/subscription"},
    }
    
    
    def _pick_one(haystack):
        if not isinstance(haystack, (list, tuple, set)):
            haystack = [haystack]
        return random.choice(haystack)
    
    
    def _load_inventory_json(api_route, base_urls, schema, timeout=60):
        """
        Load & decode the specified inventory api data

        One of base_urls is selected via _pick_one, api_route is requested
        from it and the decoded response is validated against schema.

        :param api_route: the api-specific handler route
        :param base_urls: inventory provider base api url, or a list of them
        :param schema: jsonschema to validate the response against
        :param timeout: seconds to wait for the server (passed through to
            requests.get), or None to wait indefinitely
        :return: the decoded json response
        :raises requests.RequestException: on connection failure, timeout
            or an http error status
        :raises jsonschema.ValidationError: if the response doesn't
            validate against schema
        """
        url = _pick_one(base_urls)
        logger.debug('using inventory base api url: %s', url)

        # without a timeout an unresponsive server would block forever
        rsp = requests.get(
            f'{url}/{api_route}',
            headers={'Accept': 'application/json'},
            timeout=timeout)
        rsp.raise_for_status()

        result = rsp.json()
        jsonschema.validate(result, schema)
        return result
    
    
    def load_interfaces(base_urls):
        """
        Load poller/interfaces from inventory provider.

        The response is returned as decoded, after validation
        against INVENTORY_INTERFACES_SCHEMA.

        :param base_urls: inventory provider base api url, or a list of them
        :return: a list (INVENTORY_INTERFACES_SCHEMA)
        """
        interfaces = _load_inventory_json(
            api_route='poller/interfaces',
            base_urls=base_urls,
            schema=INVENTORY_INTERFACES_SCHEMA)
        return interfaces
    
    
    def load_gws_direct_interfaces(base_urls):
        """
        Load poller/gws/direct from inventory provider.

        The response is returned as decoded, after validation
        against GWS_DIRECT_SCHEMA.

        :param base_urls: inventory provider base api url, or a list of them
        :return: a list (GWS_DIRECT_SCHEMA)
        """
        direct_interfaces = _load_inventory_json(
            api_route='poller/gws/direct',
            base_urls=base_urls,
            schema=GWS_DIRECT_SCHEMA)
        return direct_interfaces
    
    
    def load_gws_indirect_services(base_urls):
        """
        Load poller/gws/indirect from inventory provider.

        The response is returned as decoded, after validation
        against GWS_INDIRECT_SCHEMA.

        :param base_urls: inventory provider base api url, or a list of them
        :return: a list of service dicts (GWS_INDIRECT_SCHEMA), each with
            at least 'name' and 'hostname' keys
        """
        indirect_services = _load_inventory_json(
            api_route='poller/gws/indirect',
            base_urls=base_urls,
            schema=GWS_INDIRECT_SCHEMA)
        return indirect_services
    
    
    def load_eumetsat_multicast_subscriptions(base_urls):
        """
        Load poller/eumetsat-multicast from inventory provider.

        The response is returned as decoded, after validation
        against MULTICAST_SUBSCRIPTION_LIST_SCHEMA.

        :param base_urls: inventory provider base api url, or a list of them
        :return: a list of dicts, each with a 'router' key
        """
        subscriptions = _load_inventory_json(
            api_route='poller/eumetsat-multicast',
            base_urls=base_urls,
            schema=MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
        return subscriptions
    
    
    def last_update_timestamp(base_urls, timeout=60) -> Optional[float]:
        """
        Load /version from inventory provider and return
        the timestamp of its latch.

        Failures are logged rather than raised.

        :param base_urls: inventory provider base api url, or a list of them
        :param timeout: seconds to wait for the server (passed through to
            requests.get), or None to wait indefinitely
        :return: the value of latch.timestamp from the response, or None
            if the request failed or the response was invalid
        """
        try:
            # without a timeout an unresponsive server would block forever
            r = requests.get(
                f'{_pick_one(base_urls)}/version',
                headers={'Accept': 'application/json'},
                timeout=timeout)
            r.raise_for_status()

            result = r.json()
            jsonschema.validate(result, INVENTORY_VERSION_SCHEMA)
            return result['latch']['timestamp']
        except (IOError, jsonschema.ValidationError, ValueError):
            # also reached for undecodable or schema-invalid responses,
            # not only for connection problems; the logged traceback
            # identifies the actual cause
            logger.exception('connection error')
            return None