eumetsat_multicast.py 1.47 KiB
from brian_polling_manager import sensu
def load_eumetsat_multicast_checks(sensu_params):
def _is_eumetsat_multicast_check(check):
name = check['metadata']['name']
return name.startswith('eumetmc')
ifc_checks = filter(
_is_eumetsat_multicast_check, sensu.load_all_checks(sensu_params))
return {c['metadata']['name']: c for c in ifc_checks}
class EUMETSATMulticastHostCheck(sensu.AbstractCheck):
def __init__(self, check_config, hostname):
super().__init__()
self.check_config = check_config
self.hostname = hostname
@sensu.AbstractCheck.name.getter
def name(self):
return f'eumetmc-{self.hostname}'
@sensu.AbstractCheck.command.getter
def command(self):
return self.check_config['command'].format(
script=self.check_config['script'],
measurement=self.check_config['measurement'],
hostname=self.hostname)
@sensu.AbstractCheck.proxy_entity_name.getter
def proxy_entity_name(self):
return self.hostname
def refresh(sensu_params, eumetsat_multicast_config):
# one check per unique host
all_routers = {x['router'] for x in eumetsat_multicast_config}
required_checks = [
EUMETSATMulticastHostCheck(
sensu_params['eumetsat-multicast-check'], hostname)
for hostname in all_routers]
return sensu.refresh(
sensu_params,
required_checks,
load_eumetsat_multicast_checks(sensu_params))