gws_direct.py 1.93 KiB
import binascii
import hashlib
import json
from brian_polling_manager import sensu
def load_gws_direct_checks(sensu_params):
def _is_gws_direct_check(check):
name = check['metadata']['name']
return name.startswith('gwsd')
ifc_checks = filter(
_is_gws_direct_check, sensu.load_all_checks(sensu_params))
return {c['metadata']['name']: c for c in ifc_checks}
class GwSDirectInterfaceCheck(sensu.AbstractCheck):
def __init__(self, ifc_check_params, interface):
super().__init__()
self.ifc_check_params = ifc_check_params
self.interface = interface
@sensu.AbstractCheck.name.getter
def name(self):
ifc_str = json.dumps(self.interface, indent=2, sort_keys=True)
m = hashlib.sha256()
m.update(ifc_str.encode('utf-8'))
check_id = binascii.b2a_hex(m.digest()).decode('utf-8')
isp = self.interface['isp']
isp = isp.replace(' ', '_')
return f'gwsd-{self.interface["nren"]}-{isp}-{check_id.upper()[-8:]}'
@sensu.AbstractCheck.command.getter
def command(self):
isp = self.interface["isp"]
if ' ' in isp:
isp = f'"{self.interface["isp"]}"'
return self.ifc_check_params['command'].format(
script=self.ifc_check_params['script'],
measurement=self.ifc_check_params['measurement'],
hostname=self.interface['hostname'],
isp=isp,
nren=self.interface['nren'],
tag=self.interface['tag'])
@sensu.AbstractCheck.proxy_entity_name.getter
def proxy_entity_name(self):
return self.interface['hostname']
def refresh(sensu_params, inventory_interfaces):
required_checks = [
GwSDirectInterfaceCheck(
sensu_params['gws-direct-interface-check'], ifc)
for ifc in inventory_interfaces]
return sensu.refresh(
sensu_params,
required_checks,
load_gws_direct_checks(sensu_params))