# sensu.py 8.00 KiB
"""
Sensu api utilities
"""
import copy
import logging
import random
import requests
# module-level logger, named after this module
logger = logging.getLogger(__name__)
# holds the response of the first successful load_all_checks() call;
# stays None until then
_cached_checks = None # not using lru_cache, since params is a dict
def load_all_checks(params, namespace='default'):
    """
    Fetch every check defined in a sensu namespace.

    The first successful response is stored in the module-level
    ``_cached_checks`` and returned by all later calls without
    contacting the api again.

    NOTE: the cache is not keyed by ``params`` or ``namespace``; once it
    is populated both arguments are ignored.

    :param params: dict with 'api-base' (sequence of base urls, one is
        picked at random) and 'api-key' (sensu api key)
    :param namespace: sensu namespace to list the checks of
    :return: the decoded json body of the response
    :raises requests.HTTPError: if the api answers with an error status
    """
    global _cached_checks
    if _cached_checks is not None:
        return _cached_checks

    url = random.choice(params['api-base'])
    r = requests.get(
        f'{url}/api/core/v2/namespaces/{namespace}/checks',
        headers={
            'Authorization': f'Key {params["api-key"]}',
            # the standard content-negotiation header is 'Accept'
            # ('Accepts' is not a recognised request header)
            'Accept': 'application/json',
        })
    r.raise_for_status()
    _cached_checks = r.json()
    return _cached_checks
def create_check(params, check, namespace='default'):
    """
    POST a new check definition to the sensu api.

    :param params: dict with 'api-base' (sequence of base urls, one is
        picked at random) and 'api-key'
    :param check: check dict, sent as the json request body
    :param namespace: sensu namespace used in the request url
    :raises requests.HTTPError: if the api answers with an error status
    """
    logger.info(f'creating missing check: {check["metadata"]["name"]}')

    base_url = random.choice(params['api-base'])
    endpoint = f'{base_url}/api/core/v2/namespaces/{namespace}/checks'
    request_headers = {
        'Authorization': f'Key {params["api-key"]}',
        'Content-Type': 'application/json',
    }

    response = requests.post(endpoint, headers=request_headers, json=check)
    response.raise_for_status()
def update_check(params, check, namespace='default'):
    """
    PUT a check definition over the existing check of the same name.

    :param params: dict with 'api-base' (sequence of base urls, one is
        picked at random) and 'api-key'
    :param check: check dict, sent as the json request body; its
        metadata name selects the check to replace
    :param namespace: sensu namespace used in the request url
    :raises requests.HTTPError: if the api answers with an error status
    """
    name = check["metadata"]["name"]
    logger.info(f'updating existing check: {name}')

    base_url = random.choice(params['api-base'])
    endpoint = f'{base_url}/api/core/v2/namespaces/{namespace}/checks/{name}'
    request_headers = {
        'Authorization': f'Key {params["api-key"]}',
        'Content-Type': 'application/json',
    }

    response = requests.put(endpoint, headers=request_headers, json=check)
    response.raise_for_status()
def delete_check(params, check, namespace='default'):
    """
    DELETE a check from the sensu api.

    :param params: dict with 'api-base' (sequence of base urls, one is
        picked at random) and 'api-key'
    :param check: either the check name as a string, or a check dict
        whose metadata name is used
    :param namespace: sensu namespace used in the request url
    :raises requests.HTTPError: if the api answers with an error status
    """
    # accept a bare name as well as a full check dict
    name = check if isinstance(check, str) else check["metadata"]["name"]
    logger.info(f'deleting unwanted check: {name}')

    base_url = random.choice(params['api-base'])
    endpoint = f'{base_url}/api/core/v2/namespaces/{namespace}/checks/{name}'
    auth_header = {'Authorization': f'Key {params["api-key"]}'}

    response = requests.delete(endpoint, headers=auth_header)
    response.raise_for_status()
def checks_match(a, b) -> bool:
    """
    Tell whether two check dicts describe the same check.

    Only the fields listed below are compared, any other keys are
    ignored.  List-valued fields are compared regardless of ordering.

    :param a: check dict
    :param b: check dict
    :return: True if all compared fields are equal, False otherwise
    """
    plain_fields = (
        'publish', 'command', 'interval',
        'proxy_entity_name', 'round_robin', 'output_metric_format')
    for field in plain_fields:
        if a[field] != b[field]:
            return False

    # element order is irrelevant for these
    for field in ('subscriptions', 'output_metric_handlers'):
        if sorted(a[field]) != sorted(b[field]):
            return False

    return not any(
        a['metadata'][field] != b['metadata'][field]
        for field in ('name', 'namespace'))
