Skip to content
Snippets Groups Projects
test_worker_utils.py 5.90 KiB
"""
tests of a few worker utilities
"""
import copy
import ipaddress
import json
import re

import jsonschema

from inventory_provider.tasks import worker
from inventory_provider.tasks import common
from inventory_provider.routes import msr
from inventory_provider import config


def backend_db():
    return common._get_redis({
        'redis': {
            'hostname': None,
            'port': None
        },
        'redis-databases': [0, 7]
    }).db


def test_build_subnet_db(mocked_worker_module, data_config_filename):
    """
    Verify that valid reverse subnet objects are created.

    :param mocked_worker_module: fixture
    """

    address_schema = {
        '$schema': 'http://json-schema.org/draft-07/schema#',

        'definitions': {
            'interface': {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'interface address': {'type': 'string'},
                    'interface name': {'type': 'string'},
                    'router': {'type': 'string'}
                },
                'required': ['name', 'interface address',
                             'interface name', 'router'],
                'additionalProperties': False
            }
        },

        'type': 'array',
        'items': {"$ref": "#/definitions/interface"},
    }

    all_subnet_interfaces = set()
    unmanaged_interfaces = set()
    with open(data_config_filename) as f:
        params = config.load(f)
        for ifc in params.get('unmanaged-interfaces', []):
            ifc_key = (f'{ifc["router"].lower()}'
                       f':{ifc["interface"].lower()}'
                       f':{ifc["network"]}')
            unmanaged_interfaces.add(ifc_key)

    db = backend_db()  # also forces initialization

    def _x(k):
        return k.startswith('subnets:')

    for k in list(filter(_x, db.keys())):
        del db[k]

    worker._build_subnet_db()

    found_record = False
    for key, value in db.items():

        if not _x(key):
            continue

        found_record = True

        m = re.match('^subnets:(.+)', key)
        assert m
        address = m.group(1)

        value = json.loads(value)
        jsonschema.validate(value, address_schema)

        for ifc in value:
            assert ifc['interface address'] == address

        ifc_key = (f'{ifc["router"]}'
                   f':{ifc["interface name"]}'
                   f':{ifc["interface address"]}')

        all_subnet_interfaces.add(ifc_key)

    assert found_record

    assert unmanaged_interfaces <= all_subnet_interfaces


def test_build_juniper_peering_db(mocked_worker_module):
    """
    Verify that valid juniper peering db objects are created.

    :param mocked_worker_module: fixture
    """
    logical_system_peering_list_schema = copy.deepcopy(msr.PEERING_LIST_SCHEMA)
    logical_system_peering_list_schema[
        'definitions']['peering-instance']['required'].append('logical-system')

    db = backend_db()  # also forces initialization

    # remove the juniper-peerings:* items that
    # will be created by _build_juniper_peering_db
    def _x(k):
        if not k.startswith('juniper-peerings'):
            return False
        if k.startswith('juniper-peerings:hosts:'):
            return False
        return True

    for k in list(filter(_x, db.keys())):
        del db[k]

    worker._build_juniper_peering_db()

    found_record = False
    found_logical_system = False
    found_group = False
    for key, value in db.items():

        if not _x(key):
            continue

        value = json.loads(value)
        assert value

        found_record = True

        if key.startswith('juniper-peerings:ix-groups:'):
            for address in value:
                canonical = ipaddress.ip_interface(address).ip.exploded
                assert address == canonical
            continue

        jsonschema.validate(value, msr.PEERING_LIST_SCHEMA)

        if 'logical-system:' in key:
            jsonschema.validate(value, logical_system_peering_list_schema)
            m = re.match(r'.*logical-system:(.+)$', key)
            assert all(p['logical-system'] == m.group(1) for p in value)
            found_logical_system = True

        if 'group:' in key:
            m = re.match(r'.*group:(.+)$', key)
            assert all(p['group'] == m.group(1) for p in value)
            found_group = True

    assert found_record
    assert found_logical_system
    assert found_group


def test_build_snmp_peering_db(mocked_worker_module):
    """
    Verify that valid snmp peering db objects are .

    :param mocked_worker_module: fixture
    """
    peering_list_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
            "peering": {
                "type": "object",
                "properties": {
                    "local": {"type": "string"},
                    "remote": {"type": "string"},
                    "oid": {"type": "string"},
                    "community": {"type": "string"},
                    "hostname": {"type": "string"}
                },
                "required": ["local", "remote",
                             "oid", "community",
                             "hostname"],
                "additionalProperties": False
            }
        },
        "type": "array",
        "items": {"$ref": "#/definitions/peering"}
    }

    db = backend_db()  # also forces initialization

    def _x(k):
        if not k.startswith('snmp-peerings'):
            return False
        if k.startswith('snmp-peerings:hosts:'):
            return False
        return True

    for k in list(filter(_x, db.keys())):
        del db[k]

    worker._build_snmp_peering_db()

    found_record = False
    for key, value in db.items():

        if not _x(key):
            continue

        value = json.loads(value)
        jsonschema.validate(value, peering_list_schema)
        assert value
        found_record = True

    assert found_record