import json import jsonschema import pytest from inventory_provider.routes.classifier_schema \ import JUNIPER_LINK_RESPONSE_SCHEMA, PEER_INFO_RESPONSE_SCHEMA, \ INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA, CORIANT_INFO_RESPONSE_SCHEMA, \ INFINERA_FIBERLINK_INFO_RESPONSE_SCHEMA, \ MTC_INTERFACE_INFO_RESPONSE_SCHEMA, TNMS_FIBERLINK_INFO_RESPONSE_SCHEMA DEFAULT_REQUEST_HEADERS = { "Content-type": "application/json", "Accept": ["application/json"] } def test_juniper_link_info(client): rv = client.get( '/classifier/juniper-link-info/mx1.ams.nl.geant.net/ae16.100', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, JUNIPER_LINK_RESPONSE_SCHEMA) def test_juniper_link_info_not_found(client): rv = client.get( '/classifier/juniper-link-info/' 'mx1.ams.nl.geant.net/unknown-interface-name', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, JUNIPER_LINK_RESPONSE_SCHEMA) assert response_data == { 'interface': { 'name': 'unknown-interface-name', 'description': '', 'ipv4': [], 'ipv6': [], 'bundle': [], 'bundle_members': [], 'speed': '' }, 'locations': [{ 'a': { 'equipment': 'MX1.AMS.NL', 'name': 'AMSTERDAM', 'abbreviation': 'AMS'} }] } def test_juniper_link_unknown_router(client): rv = client.get( '/classifier/juniper-link-info/' 'unknown-router/unknown-interface-name', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, JUNIPER_LINK_RESPONSE_SCHEMA) assert response_data == { 'interface': { 'name': 'unknown-interface-name', 'description': '', 'ipv4': [], 'ipv6': [], 'bundle': [], 'bundle_members': [], 'speed': '' }, 'locations': [] } VPN_RR_PEER_INFO_KEYS = {'vpn-rr-peer-info', 'locations', 'snmp'} IX_PUBLIC_PEER_INFO_KEYS = { 'ix-public-peer-info', 'interfaces', 'locations', 'snmp', 'asn'} @pytest.mark.parametrize('peer_address,expected_response_keys', [ # MDVPN, asn=13092 ('147.91.0.117', VPN_RR_PEER_INFO_KEYS | {'asn', 'contacts'}), # # MDVPN, no asn # ('62.40.96.18', VPN_RR_PEER_INFO_KEYS | {'interfaces'}), # ('2001:07f8:0036:0000:0000:3417:0000:0001', IX_PUBLIC_PEER_INFO_KEYS), # ('2001:07f8:0030:0000:0002:0002:0003:2934', IX_PUBLIC_PEER_INFO_KEYS), # ('195.66.227.154', IX_PUBLIC_PEER_INFO_KEYS), # ('149.29.9.9', {'interfaces', 'locations', 'snmp', 'asn'}), # ('62.40.125.142', {'interfaces', 'locations', 'snmp'}) ]) def test_peer_info( client, peer_address, expected_response_keys): rv = client.get( f'/classifier/peer-info/{peer_address}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, PEER_INFO_RESPONSE_SCHEMA) assert set(response_data.keys()) == expected_response_keys def test_peer_invalid_address(client): rv = client.get( '/classifier/peer-info/1.2.3.4.5', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 422 def test_peer_not_found(client): rv = client.get( '/classifier/peer-info/1.2.3.4', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 response_data = json.loads(rv.data.decode('utf-8')) assert response_data == {'locations': [], "contacts": []} @pytest.mark.parametrize('equipment,entity_name,expected_card_id,expected_port,endpoint', [ ('grv3.ams.nl.geant.net', '1-3.3.1-Optical-10GbE-TTP', '1/3', '3.1', 'coriant-port-info'), ('grv3.ams.nl.geant.net', '1-3.3.1-Optical-10GbE-TTP', '1/3', '3.1', 'coriant-info'), ('grv3.ams.nl.geant.net', '1-1.3.1-100GbE-ODU4-TTP', '1/1', '3', 'coriant-tp-info'), ('bogus-hostname.with&special.char', '234-2345234.7878i234crazynamewithslash/1-2.3', '234/2345234', '7878', 'coriant-port-info') ]) def test_coriant_info( client, equipment, entity_name, expected_card_id, expected_port, endpoint): rv = client.get( f'/classifier/{endpoint}/{equipment}/{entity_name}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, CORIANT_INFO_RESPONSE_SCHEMA) assert response_data['card id'] == expected_card_id assert response_data['port number'] == expected_port def test_coriant_info_not_found(client): """ just check the correct method is called, but mock out all sql access """ rv = client.get( '/classifier/coriant-info/aaa/unparseableentitystring', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 def test_mtc_info(client): rv = client.get( '/classifier/mtc-interface-info/MAD01-MTC6-1/1-A-1-S1-1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) assert response_data # schema seems wrong ... jsonschema.validate(response_data, MTC_INTERFACE_INFO_RESPONSE_SCHEMA) def test_infinera_unparseable_fiberlink(client): rv = client.get( '/classifier/infinera-fiberlink-info/unparseableentitystring/aaa', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 422 rv = client.get( '/classifier/infinera-fiberlink-info/XXX-OLA1-XXX02-DTNX10-1/aaa', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 422 def test_infinera_fiberlink_not_found(client): rv = client.get( '/classifier/infinera-fiberlink-info/' 'XXX-OLA1-XXX02-DTNX10-1/1-A-2-L1_3-A-2-L1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 def test_infinera_fiberlink(client): rv = client.get( '/classifier/infinera-fiberlink-info/' 'OOS-OLA1-GHE-OLA1/1-A-3-L1_1-A-2-L1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, INFINERA_FIBERLINK_INFO_RESPONSE_SCHEMA) def test_infinera_lambda(client): rv = client.get( '/classifier/infinera-lambda-info/' 'LON2-DTNX10-1/1-A-2-1-4/gen-lon3_LHC_CERN-JANET_09013', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, INFINERA_LAMBDA_INFO_RESPONSE_SCHEMA) def test_tnms_fibre_info(client): rv = client.get( '/classifier/tnms-fibre-info/SWENSZAB-MTC9-1/ZRCHSZ01-MTC9-1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, TNMS_FIBERLINK_INFO_RESPONSE_SCHEMA)