class AbstractCheck(object):
    """
    Base class for sensu check definitions.

    Subclasses must override the ``name``, ``command`` and
    ``proxy_entity_name`` properties, each returning a string; the base
    implementations always raise AssertionError.

    not explicitly using abc.ABC ... more readable than stacks of decorators
    """

    # defaults copied onto every new instance by __init__
    INTERVAL_S = 300
    SUBSCRIPTIONS = ['interfacecounters']
    METRIC_FORMAT = 'influxdb_line'
    METRIC_HANDLERS = ['influx-db-handler']
    NAMESPACE = 'default'

    CHECK_DICT_SCHEMA = {
        # for unit tests
        '$schema': 'http://json-schema.org/draft-07/schema#',
        'definitions': {
            'metadata': {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'namespace': {'type': 'string'}
                },
                'required': ['name', 'namespace']
            }
        },
        'type': 'object',
        'properties': {
            'command': {'type': 'string'},
            'interval': {'type': 'integer'},
            'subscriptions': {
                'type': 'array',
                'items': {'type': 'string'},
                'minItems': 1
            },
            'proxy_entity_name': {'type': 'string'},
            'round_robin': {'type': 'boolean'},
            'output_metric_format': {'type': 'string'},
            'output_metric_handlers': {
                'type': 'array',
                'items': {'type': 'string'},
                'minItems': 1
            },
            'metadata': {'$ref': '#/definitions/metadata'},
            'publish': {'type': 'boolean'}
        },
        'required': [
            'command', 'interval', 'subscriptions',
            'proxy_entity_name', 'round_robin',
            'output_metric_format', 'output_metric_handlers',
            'metadata', 'publish']
    }

    def __init__(self):
        """
        Initialise the instance attributes from the class-level defaults.

        The defaults are read from AbstractCheck itself, so redefining
        the constants on a subclass does not affect them; assign the
        instance attributes instead.
        """
        self.publish = True
        self.round_robin = True
        self.interval = AbstractCheck.INTERVAL_S
        # copies, so that per-instance changes don't leak into the
        # class-level lists
        self.subscriptions = copy.copy(AbstractCheck.SUBSCRIPTIONS)
        self.output_metric_handlers = copy.copy(AbstractCheck.METRIC_HANDLERS)
        self.output_metric_format = AbstractCheck.METRIC_FORMAT
        self.namespace = AbstractCheck.NAMESPACE

    @property
    def name(self):
        # overridden implementation must return a string
        # explicit raise rather than `assert False`: assert statements
        # are removed under `python -O`, which would turn this into a
        # silent `return None`
        raise AssertionError(
            f'property {type(self).__name__}.name must be overridden')

    @property
    def command(self):
        # overridden implementation must return a string
        raise AssertionError(
            f'property {type(self).__name__}.command must be overridden')

    @property
    def proxy_entity_name(self):
        # overridden implementation must return a string
        raise AssertionError(
            f'property {type(self).__name__}.proxy_entity_name'
            ' must be overridden')

    def to_dict(self):
        """
        Render this check as a dict with the keys that
        CHECK_DICT_SCHEMA lists as required.

        :return: check dict; subscriptions and output_metric_handlers
            are sorted copies of the instance lists
        """
        return {
            'command': self.command,
            'interval': self.interval,
            'subscriptions': sorted(self.subscriptions),
            'proxy_entity_name': self.proxy_entity_name,
            'round_robin': self.round_robin,
            'output_metric_format': self.output_metric_format,
            'output_metric_handlers': sorted(self.output_metric_handlers),
            'metadata': {
                'name': self.name,
                'namespace': self.namespace
            },
            'publish': self.publish
        }

    @staticmethod
    def match_check_dicts(a, b) -> bool:
        """
        Tell whether two check dicts describe the same check.

        Only the fields produced by to_dict are compared, any other
        keys are ignored.  List-valued fields are compared regardless
        of ordering.

        :param a: check dict
        :param b: check dict
        :return: True if all compared fields are equal, False otherwise
        """
        plain_fields = (
            'publish', 'command', 'interval',
            'proxy_entity_name', 'round_robin', 'output_metric_format')
        for field in plain_fields:
            if a[field] != b[field]:
                return False

        # element order is irrelevant for these
        for field in ('subscriptions', 'output_metric_handlers'):
            if sorted(a[field]) != sorted(b[field]):
                return False

        for field in ('name', 'namespace'):
            if a['metadata'][field] != b['metadata'][field]:
                return False

        return True
def refresh(sensu_params, required_checks, current_checks):
    """
    bring the checks stored in sensu in line with required_checks

    create any required_checks that are not present in current_checks
    update any that are present but whose definition differs
    remove any extras

    :param sensu_params: passed on to create_check/update_check/delete_check
    :param required_checks: list of AbstractCheck instances
    :param current_checks: dict of {name:check_dict} from sensu
    :return: dict with change counts
    """
    # cf. main.REFRESH_RESULT_SCHEMA
    result = {
        'checks': len(current_checks),
        'input': len(required_checks),
        'created': 0,
        'updated': 0,
        'deleted': 0
    }

    wanted_checks = set()
    for expected_check in required_checks:
        # evaluate the name and the dict form once per check, rather
        # than once per use
        name = expected_check.name
        wanted_checks.add(name)
        expected_dict = expected_check.to_dict()

        if name not in current_checks:
            create_check(sensu_params, expected_dict)
            result['created'] += 1
        elif not AbstractCheck.match_check_dicts(
                current_checks[name], expected_dict):
            update_check(sensu_params, expected_dict)
            result['updated'] += 1

    extra_checks = set(current_checks.keys()) - wanted_checks
    # sorted, so that the deletions happen in a reproducible order
    for name in sorted(extra_checks):
        delete_check(sensu_params, name)
    result['deleted'] = len(extra_checks)

    return result