diff --git a/brian_polling_manager/configuration.py b/brian_polling_manager/configuration.py index bbb136dd4601b54c3863813d80a39eb4b2463f6d..ec177a678265d24a70d72026a970ef5c83ce6994 100644 --- a/brian_polling_manager/configuration.py +++ b/brian_polling_manager/configuration.py @@ -29,11 +29,6 @@ _DEFAULT_CONFIG = { 'interface-check': { 'script': '/var/lib/sensu/bin/counter2influx.sh', 'measurement': 'counters', - 'interval': 300, - 'subscriptions': ['interfacecounters'], - 'output_metric_handlers': ['influx-db-handler'], - 'namespace': 'default', - 'round_robin': True, 'command': ('{script} {measurement} ' '{community} {hostname} ' '{interface} {ifIndex}'), @@ -56,22 +51,9 @@ CONFIG_SCHEMA = { 'properties': { 'script': {'type': 'string'}, 'measurement': {'type': 'string'}, - 'interval': {'type': 'integer'}, - 'subscriptions': { - 'type': 'array', - 'items': {'type': 'string'} - }, - 'output_metric_handlers': { - 'type': 'array', - 'items': {'type': 'string'} - }, - 'namespace': {'type': 'string'}, - 'round_robin': {'type': 'boolean'}, 'command': {'type': 'string'}, }, - 'required': ['script', 'measurement', 'interval', - 'subscriptions', 'output_metric_handlers', - 'namespace', 'round_robin', 'command'], + 'required': ['script', 'measurement', 'command'], 'additionalProperties': False }, 'sensu': { @@ -118,6 +100,7 @@ CONFIG_SCHEMA = { class State(object): + GWS_DIRECT = 'gws-direct.json' INTERFACES = 'interfaces.json' STATE = 'state.json' diff --git a/brian_polling_manager/interfaces.py b/brian_polling_manager/interfaces.py index f824c0d69d30262a4d87501f5b6291213b817131..9bdfa77a27bf82b1e651d80acee5fd0f2d262d8e 100644 --- a/brian_polling_manager/interfaces.py +++ b/brian_polling_manager/interfaces.py @@ -11,99 +11,68 @@ def load_ifc_checks(sensu_params): name = check['metadata']['name'] # check-* is the old-style name (add to the returned # data so it can be deleted) - return re.match(r'^(check|ifc)-([^-]+\.geant\.net)-(.+)$', name) + return re.match(r'^(check|ifc)-[^-]+\.geant\.net-.+$', name) ifc_checks = filter(_is_ifc_check, sensu.load_all_checks(sensu_params)) return {c['metadata']['name']: c for c in ifc_checks} -def _check_name(interface): - ifc_name = interface['name'].replace('/', '-') - return f'ifc-{interface["router"]}-{ifc_name}' +class InterfaceCheck(sensu.AbstractCheck): + def __init__(self, ifc_check_params, interface): + super().__init__() + self.ifc_check_params = ifc_check_params + self.interface = interface -def _make_check(check_params, interface): - command = check_params['command'].format( - script=check_params['script'], - measurement=check_params['measurement'], - community='0pBiFbD', # TODO: add this to /poller/interfaces response - hostname=interface['router'], - interface=interface['name'], - ifIndex=interface['snmp-index'] - ) + @sensu.AbstractCheck.name.getter + def name(self): + ifc_name = self.interface['name'].replace('/', '-') + return f'ifc-{self.interface["router"]}-{ifc_name}' - return { - 'command': command, - 'interval': check_params['interval'], - 'subscriptions': sorted(check_params['subscriptions']), - 'proxy_entity_name': interface['router'], - 'round_robin': check_params['round_robin'], - 'output_metric_format': 'influxdb_line', - 'output_metric_handlers': sorted( - check_params['output_metric_handlers']), - 'metadata': { - 'name': _check_name(interface), - 'namespace': check_params['namespace'] - }, - 'publish': True - } + @sensu.AbstractCheck.command.getter + def command(self): + return self.ifc_check_params['command'].format( + script=self.ifc_check_params['script'], + measurement=self.ifc_check_params['measurement'], + community='0pBiFbD', # TODO: add this to /poller/interfaces response + hostname=self.interface['router'], + interface=self.interface['name'], + ifIndex=self.interface['snmp-index']) - -def _checks_match(a, b) -> bool: - if a['publish'] != b['publish']: - return False - if a['command'] != b['command']: - return False - if a['interval'] != b['interval']: - return False - if a['proxy_entity_name'] != b['proxy_entity_name']: - return False - if a['round_robin'] != b['round_robin']: - return False - if a['output_metric_format'] != b['output_metric_format']: - return False - if sorted(a['subscriptions']) != sorted(b['subscriptions']): - return False - if sorted(a['output_metric_handlers']) \ - != sorted(b['output_metric_handlers']): - return False - if a['metadata']['name'] != b['metadata']['name']: - return False - if a['metadata']['namespace'] != b['metadata']['namespace']: - return False - return True + @sensu.AbstractCheck.proxy_entity_name.getter + def proxy_entity_name(self): + return self.interface['router'] def refresh(sensu_params, state): - ifc_checks = load_ifc_checks(sensu_params) + current_ifc_checks = load_ifc_checks(sensu_params) created = 0 updated = 0 - interfaces = 0 - for interface in state.interfaces: - - interfaces += 1 + required_checks = [ + InterfaceCheck(sensu_params['interface-check'], ifc) + for ifc in state.interfaces] - expected_check = _make_check( - sensu_params['interface-check'], interface) + for expected_check in required_checks: - expected_name = _check_name(interface) - if expected_name not in ifc_checks: + if expected_check.name not in current_ifc_checks: + sensu.create_check(sensu_params, expected_check.to_dict()) created += 1 - sensu.create_check(sensu_params, expected_check) - elif not _checks_match(ifc_checks[expected_name], expected_check): + elif not sensu.AbstractCheck.match_check_dicts( + current_ifc_checks[expected_check.name], + expected_check.to_dict()): + sensu.update_check(sensu_params, expected_check.to_dict()) updated += 1 - sensu.update_check(sensu_params, expected_check) - wanted_checks = {_check_name(ifc) for ifc in state.interfaces} - extra_checks = set(ifc_checks.keys()) - wanted_checks + wanted_checks = {check.name for check in required_checks} + extra_checks = set(current_ifc_checks.keys()) - wanted_checks for name in extra_checks: sensu.delete_check(sensu_params, name) # cf. main.REFRESH_RESULT_SCHEMA return { - 'checks': len(ifc_checks), - 'input': interfaces, + 'checks': len(current_ifc_checks), + 'input': len(required_checks), 'created': created, 'updated': updated, 'deleted': len(extra_checks) diff --git a/brian_polling_manager/inventory.py b/brian_polling_manager/inventory.py index b5211049129076e924420c21d2a7fd4ede1e0eee..ac950c461f3ecdb11f40912c0fd0ae5e4620596e 100644 --- a/brian_polling_manager/inventory.py +++ b/brian_polling_manager/inventory.py @@ -80,33 +80,123 @@ INVENTORY_INTERFACES_SCHEMA = { } -def _pick_one(things): - if not isinstance(things, (list, tuple, set)): - things = [things] - return random.choice(things) +GWS_DIRECT_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'oid': { + 'type': 'string', + 'pattern': r'^(\d+\.)*\d+$' + }, + 'counter': { + 'type': 'object', + 'properties': { + 'field': { + 'enum': [ + 'discards_in', + 'discards_out', + 'errors_in', + 'errors_out', + 'traffic_in', + 'traffic_out' + ] + }, + 'oid': {'$ref': '#/definitions/oid'}, + 'community': {'type': 'string'} + }, + 'required': ['field', 'oid', 'community'], + 'additionalProperties': False + }, + 'interface-counters': { + 'type': 'object', + 'properties': { + 'nren': {'type': 'string'}, + 'isp': { + 'type': 'string', + 'enum': ['Cogent', 'Telia', 'Century Link'] + }, + 'hostname': {'type': 'string'}, + 'tag': {'type': 'string'}, + 'counters': { + 'type': 'array', + 'items': {'$ref': '#/definitions/counter'}, + 'minItems': 1 + } + }, + 'required': [ + 'nren', 'isp', 'hostname', 'tag', 'counters'], + 'additionalProperties': False + } + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/interface-counters'} +} -def load_interfaces(base_urls): +def _pick_one(haystack): + if not isinstance(haystack, (list, tuple, set)): + haystack = [haystack] + return random.choice(haystack) + + +def _load_inventory_json(api_route, base_urls, schema): """ - Load /poller/interfaces from inventory provider - and return a slightly reformatted dict. + Load & decode the specified inventory api data + :param api_route: the api-specific handler route :param base_urls: inventory provider base api url, or a list of them - :return: a dict like [<router>][<interface>] = inventory leaf data + :param schema: jsonschema to validate the response against + :return: the decoded json reponse """ url = _pick_one(base_urls) logger.debug(f'using inventory base api url: {url}') rsp = requests.get( - f'{url}/poller/interfaces', + f'{url}/{api_route}', headers={'Accept': 'application/json'}) rsp.raise_for_status() result = rsp.json() - jsonschema.validate(result, INVENTORY_INTERFACES_SCHEMA) + jsonschema.validate(result, schema) return result +def load_interfaces(base_urls): + """ + Load /poller/interfaces from inventory provider + and return a slightly reformatted dict. + + :param base_urls: inventory provider base api url, or a list of them + :return: a list (INVENTORY_INTERFACES_SCHEMA) + """ + return _load_inventory_json( + 'poller/interfaces', base_urls, INVENTORY_INTERFACES_SCHEMA) + + +def load_gws_direct_interfaces(base_urls): + """ + Load /poller/gws/direct from inventory provider + + :param base_urls: inventory provider base api url, or a list of them + :return: an interable of interface-specific check data + """ + + gws_direct_config = _load_inventory_json( + 'poller/gws/direct', base_urls, INVENTORY_INTERFACES_SCHEMA) + + def _ifc_check_params(config_item): + # i.e. all but the counters element + return { + 'nren': config_item['nren'], + 'isp': config_item['isp'], + 'hostname': config_item['hostname'], + 'tag': config_item['tag'] + } + + return map(_ifc_check_params, gws_direct_config) + + def last_update_timestamp(base_urls) -> float: try: r = requests.get( diff --git a/brian_polling_manager/main.py b/brian_polling_manager/main.py index 5e92a164b9706344ec9d558fd37ed2fd2fdc4473..bd2c484473f9f76059fbf9a6651e618ecaf03e3f 100644 --- a/brian_polling_manager/main.py +++ b/brian_polling_manager/main.py @@ -29,7 +29,8 @@ import click import jsonschema from statsd import StatsClient -from brian_polling_manager import inventory, interfaces, configuration +from brian_polling_manager \ + import inventory, configuration, interfaces, gws_direct logger = logging.getLogger(__name__) @@ -76,9 +77,11 @@ def refresh(config, force=False): if force or not last or last != state.last: state.last = last state.interfaces = inventory.load_interfaces(config['inventory']) - + # state.gws_direct = inventory.load_gws_direct_interfaces( + # config['inventory']) result = { - 'interfaces': interfaces.refresh(config['sensu'], state) + 'interfaces': interfaces.refresh(config['sensu'], state), + # 'gws direct': gws_direct.refresh(config['sensu'], state) } statsd_config = config.get('statsd', None) diff --git a/brian_polling_manager/sensu.py b/brian_polling_manager/sensu.py index 0c60d091320f7ddfbc8a94aa5423b34c7a1dc9f2..228ea5188e67e81055168b152eb87ffa9c418736 100644 --- a/brian_polling_manager/sensu.py +++ b/brian_polling_manager/sensu.py @@ -1,6 +1,7 @@ """ Sensu api utilities """ +import functools import logging import random import requests @@ -8,7 +9,14 @@ import requests logger = logging.getLogger(__name__) +_cached_checks = None # not using lru_cache, since params is a dict + + def load_all_checks(params, namespace='default'): + global _cached_checks + if _cached_checks is not None: + return _cached_checks + url = random.choice(params['api-base']) r = requests.get( f'{url}/api/core/v2/namespaces/{namespace}/checks', @@ -18,8 +26,8 @@ def load_all_checks(params, namespace='default'): }) r.raise_for_status() - for check in r.json(): - yield check + _cached_checks = r.json() + return _cached_checks def create_check(params, check, namespace='default'): @@ -60,3 +68,103 @@ def delete_check(params, check, namespace='default'): f'{url}/api/core/v2/namespaces/{namespace}/checks/{name}', headers={'Authorization': f'Key {params["api-key"]}'}) r.raise_for_status() + + +def checks_match(a, b) -> bool: + if a['publish'] != b['publish']: + return False + if a['command'] != b['command']: + return False + if a['interval'] != b['interval']: + return False + if a['proxy_entity_name'] != b['proxy_entity_name']: + return False + if a['round_robin'] != b['round_robin']: + return False + if a['output_metric_format'] != b['output_metric_format']: + return False + if sorted(a['subscriptions']) != sorted(b['subscriptions']): + return False + if sorted(a['output_metric_handlers']) \ + != sorted(b['output_metric_handlers']): + return False + if a['metadata']['name'] != b['metadata']['name']: + return False + if a['metadata']['namespace'] != b['metadata']['namespace']: + return False + return True + + +class AbstractCheck(object): + """ + not explicitly using abc.ABC, to avoid stacks of decorators + """ + + INTERVAL_S = 300 + SUBSCRIPTIONS = ['interfacecounters'] + METRIC_FORMAT = 'influxdb_line' + METRIC_HANDLERS = ['influx-db-handler'] + NAMESPACE = 'default' + + def __init__(self): + self.publish = True + self.round_robin = True + self.interval = AbstractCheck.INTERVAL_S + self.subscriptions = AbstractCheck.SUBSCRIPTIONS + self.output_metric_handlers = AbstractCheck.METRIC_HANDLERS + self.output_metric_format = AbstractCheck.METRIC_FORMAT + self.namespace = AbstractCheck.NAMESPACE + + @property + def name(self): + assert False, 'property StandardCheck.name must be overridden' + + @property + def command(self): + assert False, 'property StandardCheck.command must be overridden' + + @property + def proxy_entity_name(self): + assert False, \ + 'property StandardCheck.proxy_entity_name must be overridden' + + def to_dict(self): + return { + 'command': self.command, + 'interval': self.interval, + 'subscriptions': sorted(self.subscriptions), + 'proxy_entity_name': self.proxy_entity_name, + 'round_robin': self.round_robin, + 'output_metric_format': self.output_metric_format, + 'output_metric_handlers': sorted(self.output_metric_handlers), + 'metadata': { + 'name': self.name, + 'namespace': self.namespace + }, + 'publish': self.publish + } + + @staticmethod + def match_check_dicts(a, b) -> bool: + if a['publish'] != b['publish']: + return False + if a['command'] != b['command']: + return False + if a['interval'] != b['interval']: + return False + if a['proxy_entity_name'] != b['proxy_entity_name']: + return False + if a['round_robin'] != b['round_robin']: + return False + if a['output_metric_format'] != b['output_metric_format']: + return False + if sorted(a['subscriptions']) != sorted(b['subscriptions']): + return False + if sorted(a['output_metric_handlers']) \ + != sorted(b['output_metric_handlers']): + return False + if a['metadata']['name'] != b['metadata']['name']: + return False + if a['metadata']['namespace'] != b['metadata']['namespace']: + return False + return True diff --git a/test/conftest.py b/test/conftest.py index 2553e8fcafecbeb13bf967ab4ae92e28ebe5f704..4b6295e2e525d036daae16c59df24f63581a8ee1 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -36,11 +36,6 @@ def config(): 'interface-check': { 'script': '/var/lib/sensu/bin/counter2influx.sh', 'measurement': 'counters', - 'interval': 300, - 'subscriptions': ['interfacecounters'], - 'output_metric_handlers': ['influx-db-handler'], - 'namespace': 'default', - 'round_robin': True, 'command': ('{script} {measurement} ' '{community} {hostname} ' '{interface} {ifIndex}'), diff --git a/test/test_sensu_checks.py b/test/test_sensu_checks.py index 7a5a8178cfd72ff4d3d6a1899b1d1012e45f5df4..7e48b600c8142a4a090ac017397cb1598bba4328 100644 --- a/test/test_sensu_checks.py +++ b/test/test_sensu_checks.py @@ -8,7 +8,7 @@ from brian_polling_manager import sensu, inventory, interfaces @responses.activate def test_load_checks(config, mocked_sensu): - checks = list(sensu.load_all_checks(config['sensu'])) + checks = sensu.load_all_checks(config['sensu']) assert len(checks) > 0 # test data has checks in it @@ -19,9 +19,9 @@ def test_check_lifecycle(config, mocked_sensu, mocked_inventory): inventory.load_interfaces(config['inventory'])) test_interface['name'] = 'xyz' - new_check = interfaces._make_check( + new_check = interfaces.InterfaceCheck( config['sensu']['interface-check'], - test_interface) + test_interface).to_dict() # create the new check check_name = new_check['metadata']['name']