Skip to content
Snippets Groups Projects
Commit 020af674 authored by Erik Reid's avatar Erik Reid
Browse files

added common check processing to a class hierarchy

parent ae0d9b3d
No related branches found
No related tags found
No related merge requests found
......@@ -29,11 +29,6 @@ _DEFAULT_CONFIG = {
'interface-check': {
'script': '/var/lib/sensu/bin/counter2influx.sh',
'measurement': 'counters',
'interval': 300,
'subscriptions': ['interfacecounters'],
'output_metric_handlers': ['influx-db-handler'],
'namespace': 'default',
'round_robin': True,
'command': ('{script} {measurement} '
'{community} {hostname} '
'{interface} {ifIndex}'),
......@@ -56,22 +51,9 @@ CONFIG_SCHEMA = {
'properties': {
'script': {'type': 'string'},
'measurement': {'type': 'string'},
'interval': {'type': 'integer'},
'subscriptions': {
'type': 'array',
'items': {'type': 'string'}
},
'output_metric_handlers': {
'type': 'array',
'items': {'type': 'string'}
},
'namespace': {'type': 'string'},
'round_robin': {'type': 'boolean'},
'command': {'type': 'string'},
},
'required': ['script', 'measurement', 'interval',
'subscriptions', 'output_metric_handlers',
'namespace', 'round_robin', 'command'],
'required': ['script', 'measurement', 'command'],
'additionalProperties': False
},
'sensu': {
......@@ -118,6 +100,7 @@ CONFIG_SCHEMA = {
class State(object):
GWS_DIRECT = 'gws-direct.json'
INTERFACES = 'interfaces.json'
STATE = 'state.json'
......
......@@ -11,99 +11,68 @@ def load_ifc_checks(sensu_params):
name = check['metadata']['name']
# check-* is the old-style name (add to the returned
# data so it can be deleted)
return re.match(r'^(check|ifc)-([^-]+\.geant\.net)-(.+)$', name)
return re.match(r'^(check|ifc)-[^-]+\.geant\.net-.+$', name)
ifc_checks = filter(_is_ifc_check, sensu.load_all_checks(sensu_params))
return {c['metadata']['name']: c for c in ifc_checks}
def _check_name(interface):
ifc_name = interface['name'].replace('/', '-')
return f'ifc-{interface["router"]}-{ifc_name}'
class InterfaceCheck(sensu.AbstractCheck):
def __init__(self, ifc_check_params, interface):
super().__init__()
self.ifc_check_params = ifc_check_params
self.interface = interface
def _make_check(check_params, interface):
command = check_params['command'].format(
script=check_params['script'],
measurement=check_params['measurement'],
community='0pBiFbD', # TODO: add this to /poller/interfaces response
hostname=interface['router'],
interface=interface['name'],
ifIndex=interface['snmp-index']
)
@sensu.AbstractCheck.name.getter
def name(self):
ifc_name = self.interface['name'].replace('/', '-')
return f'ifc-{self.interface["router"]}-{ifc_name}'
return {
'command': command,
'interval': check_params['interval'],
'subscriptions': sorted(check_params['subscriptions']),
'proxy_entity_name': interface['router'],
'round_robin': check_params['round_robin'],
'output_metric_format': 'influxdb_line',
'output_metric_handlers': sorted(
check_params['output_metric_handlers']),
'metadata': {
'name': _check_name(interface),
'namespace': check_params['namespace']
},
'publish': True
}
@sensu.AbstractCheck.command.getter
def command(self):
return self.ifc_check_params['command'].format(
script=self.ifc_check_params['script'],
measurement=self.ifc_check_params['measurement'],
community='0pBiFbD', # TODO: add this to /poller/interfaces response
hostname=self.interface['router'],
interface=self.interface['name'],
ifIndex=self.interface['snmp-index'])
def _checks_match(a, b) -> bool:
if a['publish'] != b['publish']:
return False
if a['command'] != b['command']:
return False
if a['interval'] != b['interval']:
return False
if a['proxy_entity_name'] != b['proxy_entity_name']:
return False
if a['round_robin'] != b['round_robin']:
return False
if a['output_metric_format'] != b['output_metric_format']:
return False
if sorted(a['subscriptions']) != sorted(b['subscriptions']):
return False
if sorted(a['output_metric_handlers']) \
!= sorted(b['output_metric_handlers']):
return False
if a['metadata']['name'] != b['metadata']['name']:
return False
if a['metadata']['namespace'] != b['metadata']['namespace']:
return False
return True
@sensu.AbstractCheck.proxy_entity_name.getter
def proxy_entity_name(self):
return self.interface['router']
def refresh(sensu_params, state):
ifc_checks = load_ifc_checks(sensu_params)
current_ifc_checks = load_ifc_checks(sensu_params)
created = 0
updated = 0
interfaces = 0
for interface in state.interfaces:
interfaces += 1
required_checks = [
InterfaceCheck(sensu_params['interface-check'], ifc)
for ifc in state.interfaces]
expected_check = _make_check(
sensu_params['interface-check'], interface)
for expected_check in required_checks:
expected_name = _check_name(interface)
if expected_name not in ifc_checks:
if expected_check.name not in current_ifc_checks:
sensu.create_check(sensu_params, expected_check.to_dict())
created += 1
sensu.create_check(sensu_params, expected_check)
elif not _checks_match(ifc_checks[expected_name], expected_check):
elif not sensu.AbstractCheck.match_check_dicts(
current_ifc_checks[expected_check.name],
expected_check.to_dict()):
sensu.update_check(sensu_params, expected_check.to_dict())
updated += 1
sensu.update_check(sensu_params, expected_check)
wanted_checks = {_check_name(ifc) for ifc in state.interfaces}
extra_checks = set(ifc_checks.keys()) - wanted_checks
wanted_checks = {check.name for check in required_checks}
extra_checks = set(current_ifc_checks.keys()) - wanted_checks
for name in extra_checks:
sensu.delete_check(sensu_params, name)
# cf. main.REFRESH_RESULT_SCHEMA
return {
'checks': len(ifc_checks),
'input': interfaces,
'checks': len(current_ifc_checks),
'input': len(required_checks),
'created': created,
'updated': updated,
'deleted': len(extra_checks)
......
......@@ -80,33 +80,123 @@ INVENTORY_INTERFACES_SCHEMA = {
}
def _pick_one(things):
if not isinstance(things, (list, tuple, set)):
things = [things]
return random.choice(things)
GWS_DIRECT_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'oid': {
'type': 'string',
'pattern': r'^(\d+\.)*\d+$'
},
'counter': {
'type': 'object',
'properties': {
'field': {
'enum': [
'discards_in',
'discards_out',
'errors_in',
'errors_out',
'traffic_in',
'traffic_out'
]
},
'oid': {'$ref': '#/definitions/oid'},
'community': {'type': 'string'}
},
'required': ['field', 'oid', 'community'],
'additionalProperties': False
},
'interface-counters': {
'type': 'object',
'properties': {
'nren': {'type': 'string'},
'isp': {
'type': 'string',
'enum': ['Cogent', 'Telia', 'Century Link']
},
'hostname': {'type': 'string'},
'tag': {'type': 'string'},
'counters': {
'type': 'array',
'items': {'$ref': '#/definitions/counter'},
'minItems': 1
}
},
'required': [
'nren', 'isp', 'hostname', 'tag', 'counters'],
'additionalProperties': False
}
},
'type': 'array',
'items': {'$ref': '#/definitions/interface-counters'}
}
def load_interfaces(base_urls):
def _pick_one(haystack):
if not isinstance(haystack, (list, tuple, set)):
haystack = [haystack]
return random.choice(haystack)
def _load_inventory_json(api_route, base_urls, schema):
"""
Load /poller/interfaces from inventory provider
and return a slightly reformatted dict.
Load & decode the specified inventory api data
:param api_route: the api-specific handler route
:param base_urls: inventory provider base api url, or a list of them
:return: a dict like [<router>][<interface>] = inventory leaf data
:param schema: jsonschema to validate the response against
:return: the decoded json reponse
"""
url = _pick_one(base_urls)
logger.debug(f'using inventory base api url: {url}')
rsp = requests.get(
f'{url}/poller/interfaces',
f'{url}/{api_route}',
headers={'Accept': 'application/json'})
rsp.raise_for_status()
result = rsp.json()
jsonschema.validate(result, INVENTORY_INTERFACES_SCHEMA)
jsonschema.validate(result, schema)
return result
def load_interfaces(base_urls):
"""
Load /poller/interfaces from inventory provider
and return a slightly reformatted dict.
:param base_urls: inventory provider base api url, or a list of them
:return: a list (INVENTORY_INTERFACES_SCHEMA)
"""
return _load_inventory_json(
'poller/interfaces', base_urls, INVENTORY_INTERFACES_SCHEMA)
def load_gws_direct_interfaces(base_urls):
"""
Load /poller/gws/direct from inventory provider
:param base_urls: inventory provider base api url, or a list of them
:return: an interable of interface-specific check data
"""
gws_direct_config = _load_inventory_json(
'poller/gws/direct', base_urls, INVENTORY_INTERFACES_SCHEMA)
def _ifc_check_params(config_item):
# i.e. all but the counters element
return {
'nren': config_item['nren'],
'isp': config_item['isp'],
'hostname': config_item['hostname'],
'tag': config_item['tag']
}
return map(_ifc_check_params, gws_direct_config)
def last_update_timestamp(base_urls) -> float:
try:
r = requests.get(
......
......@@ -29,7 +29,8 @@ import click
import jsonschema
from statsd import StatsClient
from brian_polling_manager import inventory, interfaces, configuration
from brian_polling_manager \
import inventory, configuration, interfaces, gws_direct
logger = logging.getLogger(__name__)
......@@ -76,9 +77,11 @@ def refresh(config, force=False):
if force or not last or last != state.last:
state.last = last
state.interfaces = inventory.load_interfaces(config['inventory'])
# state.gws_direct = inventory.load_gws_direct_interfaces(
# config['inventory'])
result = {
'interfaces': interfaces.refresh(config['sensu'], state)
'interfaces': interfaces.refresh(config['sensu'], state),
# 'gws direct': gws_direct.refresh(config['sensu'], state)
}
statsd_config = config.get('statsd', None)
......
"""
Sensu api utilities
"""
import functools
import logging
import random
import requests
......@@ -8,7 +9,14 @@ import requests
logger = logging.getLogger(__name__)
_cached_checks = None # not using lru_cache, since params is a dict
def load_all_checks(params, namespace='default'):
global _cached_checks
if _cached_checks is not None:
return _cached_checks
url = random.choice(params['api-base'])
r = requests.get(
f'{url}/api/core/v2/namespaces/{namespace}/checks',
......@@ -18,8 +26,8 @@ def load_all_checks(params, namespace='default'):
})
r.raise_for_status()
for check in r.json():
yield check
_cached_checks = r.json()
return _cached_checks
def create_check(params, check, namespace='default'):
......@@ -60,3 +68,103 @@ def delete_check(params, check, namespace='default'):
f'{url}/api/core/v2/namespaces/{namespace}/checks/{name}',
headers={'Authorization': f'Key {params["api-key"]}'})
r.raise_for_status()
def checks_match(a, b) -> bool:
if a['publish'] != b['publish']:
return False
if a['command'] != b['command']:
return False
if a['interval'] != b['interval']:
return False
if a['proxy_entity_name'] != b['proxy_entity_name']:
return False
if a['round_robin'] != b['round_robin']:
return False
if a['output_metric_format'] != b['output_metric_format']:
return False
if sorted(a['subscriptions']) != sorted(b['subscriptions']):
return False
if sorted(a['output_metric_handlers']) \
!= sorted(b['output_metric_handlers']):
return False
if a['metadata']['name'] != b['metadata']['name']:
return False
if a['metadata']['namespace'] != b['metadata']['namespace']:
return False
return True
class AbstractCheck(object):
"""
not explicitly using abc.ABC, to avoid stacks of decorators
"""
INTERVAL_S = 300
SUBSCRIPTIONS = ['interfacecounters']
METRIC_FORMAT = 'influxdb_line'
METRIC_HANDLERS = ['influx-db-handler']
NAMESPACE = 'default'
def __init__(self):
self.publish = True
self.round_robin = True
self.interval = AbstractCheck.INTERVAL_S
self.subscriptions = AbstractCheck.SUBSCRIPTIONS
self.output_metric_handlers = AbstractCheck.METRIC_HANDLERS
self.output_metric_format = AbstractCheck.METRIC_FORMAT
self.namespace = AbstractCheck.NAMESPACE
@property
def name(self):
assert False, 'property StandardCheck.name must be overridden'
@property
def command(self):
assert False, 'property StandardCheck.command must be overridden'
@property
def proxy_entity_name(self):
assert False, \
'property StandardCheck.proxy_entity_name must be overridden'
def to_dict(self):
return {
'command': self.command,
'interval': self.interval,
'subscriptions': sorted(self.subscriptions),
'proxy_entity_name': self.proxy_entity_name,
'round_robin': self.round_robin,
'output_metric_format': self.output_metric_format,
'output_metric_handlers': sorted(self.output_metric_handlers),
'metadata': {
'name': self.name,
'namespace': self.namespace
},
'publish': self.publish
}
@staticmethod
def match_check_dicts(a, b) -> bool:
if a['publish'] != b['publish']:
return False
if a['command'] != b['command']:
return False
if a['interval'] != b['interval']:
return False
if a['proxy_entity_name'] != b['proxy_entity_name']:
return False
if a['round_robin'] != b['round_robin']:
return False
if a['output_metric_format'] != b['output_metric_format']:
return False
if sorted(a['subscriptions']) != sorted(b['subscriptions']):
return False
if sorted(a['output_metric_handlers']) \
!= sorted(b['output_metric_handlers']):
return False
if a['metadata']['name'] != b['metadata']['name']:
return False
if a['metadata']['namespace'] != b['metadata']['namespace']:
return False
return True
......@@ -36,11 +36,6 @@ def config():
'interface-check': {
'script': '/var/lib/sensu/bin/counter2influx.sh',
'measurement': 'counters',
'interval': 300,
'subscriptions': ['interfacecounters'],
'output_metric_handlers': ['influx-db-handler'],
'namespace': 'default',
'round_robin': True,
'command': ('{script} {measurement} '
'{community} {hostname} '
'{interface} {ifIndex}'),
......
......@@ -8,7 +8,7 @@ from brian_polling_manager import sensu, inventory, interfaces
@responses.activate
def test_load_checks(config, mocked_sensu):
checks = list(sensu.load_all_checks(config['sensu']))
checks = sensu.load_all_checks(config['sensu'])
assert len(checks) > 0 # test data has checks in it
......@@ -19,9 +19,9 @@ def test_check_lifecycle(config, mocked_sensu, mocked_inventory):
inventory.load_interfaces(config['inventory']))
test_interface['name'] = 'xyz'
new_check = interfaces._make_check(
new_check = interfaces.InterfaceCheck(
config['sensu']['interface-check'],
test_interface)
test_interface).to_dict()
# create the new check
check_name = new_check['metadata']['name']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment