# test_ims_data.py (11.23 KiB) -- authored by Robert Latta
import json
import inventory_provider
from inventory_provider.db.ims import InventoryStatus
from inventory_provider.db.ims_data import lookup_lg_routers, \
otrs_get_customer_company_rows, \
otrs_get_customer_users_rows, get_node_locations, IMS_OPSDB_STATUS_MAP, \
get_port_id_services, get_port_details, \
get_circuit_hierarchy
def test_get_circuit_hierarchy(mocker):
    """Check get_circuit_hierarchy output against canned IMS responses.

    ``IMS.get_filtered_entities`` is patched so that successive calls
    return the contents of ``ims_circuit_hierarchy_data.json`` and then a
    single-entry selection list. The test asserts that exactly two such
    calls happen and that the produced records equal the expected ones.
    """
    ds = inventory_provider.db.ims.IMS(
        'http://dummy_base', 'dummy_username', 'dummy_password')

    with open('test/data/ims_circuit_hierarchy_data.json') as fixture:
        hierarchy_rows = json.load(fixture)
    selections = [{'selection': 'IP PEERING - R&E'}]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_filtered_entities',
        side_effect=[hierarchy_rows, selections])

    circuits = list(get_circuit_hierarchy(ds))

    assert ds.get_filtered_entities.call_count == 2

    # Fields that both expected records have in common.
    shared = {
        'status': 'operational',
        'project': 'ORIENTPLUS',
        'customerid': 57773,
    }
    expected = [
        {
            **shared,
            'id': 661591,
            'name': 'UK_ORIENT',
            'product': 'IP PEERING - R&E',
            'speed': 'ETHS',
            'circuit-type': 'service',
            'sub-circuits': [],
            'carrier-circuits': [660461],
        },
        {
            **shared,
            'id': 660461,
            'name': 'V-LAN_200_MX1.LON.UK_XE-0/0/1_',
            'product': 'ETHERNET',
            'speed': 'V-LAN',
            'circuit-type': 'circuit',
            'sub-circuits': [661591],
            'carrier-circuits': [668866],
        },
    ]
    assert circuits == expected
def test_get_port_details(mocker):
    """Check get_port_details output against canned IMS responses.

    ``IMS.get_all_entities`` is patched with a loader that returns the
    contents of ``test/data/ims_<entity>_details_data.json`` for the entity
    name it is called with. The test asserts that two such calls are made
    and that the produced port records equal the expected ones, in order.
    """
    def _load_details(entity, y, step_count):
        # Only the entity name selects the fixture file; the remaining
        # arguments are accepted but not used.
        with open(f'test/data/ims_{entity}_details_data.json') as fixture:
            return json.load(fixture)

    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_all_entities',
        side_effect=_load_details)

    ds = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    port_details = list(get_port_details(ds))

    assert ds.get_all_entities.call_count == 2

    # (port_id, equipment_name, interface_name) in the expected order.
    expected_rows = [
        (6417059, 'RT1.TAL.EE', 'XE-0/1/7'),
        (6417563, 'SW1.CH.OFFICE.GEANT.NET', 'XE-0/2/0'),
        (6420287, 'GEN01-DTNX10-1', '1-A-1-L1'),
        (222451, 'QFX.FRA.DE', 'AE0'),
        (222452, 'QFX.FRA.DE', 'AE1'),
        (225742, 'MX1.MAD.ES', 'AE3.103'),
    ]
    expected = [
        {
            'port_id': port_id,
            'equipment_name': equipment_name,
            'interface_name': interface_name,
        }
        for port_id, equipment_name, interface_name in expected_rows
    ]
    assert port_details == expected
