# test_ims_data.py (8.43 KiB)
import json
import os
import jsonschema
import inventory_provider
from inventory_provider.db.ims import InventoryStatus
from inventory_provider.db.ims_data import lookup_lg_routers, \
get_node_locations, IMS_OPSDB_STATUS_MAP, \
get_port_id_services, get_port_details, \
get_circuit_hierarchy, NODE_LOCATION_SCHEMA
def _json_test_data(filename):
    """Load a JSON fixture from the ``data`` directory next to this module.

    :param filename: name of the fixture file inside the ``data`` directory
    :return: the decoded JSON document
    """
    abs_filename = os.path.join(
        os.path.dirname(__file__),
        'data',
        filename)
    # JSON text is UTF-8 by specification; being explicit avoids decoding
    # the fixtures with whatever the platform's locale encoding happens to be.
    with open(abs_filename, encoding='utf-8') as data:
        return json.load(data)
def test_get_circuit_hierarchy(mocker):
    """Check the records yielded by ``get_circuit_hierarchy``.

    ``IMS.get_filtered_entities`` is mocked to return the hierarchy fixture
    on its first call and a single product selection on its second.
    """
    ims_client = inventory_provider.db.ims.IMS(
        'http://dummy_base', 'dummy_username', 'dummy_password')

    hierarchy_fixture = _json_test_data('ims_circuit_hierarchy_data.json')
    product_selections = [{'selection': 'IP PEERING - R&E'}]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_filtered_entities',
        side_effect=[hierarchy_fixture, product_selections])

    circuits = list(get_circuit_hierarchy(ims_client))

    assert ims_client.get_filtered_entities.call_count == 2

    # fields that are identical in both expected records
    shared = {
        'status': 'operational',
        'project': 'ORIENTPLUS',
        'customerid': 57773,
    }
    service = {
        **shared,
        'id': 661591,
        'name': 'UK_ORIENT',
        'product': 'IP PEERING - R&E',
        'speed': 'ETHS',
        'circuit-type': 'service',
        'sub-circuits': [],
        'carrier-circuits': [660461],
    }
    carrier = {
        **shared,
        'id': 660461,
        'name': 'V-LAN_200_MX1.LON.UK_XE-0/0/1_',
        'product': 'ETHERNET',
        'speed': 'V-LAN',
        'circuit-type': 'circuit',
        'sub-circuits': [661591],
        'carrier-circuits': [668866],
    }
    assert circuits == [service, carrier]
def test_get_port_details(mocker):
    """Check the port records yielded by ``get_port_details``.

    ``IMS.get_all_entities`` is mocked so that every call is answered from
    the ``ims_<entity>_details_data.json`` fixture of the requested entity.
    """
    def _fixture_for_entity(entity, y, step_count):
        # only the entity name selects the fixture; the rest is ignored
        fixture_name = f'ims_{entity}_details_data.json'
        return _json_test_data(fixture_name)

    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_all_entities',
        side_effect=_fixture_for_entity)

    ims_client = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    port_details = list(get_port_details(ims_client))

    assert ims_client.get_all_entities.call_count == 2

    # (port_id, equipment_name, interface_name), in the expected order
    expected_rows = (
        (6417059, 'RT1.TAL.EE', 'XE-0/1/7'),
        (6417563, 'SW1.CH.OFFICE.GEANT.NET', 'XE-0/2/0'),
        (6420287, 'GEN01-DTNX10-1', '1-A-1-L1'),
        (222451, 'QFX.FRA.DE', 'AE0'),
        (222452, 'QFX.FRA.DE', 'AE1'),
        (225742, 'MX1.MAD.ES', 'AE3.103'),
    )
    expected = [
        {
            'port_id': port_id,
            'equipment_name': equipment_name,
            'interface_name': interface_name,
        }
        for port_id, equipment_name, interface_name in expected_rows
    ]
    assert port_details == expected
def test_get_port_id_services(mocker):
    """Check the service records yielded by ``get_port_id_services``.

    ``IMS.get_all_entities`` is mocked with two canned id/name lists, and
    ``IMS.get_filtered_entities`` with a list of product selections followed
    by the entries of the ``ims_port_id_services_data.json`` fixture.
    """
    def _expected(circuit_id, name, circuit_type, service_type,
                  project, customerid, port_a_id, port_b_id=None):
        # 'port_b_id' only appears in records that have a second port
        record = {
            'id': circuit_id,
            'name': name,
            'status': 'operational',
            'circuit_type': circuit_type,
            'service_type': service_type,
            'project': project,
            'port_a_id': port_a_id,
            'customerid': customerid,
        }
        if port_b_id is not None:
            record['port_b_id'] = port_b_id
        return record

    services_fixture = _json_test_data('ims_port_id_services_data.json')

    first_id_name_list = [
        {'id': 57658, 'name': 'ORG A'},
        {'id': 57664, 'name': 'ORG B'},
        {'id': 57640, 'name': 'ORG C'},
        {'id': 57744, 'name': 'ETH'},
    ]
    second_id_name_list = [
        {'id': 3800, 'name': 'PRODUCT A'},
        {'id': 3804, 'name': 'GEANT IP'},
        {'id': 3810, 'name': 'GEANT PEERING'},
        {'id': 3677, 'name': 'ETHERNET'},
    ]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_all_entities',
        side_effect=[first_id_name_list, second_id_name_list])

    product_selections = [
        {'selection': 'GEANT IP'},
        {'selection': 'GEANT PEERING'},
        {'selection': 'PRODUCT A'},
    ]
    # first call returns the selections, later calls the fixture entries
    filtered_side_effects = [product_selections, *services_fixture]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_filtered_entities',
        side_effect=filtered_side_effects)

    ims_client = inventory_provider.db.ims.IMS(
        'http://dummy_base', 'dummy_username', 'dummy_password')
    services = list(get_port_id_services(ims_client))

    assert ims_client.get_all_entities.call_count == 2
    assert ims_client.get_filtered_entities.call_count == 4

    # leading fields shared by the three records of circuit 679324
    eth = (
        679324, 'AMSTERDAM-LONDON-100GBE-001(ETH)',
        'circuit', 'ETHERNET', 'ETH', 57744,
    )
    expected = [
        _expected(
            663060, 'ORG_A_AP2', 'service', 'GEANT IP',
            'ORG A', 57658, 224507),
        _expected(
            663104, 'ORG_B_AP2_IAS', 'service', 'GEANT PEERING',
            'ORG B', 57664, 224464),
        _expected(*eth, 6423107, 6419340),
        _expected(*eth, 6419340, 6423107),
        _expected(*eth, 6423111),
        _expected(
            702560, 'VMPORT Circuit 1', 'service', 'PRODUCT A',
            'ORG C', 57640, 6419453),
    ]
    assert services == expected
def test_lookup_lg_routers(mocker):
    """Check the router records yielded by ``lookup_lg_routers``.

    The whole ``IMS`` class is mocked: ``get_filtered_entities`` returns the
    ``ims_lg_data.json`` fixture and ``get_entity_by_id`` always returns the
    same canned site record.
    """
    ims_cls = mocker.patch('inventory_provider.db.ims.IMS')
    fake_ims = ims_cls.return_value

    fake_ims.get_filtered_entities.return_value = \
        _json_test_data('ims_lg_data.json')

    canned_site = {
        'name': 'pop name',
        'longitude': 'long',
        'latitude': 'lat',
        'city': {
            'name': 'city name',
            'country': {
                'name': 'country name',
                'abbreviation': 'country code',
            },
        },
        'sitealiases': [
            {'aliasname': 'abbr'},
        ],
    }
    fake_ims.get_entity_by_id.return_value = canned_site

    ims_client = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    routers = list(lookup_lg_routers(ims_client))

    ims_client.get_filtered_entities.assert_called_once_with(
        'EquipmentDefinition',
        'Name like MX',
        inventory_provider.db.ims.EQUIP_DEF_PROPERTIES['Nodes'])
    assert ims_client.get_entity_by_id.call_count == 36
    assert len(routers) == 36

    expected_pop = {
        'name': 'pop name',
        'city': 'city name',
        'country': 'country name',
        'country code': 'country code',
        'abbreviation': 'abbr',
        'longitude': 'long',
        'latitude': 'lat',
    }
    expected_first = {
        'equipment name': 'MX3.LAB.OFFICE.GEANT.NET',
        'type': 'CORE',
        'pop': expected_pop,
    }
    assert routers[0] == expected_first
def test_get_node_location(mocker):
    """Check the ``(name, node)`` pairs yielded by ``get_node_locations``.

    The whole ``IMS`` class is mocked and ``get_all_entities`` returns the
    ``ims_nodes_data.json`` fixture. Every node is validated against
    ``NODE_LOCATION_SCHEMA`` before the first pair is compared in full.
    """
    ims_cls = mocker.patch('inventory_provider.db.ims.IMS')
    nodes_fixture = _json_test_data('ims_nodes_data.json')
    ims_cls.return_value.get_all_entities.return_value = nodes_fixture

    ims_client = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    locations = list(get_node_locations(ims_client))

    for node_name, node_info in locations:
        assert isinstance(node_name, str)
        jsonschema.validate(node_info, NODE_LOCATION_SCHEMA)

    assert len(locations) == 36

    expected_pop = {
        'name': 'LONDON 3 POWERGATE',
        'city': 'LONDON',
        'country': 'UNITED KINGDOM',
        'abbreviation': 'LON3',
        'longitude': -0.257712,
        'latitude': 51.5308142,
    }
    expected_first = ('LON3_CX_01', {
        'equipment-name': 'LON3_CX_01',
        'status': IMS_OPSDB_STATUS_MAP[InventoryStatus.IN_SERVICE],
        'pop': expected_pop,
    })
    assert locations[0] == expected_first