-
Robert Latta authored
test_worker_utils.py 5.90 KiB
"""
tests of a few worker utilities
"""
import copy
import ipaddress
import json
import re
import jsonschema
from inventory_provider.tasks import worker
from inventory_provider.tasks import common
from inventory_provider.routes import msr
from inventory_provider import config
def backend_db():
    """
    Return the ``db`` attribute of the object that
    ``common._get_redis`` builds for a placeholder configuration.

    The configuration passed in has no hostname or port (both are
    ``None``) and lists redis databases ``0`` and ``7``.

    NOTE(review): presumably the redis client is replaced by a test
    fixture, so ``db`` is an in-memory mapping - confirm in conftest.

    :return: the ``db`` attribute of the redis handle
    """
    placeholder_params = {
        'redis': {
            'hostname': None,
            'port': None
        },
        'redis-databases': [0, 7]
    }
    redis_handle = common._get_redis(placeholder_params)
    return redis_handle.db
def test_build_subnet_db(mocked_worker_module, data_config_filename):
    """
    Check the ``subnets:*`` records produced by ``worker._build_subnet_db``.

    Existing ``subnets:*`` keys are removed first, then the db is rebuilt
    and every such record is checked:

    * the key has the form ``subnets:<address>``
    * the value is a json list matching the interface schema below
    * each interface's ``interface address`` equals ``<address>``

    At least one record must be present, and every entry of the
    ``unmanaged-interfaces`` config section must show up among the
    interfaces that were found.

    :param mocked_worker_module: fixture
    :param data_config_filename: fixture, name of the config file to load
    """
    key_prefix = 'subnets:'

    # every interface field is a mandatory string, nothing else is allowed
    interface_fields = [
        'name', 'interface address', 'interface name', 'router']
    address_schema = {
        '$schema': 'http://json-schema.org/draft-07/schema#',
        'definitions': {
            'interface': {
                'type': 'object',
                'properties': {
                    field: {'type': 'string'} for field in interface_fields
                },
                'required': list(interface_fields),
                'additionalProperties': False
            }
        },
        'type': 'array',
        'items': {"$ref": "#/definitions/interface"},
    }

    with open(data_config_filename) as f:
        params = config.load(f)

    # keys are "<router>:<interface>:<network>", with the router and
    # interface names lower-cased
    unmanaged_interfaces = {
        f'{ifc["router"].lower()}'
        f':{ifc["interface"].lower()}'
        f':{ifc["network"]}'
        for ifc in params.get('unmanaged-interfaces', [])
    }

    db = backend_db()  # also forces initialization

    # collect the keys up front, since db is modified while deleting
    stale_keys = [k for k in db.keys() if k.startswith(key_prefix)]
    for stale in stale_keys:
        del db[stale]

    worker._build_subnet_db()

    all_subnet_interfaces = set()
    saw_record = False
    for key, raw in db.items():
        if not key.startswith(key_prefix):
            continue
        saw_record = True

        m = re.match('^subnets:(.+)', key)
        assert m
        address = m.group(1)

        interfaces = json.loads(raw)
        jsonschema.validate(interfaces, address_schema)

        for ifc in interfaces:
            ifc_address = ifc['interface address']
            assert ifc_address == address
            router = ifc['router']
            ifc_name = ifc['interface name']
            all_subnet_interfaces.add(f'{router}:{ifc_name}:{ifc_address}')

    assert saw_record
    assert unmanaged_interfaces.issubset(all_subnet_interfaces)