def test_get_port_id_services(mocker):
    """Check get_port_id_services output against canned IMS responses.

    ``IMS.get_all_entities`` is patched to return two id/name lists on
    successive calls. ``IMS.get_filtered_entities`` is patched to return a
    list of three selections first, followed by each entry loaded from
    ``ims_port_id_services_data.json``. The test asserts the number of
    calls to both methods and the exact records produced.
    """
    with open('test/data/ims_port_id_services_data.json') as fixture:
        service_responses = json.load(fixture)

    # Responses for the two consecutive get_all_entities calls.
    first_entities = [
        {'id': 57658, 'name': 'ORG A'},
        {'id': 57664, 'name': 'ORG B'},
        {'id': 57640, 'name': 'ORG C'},
        {'id': 57744, 'name': 'ETH'},
    ]
    second_entities = [
        {'id': 3800, 'name': 'PRODUCT A'},
        {'id': 3804, 'name': 'GEANT IP'},
        {'id': 3810, 'name': 'GEANT PEERING'},
        {'id': 3677, 'name': 'ETHERNET'},
    ]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_all_entities',
        side_effect=[first_entities, second_entities])

    # The selections come back first, then the fixture entries in order.
    selections = [
        {'selection': 'GEANT IP'},
        {'selection': 'GEANT PEERING'},
        {'selection': 'PRODUCT A'},
    ]
    mocker.patch.object(
        inventory_provider.db.ims.IMS,
        'get_filtered_entities',
        side_effect=[selections, *service_responses])

    ds = inventory_provider.db.ims.IMS(
        'http://dummy_base', 'dummy_username', 'dummy_password')
    services = list(get_port_id_services(ds))

    assert ds.get_all_entities.call_count == 2
    assert ds.get_filtered_entities.call_count == 4

    # The same circuit is expected three times, differing only in ports.
    eth_circuit = {
        'id': 679324,
        'name': 'AMSTERDAM-LONDON-100GBE-001(ETH)',
        'status': 'operational',
        'circuit_type': 'circuit',
        'service_type': 'ETHERNET',
        'project': 'ETH',
        'customerid': 57744,
    }
    expected = [
        {
            'id': 663060,
            'name': 'ORG_A_AP2',
            'status': 'operational',
            'circuit_type': 'service',
            'service_type': 'GEANT IP',
            'project': 'ORG A',
            'port_a_id': 224507,
            'customerid': 57658,
        },
        {
            'id': 663104,
            'name': 'ORG_B_AP2_IAS',
            'status': 'operational',
            'circuit_type': 'service',
            'service_type': 'GEANT PEERING',
            'project': 'ORG B',
            'port_a_id': 224464,
            'customerid': 57664,
        },
        {**eth_circuit, 'port_a_id': 6423107, 'port_b_id': 6419340},
        {**eth_circuit, 'port_a_id': 6419340, 'port_b_id': 6423107},
        {**eth_circuit, 'port_a_id': 6423111},
        {
            'id': 702560,
            'name': 'VMPORT Circuit 1',
            'status': 'operational',
            'circuit_type': 'service',
            'service_type': 'PRODUCT A',
            'project': 'ORG C',
            'port_a_id': 6419453,
            'customerid': 57640,
        },
    ]
    assert services == expected
def test_lookup_lg_routers(mocker):
    """Check lookup_lg_routers against a fully mocked IMS client.

    The ``IMS`` class is replaced by a mock whose instance returns the
    contents of ``ims_lg_data.json`` from ``get_filtered_entities`` and a
    fixed site record from every ``get_entity_by_id`` call. The test
    asserts the filter arguments used, the number of lookups and results,
    and the exact shape of the first result.
    """
    ims_instance = mocker.patch('inventory_provider.db.ims.IMS').return_value

    with open('test/data/ims_lg_data.json') as fixture:
        equipment_rows = json.load(fixture)
    ims_instance.get_filtered_entities.return_value = equipment_rows

    # Every site lookup returns this same record.
    ims_instance.get_entity_by_id.return_value = {
        'name': 'pop name',
        'longitude': 'long',
        'latitude': 'lat',
        'city': {
            'name': 'city name',
            'country': {
                'name': 'country name',
                'abbreviation': 'country code',
            },
        },
        'sitealiases': [{'aliasname': 'abbr'}],
    }

    # IMS is patched, so this call hands back the mock instance.
    ds = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    routers = list(lookup_lg_routers(ds))

    ds.get_filtered_entities.assert_called_once_with(
        'EquipmentDefinition',
        'Name like MX',
        inventory_provider.db.ims.EQUIP_DEF_PROPERTIES['Nodes'])
    assert ds.get_entity_by_id.call_count == 36
    assert len(routers) == 36

    expected_first = {
        'equipment name': 'MX3.LAB.OFFICE.GEANT.NET',
        'type': 'CORE',
        'pop': {
            'name': 'pop name',
            'city': 'city name',
            'country': 'country name',
            'country code': 'country code',
            'abbreviation': 'abbr',
            'longitude': 'long',
            'latitude': 'lat',
        },
    }
    assert routers[0] == expected_first
def test_get_node_location(mocker):
    """Check get_node_locations against a fully mocked IMS client.

    The ``IMS`` class is replaced by a mock whose instance returns the
    contents of ``ims_nodes_data.json`` from ``get_all_entities``. The test
    asserts that 36 items are produced and that the first one is the
    expected (name, details) pair.
    """
    patched_ims = mocker.patch('inventory_provider.db.ims.IMS')
    with open('test/data/ims_nodes_data.json') as fixture:
        node_rows = json.load(fixture)
    patched_ims.return_value.get_all_entities.return_value = node_rows

    # IMS is patched, so this call hands back the mock instance.
    ds = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    locations = list(get_node_locations(ds))

    assert len(locations) == 36

    expected_pop = {
        'name': 'LONDON 3 POWERGATE',
        'city': 'LONDON',
        'country': 'UNITED KINGDOM',
        'abbreviation': 'LON3',
        'longitude': -0.257712,
        'latitude': 51.5308142,
    }
    expected_first = (
        'LON3_CX_01',
        {
            'equipment-name': 'LON3_CX_01',
            'status': IMS_OPSDB_STATUS_MAP[InventoryStatus.IN_SERVICE],
            'pop': expected_pop,
        },
    )
    assert locations[0] == expected_first
def test_otrs_get_customer_company_rows(mocker):
    """Check otrs_get_customer_company_rows against a mocked IMS client.

    The ``IMS`` class is replaced by a mock whose ``get_all_entities``
    returns the entry of ``ims_otrs_customers.json`` keyed by the first
    positional argument. The test asserts that 'Customer' and 'Vendor'
    were both requested, that a header row plus five data rows come back,
    and that the ids and names of the data rows are as expected.
    """
    patched_ims = mocker.patch('inventory_provider.db.ims.IMS')
    with open('test/data/ims_otrs_customers.json') as fixture:
        resp_data = json.load(fixture)

    # Look the response up by entity name; other arguments are ignored.
    mocked_get_all_entities = patched_ims.return_value.get_all_entities
    mocked_get_all_entities.side_effect = \
        lambda *args, **kargs: resp_data[args[0]]

    # IMS is patched, so this call hands back the mock instance.
    ds = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')
    rows = list(otrs_get_customer_company_rows(ds))

    assert len(rows) == 6
    mocked_get_all_entities.assert_any_call('Customer')
    mocked_get_all_entities.assert_any_call('Vendor')

    header, data_rows = rows[0], rows[1:]
    assert header == [
        'customer_id', 'name', 'street', 'zip',
        'city', 'country', 'url', 'comments',
    ]

    # Every data row has one value per header column.
    assert all(len(row) == 8 for row in data_rows)
    assert [row[0] for row in data_rows] == [
        'DUMMY1', 'DUMMY2', 'DUMMY3', 'DUMMY4', 'DUMMY5']
    assert [row[1] for row in data_rows] == [
        'DUMMY 1', 'DUMMY 2', 'DUMMY 3', 'DUMMY 4', 'DUMMY 5']
def test_otrs_get_customer_users(mocker):
    """Check otrs_get_customer_users_rows against a mocked IMS client.

    The ``IMS`` class is replaced by a mock whose ``get_all_entities``
    returns canned data for 'Customer' and 'VendorRelatedContact' (loaded
    from two JSON fixtures) and an empty list for any other entity name.
    The test asserts the arguments of those two calls, the header row, the
    total row count and a sample of the produced rows.
    """
    ims = mocker.patch('inventory_provider.db.ims.IMS')

    resp_data = {}
    with open('test/data/ims_otrs_customers_extra.json') as data:
        resp_data['Customer'] = json.load(data)
    with open('test/data/ims_otrs_vendor_contact_extra.json') as data:
        resp_data['VendorRelatedContact'] = json.load(data)

    def se(*args, **kargs):
        # Look the response up by entity name (first positional argument);
        # entities without a fixture yield no rows.
        return resp_data.get(args[0], [])

    mocked_get_all_entities = ims.return_value.get_all_entities
    mocked_get_all_entities.side_effect = se

    # IMS is patched, so this call hands back the mock instance.
    ds = inventory_provider.db.ims.IMS(
        'dummy_base', 'dummy_username', 'dummy_password')

    customer_users = list(otrs_get_customer_users_rows(ds))

    mocked_get_all_entities.assert_any_call('Customer', [32768, 262144])
    mocked_get_all_entities.assert_any_call('VendorRelatedContact', [8, 16])

    # The first row is the column header.
    assert customer_users[0] == [
        'email',
        'username',
        'customer_id',
        'customer_id_2',
        'title',
        'firstname',
        'lastname',
        'phone',
        'fax',
        'mobile',
        'street',
        'zip',
        'city',
        'country',
        'comments'
    ]

    assert len(customer_users) == 13

    # Spot-check individual rows rather than the whole result.
    assert customer_users[1] == [
        'BANOC@DUMMY2.COM', 'BANOC@DUMMY2.COM', 'DUMMY2', '', '',
        'DUMMY 2 NOC', '-', '', '', '',
        '', '', '', '', ''
    ]
    assert customer_users[6] == [
        'HNOC@DUMMY8.COM', 'HNOC@DUMMY8.COM', 'DUMMY8', 'OTRS-GEANT-NREN', '',
        'H D_FIRST', 'H D_INFIX H D_LAST', '', '', '',
        '', '', '', '', ''
    ]
    assert customer_users[10] == [
        'K@DUMMY10.FR', 'K@DUMMY10.FR', 'DUMMY10', '', '',
        'K D_FIRST', 'K D_INFIX K D_LAST', '', '', '',
        '', '', '', '', ''
    ]
    # Index 6 is the 'lastname' column of the header above.
    assert customer_users[12][6] == 'M LAST'