Skip to content
Snippets Groups Projects
Select Git revision
  • 3fde4f8d59ef7478f9dcf472f9e58abdfdbca9c5
  • python3 default protected
  • feature/exabgp_support2
  • feature/exabgp_support2.bgpextcommunity
  • feature/exabgp_support2.django4.2
  • fix/existingcheck_honor_fragtype
  • feature/python3-authz_netmask
  • feature/authz_netmask
  • fix/wrong_ratelimit_stats
  • feature/requirements_version_update2024-01
  • feature/split_celery
  • feature/improved-warning-mails
  • fix/reenable_expireset_via_restapi
  • feature/admin_user_delete_with_owned_rule_reassigning1
  • feature/admin_user_delete_with_owned_rule_reassigning
  • feature/branded_doc
  • fix/forked_snmp_polling_worker_exit_issue
  • fix/false_user_activation_error
  • feature/exabgp_with_docker-compose
  • fix/prefix_overlap_handling
  • fix/js_security_issues-a
  • save1
  • rpm-1.5-7
  • working1
  • myv1.6
  • t12b1
  • v1.5_newnew2
  • merged_final
  • v1.5_newnew
  • startstop_old
  • myadd2
  • tomas3
  • merge_jra2t6_and_RESTAPI
  • mytomas2
  • mynew1
  • new_jra2t6
  • v1.5_final
  • fod16_ruleroutes-merged_old
  • merged_new
  • v1.6_new_old
  • v1.5_new_old_follower
41 results

tasks.py

Blame
  • gws_direct.py 1.93 KiB
    import binascii
    import hashlib
    import json
    
    from brian_polling_manager import sensu
    
    
    def load_gws_direct_checks(sensu_params):
        def _is_gws_direct_check(check):
            name = check['metadata']['name']
            return name.startswith('gwsd')
        ifc_checks = filter(
            _is_gws_direct_check, sensu.load_all_checks(sensu_params))
        return {c['metadata']['name']: c for c in ifc_checks}
    
    
    class GwSDirectInterfaceCheck(sensu.AbstractCheck):
    
        def __init__(self, ifc_check_params, interface):
            super().__init__()
            self.ifc_check_params = ifc_check_params
            self.interface = interface
    
        @sensu.AbstractCheck.name.getter
        def name(self):
            ifc_str = json.dumps(self.interface, indent=2, sort_keys=True)
            m = hashlib.sha256()
            m.update(ifc_str.encode('utf-8'))
            check_id = binascii.b2a_hex(m.digest()).decode('utf-8')
            isp = self.interface['isp']
            isp = isp.replace(' ', '_')
            return f'gwsd-{self.interface["nren"]}-{isp}-{check_id.upper()[-8:]}'
    
        @sensu.AbstractCheck.command.getter
        def command(self):
            isp = self.interface["isp"]
            if ' ' in isp:
                isp = f'"{self.interface["isp"]}"'
    
            return self.ifc_check_params['command'].format(
                script=self.ifc_check_params['script'],
                measurement=self.ifc_check_params['measurement'],
                hostname=self.interface['hostname'],
                isp=isp,
                nren=self.interface['nren'],
                tag=self.interface['tag'])
    
        @sensu.AbstractCheck.proxy_entity_name.getter
        def proxy_entity_name(self):
            return self.interface['hostname']
    
    
    def refresh(sensu_params, inventory_interfaces):
    
        required_checks = [
            GwSDirectInterfaceCheck(
                sensu_params['gws-direct-interface-check'], ifc)
            for ifc in inventory_interfaces]
    
        return sensu.refresh(
            sensu_params,
            required_checks,
            load_gws_direct_checks(sensu_params))