Skip to content
Snippets Groups Projects
Select Git revision
  • 83c12d27c1ade5b10a2c4a7fd768c9db19cc8ce2
  • develop default
  • master protected
  • feature/DBOARD3-1156-move-whois/nren-naming-to-inventory-update
  • inventoryProvider-functional
  • inventoryProvider-morework2
  • circuit-service-details-fix
  • lookup-SPECTRUM-SCHF-ports
  • inventoryProvider-1267-cleanup
  • inventoryProvider-moreWork
  • feature/DBOARD3-958
  • release/0.110
  • fix-uuid-validation-error
  • docker-poc
  • 0.154
  • 0.153
  • 0.152
  • 0.151
  • 0.150
  • 0.149
  • 0.148
  • 0.147
  • 0.146
  • 0.145
  • 0.144
  • 0.143
  • 0.142
  • 0.141
  • 0.140
  • 0.139
  • 0.138
  • 0.137
  • 0.136
  • 0.135
34 results

test_worker_utils.py

Blame
  • test_worker_utils.py 5.90 KiB
    """
    tests of a few worker utilities
    """
    import copy
    import ipaddress
    import json
    import re
    
    import jsonschema
    
    from inventory_provider.tasks import worker
    from inventory_provider.tasks import common
    from inventory_provider.routes import msr
    from inventory_provider import config
    
    
    def backend_db():
        """
        Return the ``db`` attribute of the redis object obtained from
        ``common._get_redis`` for a minimal test configuration.

        The hostname and port are both ``None`` and the configured
        redis databases are ``[0, 7]``.

        :return: the ``db`` attribute of the object returned
            by ``common._get_redis``
        """
        redis_params = {
            'hostname': None,
            'port': None
        }
        test_config = {
            'redis': redis_params,
            'redis-databases': [0, 7]
        }
        connection = common._get_redis(test_config)
        return connection.db
    
    
    def test_build_subnet_db(mocked_worker_module, data_config_filename):
        """
        Verify that valid reverse subnet objects are created.

        After ``worker._build_subnet_db`` runs, every ``subnets:<address>``
        record in the backend db must be a json list of interface objects
        matching the schema below, each one reporting the same address as
        the one embedded in its key.  Every entry of the
        ``unmanaged-interfaces`` list in the config file must also be found
        among the interfaces in the generated records.

        :param mocked_worker_module: fixture
        :param data_config_filename: fixture, name of the config file
            that is opened and parsed by this test
        """

        address_schema = {
            '$schema': 'http://json-schema.org/draft-07/schema#',

            'definitions': {
                'interface': {
                    'type': 'object',
                    'properties': {
                        'name': {'type': 'string'},
                        'interface address': {'type': 'string'},
                        'interface name': {'type': 'string'},
                        'router': {'type': 'string'}
                    },
                    'required': ['name', 'interface address',
                                 'interface name', 'router'],
                    'additionalProperties': False
                }
            },

            'type': 'array',
            'items': {"$ref": "#/definitions/interface"},
        }

        all_subnet_interfaces = set()
        unmanaged_interfaces = set()

        # only the parsing needs the file handle, so close it right away
        with open(data_config_filename) as f:
            params = config.load(f)

        for ifc in params.get('unmanaged-interfaces', []):
            ifc_key = (f'{ifc["router"].lower()}'
                       f':{ifc["interface"].lower()}'
                       f':{ifc["network"]}')
            unmanaged_interfaces.add(ifc_key)

        db = backend_db()  # also forces initialization

        def _x(k):
            return k.startswith('subnets:')

        # remove the subnets:* items that will be
        # re-created by _build_subnet_db
        for k in list(filter(_x, db.keys())):
            del db[k]

        worker._build_subnet_db()

        found_record = False
        for key, value in db.items():

            if not _x(key):
                continue

            found_record = True

            m = re.match('^subnets:(.+)', key)
            assert m
            address = m.group(1)

            value = json.loads(value)
            jsonschema.validate(value, address_schema)

            for ifc in value:
                assert ifc['interface address'] == address

                # the key must be collected for *every* interface in the
                # record: doing this after the loop would only keep the
                # last one (and would reuse a stale ``ifc`` when the
                # record is an empty list)
                ifc_key = (f'{ifc["router"]}'
                           f':{ifc["interface name"]}'
                           f':{ifc["interface address"]}')
                all_subnet_interfaces.add(ifc_key)

        assert found_record

        assert unmanaged_interfaces <= all_subnet_interfaces
    
    
    def test_build_juniper_peering_db(mocked_worker_module):
        """
        Verify that valid juniper peering db objects are created.

        After ``worker._build_juniper_peering_db`` runs, every generated
        ``juniper-peerings*`` record (``juniper-peerings:hosts:*`` keys are
        not examined) must be non-empty json.  ``ix-groups`` records must
        hold addresses in exploded form; all other records must validate
        against ``msr.PEERING_LIST_SCHEMA``, and the logical-system/group
        named in a key must match the corresponding field of every peering
        in that record.

        :param mocked_worker_module: fixture
        """
        # same as the general schema, but 'logical-system' is mandatory
        ls_schema = copy.deepcopy(msr.PEERING_LIST_SCHEMA)
        ls_required = ls_schema['definitions']['peering-instance']['required']
        ls_required.append('logical-system')

        db = backend_db()  # also forces initialization

        def _is_generated_key(name):
            # juniper-peerings:* items, except the juniper-peerings:hosts:*
            # ones, are those created by _build_juniper_peering_db
            return (name.startswith('juniper-peerings')
                    and not name.startswith('juniper-peerings:hosts:'))

        # start from a clean slate
        stale_keys = [name for name in db.keys() if _is_generated_key(name)]
        for name in stale_keys:
            del db[name]

        worker._build_juniper_peering_db()

        seen = {'record': False, 'logical-system': False, 'group': False}

        for key, raw in db.items():

            if not _is_generated_key(key):
                continue

            peerings = json.loads(raw)
            assert peerings

            seen['record'] = True

            if key.startswith('juniper-peerings:ix-groups:'):
                # these records are plain lists of addresses
                for address in peerings:
                    interface = ipaddress.ip_interface(address)
                    assert address == interface.ip.exploded
                continue

            jsonschema.validate(peerings, msr.PEERING_LIST_SCHEMA)

            if 'logical-system:' in key:
                jsonschema.validate(peerings, ls_schema)
                m = re.match(r'.*logical-system:(.+)$', key)
                for p in peerings:
                    assert p['logical-system'] == m.group(1)
                seen['logical-system'] = True

            if 'group:' in key:
                m = re.match(r'.*group:(.+)$', key)
                for p in peerings:
                    assert p['group'] == m.group(1)
                seen['group'] = True

        assert seen['record']
        assert seen['logical-system']
        assert seen['group']
    
    
    def test_build_snmp_peering_db(mocked_worker_module):
        """
        Verify that valid snmp peering db objects are created.

        After ``worker._build_snmp_peering_db`` runs, every generated
        ``snmp-peerings*`` record (``snmp-peerings:hosts:*`` keys are not
        examined) must be a non-empty json list of peering objects
        matching the schema below.

        :param mocked_worker_module: fixture
        """
        # every field is a mandatory string and no others are allowed
        peering_fields = ["local", "remote", "oid", "community", "hostname"]
        peering_list_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "definitions": {
                "peering": {
                    "type": "object",
                    "properties": {
                        name: {"type": "string"} for name in peering_fields
                    },
                    "required": peering_fields,
                    "additionalProperties": False
                }
            },
            "type": "array",
            "items": {"$ref": "#/definitions/peering"}
        }

        db = backend_db()  # also forces initialization

        def _is_generated_key(name):
            return (name.startswith('snmp-peerings')
                    and not name.startswith('snmp-peerings:hosts:'))

        # remove whatever is left over, so that only freshly built
        # records are examined below
        stale_keys = [name for name in db.keys() if _is_generated_key(name)]
        for name in stale_keys:
            del db[name]

        worker._build_snmp_peering_db()

        checked_any = False
        for key, raw in db.items():
            if _is_generated_key(key):
                peerings = json.loads(raw)
                jsonschema.validate(peerings, peering_list_schema)
                assert peerings
                checked_any = True

        assert checked_any