""" tests of a few worker utilities """ import json import re import jsonschema from inventory_provider.tasks import worker from inventory_provider.tasks import common def backend_db(): return common._get_redis({ 'redis': { 'hostname': None, 'port': None }, 'redis-databases': [0, 7] }).db def test_build_subnet_db(mocked_worker_module): """ checks that valid reverse subnet objects are created :param mocked_worker_module: fixture :return: """ address_schema = { '$schema': 'http://json-schema.org/draft-07/schema#', 'type': 'object', 'properties': { 'name': {'type': 'string'}, 'interface address': {'type': 'string'}, 'interface name': {'type': 'string'}, 'router': {'type': 'string'} }, 'required': ['name', 'interface address', 'interface name', 'router'], 'additionalProperties': False } db = backend_db() # also forces initialization worker._build_subnet_db() found_record = False for key, value in db.items(): if not key.startswith('reverse_interface_addresses:'): continue found_record = True m = re.match('^reverse_interface_addresses:(.+)', key) assert m address = m.group(1) value = json.loads(value) jsonschema.validate(value, address_schema) assert value['name'] == address assert found_record