Skip to content
Snippets Groups Projects
Commit 372a3111 authored by Erik Reid's avatar Erik Reid
Browse files

additional tests of peering cache data structures

parent 6fb3c30c
No related branches found
No related tags found
No related merge requests found
"""
tests of a few worker utilities
"""
import ipaddress
import json
import re
......@@ -22,42 +23,206 @@ def backend_db():
def test_build_subnet_db(mocked_worker_module):
"""
checks that valid reverse subnet objects are created
Verify that valid reverse subnet objects are created.
:param mocked_worker_module: fixture
:return:
"""
address_schema = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'type': 'object',
'properties': {
'name': {'type': 'string'},
'interface address': {'type': 'string'},
'interface name': {'type': 'string'},
'router': {'type': 'string'}
'definitions': {
'interface': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'interface address': {'type': 'string'},
'interface name': {'type': 'string'},
'router': {'type': 'string'}
},
'required': ['name', 'interface address', 'interface name', 'router'],
'additionalProperties': False
}
},
'required': ['name', 'interface address', 'interface name', 'router'],
'additionalProperties': False
'type': 'array',
'items': {"$ref": "#/definitions/interface"},
}
db = backend_db() # also forces initialization
def _x(k):
return k.startswith('subnets:')
for k in list(filter(_x, db.keys())):
del db[k]
worker._build_subnet_db()
found_record = False
for key, value in db.items():
if not key.startswith('reverse_interface_addresses:'):
if not _x(key):
continue
found_record = True
m = re.match('^reverse_interface_addresses:(.+)', key)
m = re.match('^subnets:(.+)', key)
assert m
address = m.group(1)
value = json.loads(value)
jsonschema.validate(value, address_schema)
assert value['name'] == address
for ifc in value:
assert ifc['interface address'] == address
assert found_record
def test_build_juniper_peering_db(mocked_worker_module):
"""
Verify that valid juniper peering db objects are created.
:param mocked_worker_module: fixture
"""
peering_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"instance-peering": {
"type": "object",
"properties": {
"instance": {"type": "string"},
"group": {"type": "string"},
"description": {"type": "string"},
"address": {"type": "string"},
"remote-asn": {"type": "integer"},
"local-asn": {"type": "integer"},
"hostname": {"type": "string"}
},
# description and-or local-asn is not always present,
# just based on empirical tests - not a problem
"required": ["instance", "group",
"address", "remote-asn",
"hostname"],
"additionalProperties": False
},
"logical-system-peering": {
"type": "object",
"properties": {
"logical-system": {"type": "string"},
"group": {"type": "string"},
"description": {"type": "string"},
"address": {"type": "string"},
"remote-asn": {"type": "integer"},
"local-asn": {"type": "integer"},
"hostname": {"type": "string"}
},
# local/remote-asn and/or description are not always present,
# just based on empirical tests - not a problem
"required": ["logical-system", "group", "address", "hostname"],
"additionalProperties": False
},
"peering": {
"oneOf": [
{"$ref": "#/definitions/instance-peering"},
{"$ref": "#/definitions/logical-system-peering"}
]
}
},
"type": "array",
"items": {"$ref": "#/definitions/peering"}
}
db = backend_db() # also forces initialization
def _x(k):
if not k.startswith('juniper-peerings'):
return False
if k.startswith('juniper-peerings:hosts:'):
return False
return True
for k in list(filter(_x, db.keys())):
del db[k]
worker._build_juniper_peering_db()
found_record = False
for key, value in db.items():
if not _x(key):
continue
value = json.loads(value)
assert value
found_record = True
if key.startswith('juniper-peerings:ix-groups:'):
for address in value:
canonical = ipaddress.ip_interface(address).ip.exploded
assert address == canonical
continue
jsonschema.validate(value, peering_list_schema)
assert found_record
def test_build_snmp_peering_db(mocked_worker_module):
"""
Verify that valid snmp peering db objects are .
:param mocked_worker_module: fixture
"""
peering_list_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"peering": {
"type": "object",
"properties": {
"local": {"type": "string"},
"remote": {"type": "string"},
"oid": {"type": "string"},
"community": {"type": "string"},
"hostname": {"type": "string"}
},
"required": ["local", "remote",
"oid", "community",
"hostname"],
"additionalProperties": False
}
},
"type": "array",
"items": {"$ref": "#/definitions/peering"}
}
db = backend_db() # also forces initialization
def _x(k):
if not k.startswith('snmp-peerings'):
return False
if k.startswith('snmp-peerings:hosts:'):
return False
return True
for k in list(filter(_x, db.keys())):
del db[k]
worker._build_snmp_peering_db()
found_record = False
for key, value in db.items():
if not _x(key):
continue
value = json.loads(value)
jsonschema.validate(value, peering_list_schema)
assert value
found_record = True
assert found_record
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment