Skip to content
Snippets Groups Projects
test_worker_utils.py 1.46 KiB
"""
tests of a few worker utilities
"""
import json
import re

import jsonschema

from inventory_provider.tasks import worker
from inventory_provider.tasks import common


def backend_db():
    """
    hands a placeholder redis config (no hostname, no port) to
    common._get_redis and exposes the `db` attribute of the result

    NOTE(review): presumably the redis client is replaced by a test
    fixture so that `db` is an in-memory mapping - confirm in conftest

    :return: the `db` attribute of the object built by common._get_redis
    """
    placeholder_config = {
        'redis': {'hostname': None, 'port': None},
        'redis-databases': [0, 7]
    }
    connection = common._get_redis(placeholder_config)
    return connection.db


def test_build_subnet_db(mocked_worker_module):
    """
    runs worker._build_subnet_db and verifies every
    'reverse_interface_addresses:*' entry it leaves in the backend db:
    each value must be json matching the address schema, and its 'name'
    must equal the part of the key after the prefix

    the test also fails if no such entry exists at all

    :param mocked_worker_module: fixture
    :return:
    """

    # every field is a mandatory string and nothing else is allowed
    mandatory_fields = [
        'name', 'interface address', 'interface name', 'router']
    address_schema = {
        '$schema': 'http://json-schema.org/draft-07/schema#',

        'type': 'object',
        'properties': {
            field: {'type': 'string'} for field in mandatory_fields
        },
        'required': mandatory_fields,
        'additionalProperties': False
    }

    # fetch the db handle first: this also forces initialization
    db = backend_db()
    worker._build_subnet_db()

    key_prefix = 'reverse_interface_addresses:'

    # lazy filter, so entries are still examined one at a time
    subnet_entries = (
        (db_key, payload)
        for db_key, payload in db.items()
        if db_key.startswith(key_prefix)
    )

    checked = 0
    for db_key, payload in subnet_entries:
        checked += 1

        # the key must carry a non-empty address after the prefix
        key_match = re.match('^reverse_interface_addresses:(.+)', db_key)
        assert key_match

        record = json.loads(payload)
        jsonschema.validate(record, address_schema)
        assert record['name'] == key_match.group(1)

    # at least one reverse subnet record must have been produced
    assert checked > 0