def test_build_juniper_peering_db(mocked_worker_module):
    """
    Check the ``juniper-peerings*`` records produced by
    ``worker._build_juniper_peering_db``.

    Keys starting with ``juniper-peerings:hosts:`` are ignored. All
    other ``juniper-peerings*`` keys are removed, the db is rebuilt,
    and each resulting record must be non-empty json and then:

    * ``juniper-peerings:ix-groups:*`` records: each address must
      already be in exploded (canonical) form
    * all other records: must validate against
      ``msr.PEERING_LIST_SCHEMA``
    * keys containing ``logical-system:<name>``: every peering must
      also carry ``logical-system`` equal to ``<name>``
    * keys containing ``group:<name>``: every peering must have
      ``group`` equal to ``<name>``

    At least one record overall, one logical-system record and one
    group record must be found.

    :param mocked_worker_module: fixture
    """
    hosts_prefix = 'juniper-peerings:hosts:'
    ix_groups_prefix = 'juniper-peerings:ix-groups:'

    # same as the regular schema, but 'logical-system' is mandatory
    logical_system_peering_list_schema = copy.deepcopy(
        msr.PEERING_LIST_SCHEMA)
    peering_def = logical_system_peering_list_schema[
        'definitions']['peering-instance']
    peering_def['required'].append('logical-system')

    db = backend_db()  # also forces initialization

    def _is_rebuilt_key(k):
        return (k.startswith('juniper-peerings')
                and not k.startswith(hosts_prefix))

    # remove the juniper-peerings:* items that
    # will be created by _build_juniper_peering_db
    stale_keys = [k for k in db.keys() if _is_rebuilt_key(k)]
    for stale in stale_keys:
        del db[stale]

    worker._build_juniper_peering_db()

    saw_record = False
    saw_logical_system = False
    saw_group = False

    for key, raw in db.items():
        if not _is_rebuilt_key(key):
            continue

        peerings = json.loads(raw)
        assert peerings
        saw_record = True

        if key.startswith(ix_groups_prefix):
            # ix-group records are lists of addresses, not peerings
            assert all(
                address == ipaddress.ip_interface(address).ip.exploded
                for address in peerings)
            continue

        jsonschema.validate(peerings, msr.PEERING_LIST_SCHEMA)

        if 'logical-system:' in key:
            jsonschema.validate(
                peerings, logical_system_peering_list_schema)
            m = re.match(r'.*logical-system:(.+)$', key)
            logical_system = m.group(1)
            assert all(
                p['logical-system'] == logical_system for p in peerings)
            saw_logical_system = True

        if 'group:' in key:
            m = re.match(r'.*group:(.+)$', key)
            group = m.group(1)
            assert all(p['group'] == group for p in peerings)
            saw_group = True

    assert saw_record
    assert saw_logical_system
    assert saw_group
def test_build_snmp_peering_db(mocked_worker_module):
    """
    Verify that valid snmp peering db objects are created.

    Keys starting with ``snmp-peerings:hosts:`` are ignored. All other
    ``snmp-peerings*`` keys are removed, ``worker._build_snmp_peering_db``
    is run, and each resulting record must be a non-empty json list
    matching the peering schema below. At least one record must exist.

    :param mocked_worker_module: fixture
    """
    # every peering field is a mandatory string, nothing else is allowed
    peering_fields = ["local", "remote", "oid", "community", "hostname"]
    peering_list_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
            "peering": {
                "type": "object",
                "properties": {
                    field: {"type": "string"} for field in peering_fields
                },
                "required": list(peering_fields),
                "additionalProperties": False
            }
        },
        "type": "array",
        "items": {"$ref": "#/definitions/peering"}
    }

    db = backend_db()  # also forces initialization

    def _is_rebuilt_key(k):
        return (k.startswith('snmp-peerings')
                and not k.startswith('snmp-peerings:hosts:'))

    # collect the keys up front, since db is modified while deleting
    stale_keys = [k for k in db.keys() if _is_rebuilt_key(k)]
    for stale in stale_keys:
        del db[stale]

    worker._build_snmp_peering_db()

    saw_record = False
    for key, raw in db.items():
        if not _is_rebuilt_key(key):
            continue
        peerings = json.loads(raw)
        jsonschema.validate(peerings, peering_list_schema)
        assert peerings
        saw_record = True

    assert saw_record