import json import re import jsonschema import pytest from inventory_provider.routes import poller DEFAULT_REQUEST_HEADERS = { 'Accept': ['application/json'] } def test_get_all_interfaces(client): rv = client.get( '/poller/interfaces', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA) response_routers = {ifc['router'] for ifc in response_data} assert len(response_routers) > 1, \ 'there should data from be lots of routers' assert any('.lab.' in name for name in response_routers) def test_get_all_interfaces_no_lab(client): rv = client.get( '/poller/interfaces?no-lab=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA) response_routers = {ifc['router'] for ifc in response_data} assert len(response_routers) > 1, \ 'there should data from be lots of routers' assert all('.lab.' not in name for name in response_routers) def test_get_interfaces_returns_only_relevant_interfaces(client): rv = client.get( "/poller/interfaces", headers=DEFAULT_REQUEST_HEADERS, ) assert rv.status_code == 200 assert rv.is_json response_data = set(ifc["name"] for ifc in rv.json) # This check for "good" interfaces is for juniper only. Either delete or update # once nokia routers are included in the test data assert all( ifc for ifc in response_data if re.match(r"^(xe|ge|et|ae|irb|gr).+", ifc) ) assert not any( ifc for ifc in response_data if re.match(r"^(lt-|so-|dsc\.|fxp\d|lo\d).*", ifc) ) def test_all_router_interface_speeds(client): rv = client.get( '/poller/speeds', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 response = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response, poller.INTERFACE_SPEED_LIST_SCHEMA) assert response # at least shouldn't be empty response_routers = {ifc['router'] for ifc in response} assert len(response_routers) > 1, \ 'there should data from be lots of routers' def test_eumetsat_multicast_all(mocker, client): # routers don't have snmp acl's for us mocker.patch('inventory_provider.juniper.snmp_community_string') \ .return_value = 'blah' rv = client.get( '/poller/eumetsat-multicast', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate( response_data, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA) assert response_data, "the subscription list shouldn't be empty" # only 'mx*' routers are returned, by default assert all([s['router'].startswith('mx') for s in response_data]) @pytest.mark.parametrize('hostname', [ 'mx1.ams.nl.geant.net', 'mx1.ams', 'qfx.fra.de.geant.net' # expect to be able to explicitly select others ]) def test_eumetsat_multicast_hostname(mocker, client, hostname): # routers don't have snmp acl's for us mocker.patch('inventory_provider.juniper.snmp_community_string') \ .return_value = 'blah' rv = client.get( f'/poller/eumetsat-multicast/{hostname}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate( response_data, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA) assert response_data, "the subscription list shouldn't be empty" # only 'mx*' routers are returned, by default assert all([s['router'].startswith(hostname) for s in response_data]) def test_eumetsat_multicast_404(mocker, client): # routers don't have snmp acl's for us mocker.patch('inventory_provider.juniper.snmp_community_string') \ .return_value = 'blah' rv = client.get( '/poller/eumetsat-multicast/XYZ123', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 def test_eumetsat_multicast_forbidden(mocker, client): # routers don't have snmp acl's for us mocker.patch('inventory_provider.juniper.snmp_community_string') \ .return_value = '' rv = client.get( '/poller/eumetsat-multicast', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 403 def test_gws_direct(client): rv = client.get( '/poller/gws/direct', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.GWS_DIRECT_DATA_SCHEMA) assert response_data, "the service list shouldn't be empty" def test_gws_indirect(client): rv = client.get( '/poller/gws/indirect', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['type'] == 'GWS - INDIRECT' for s in response_data) def test_gws_indirect_snmp(client): # same as test_services_snmp, but also verify that # the snmp data in the response contains counters rv = client.get( '/poller/services/gws_indirect?snmp=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # sanity: test data is non-empty # all operational access services should have snmp info operational = list(filter( lambda s: s['status'] == 'operational', response_data)) assert operational # sanity: test data contains operational services assert all('snmp' in s for s in operational) assert all('counters' in s['snmp'] for s in operational) def test_get_services_default(client): rv = client.get( '/poller/services', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all('snmp' not in s for s in response_data) def test_get_services_operational(client): rv = client.get( '/poller/services?all=0', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['status'] == 'operational' for s in response_data) @pytest.mark.parametrize('service_type', [ # services with these types should all have snmp configs 'gws_indirect', 'geant_ip']) # 'gws_indirect', 'geant_ip', 'l3_vpn']) # TODO: something changed, check def test_services_snmp(client, service_type): rv = client.get( f'/poller/services/{service_type}?snmp=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # sanity: test data is non-empty # operational services should have snmp info operational = list(filter( lambda s: s['status'] == 'operational', response_data)) assert operational # sanity: test data contains operational services assert all('snmp' in s for s in operational) @pytest.mark.parametrize('uri_type,expected_type', [ ('gws_indirect', 'GWS - INDIRECT'), ('gws_upstream', 'GWS - UPSTREAM'), ('geant_ip', 'GEANT IP'), ('geant_lambda', 'GEANT LAMBDA'), # ('gts', 'GTS'), # these are non-monitored (TODO: make a flag?) ('md_vpn_native', 'MD-VPN (NATIVE)'), ('l3_vpn', 'L3-VPN') ]) def test_all_services_by_type(client, uri_type, expected_type): rv = client.get( f'/poller/services/{uri_type}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['type'] == expected_type for s in response_data) def test_get_service_types(client): rv = client.get( '/poller/service-types', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.STRING_LIST_SCHEMA) assert response_data # test data is non-empty # some hard-coded values we expect to see from the test data ... assert 'gws_indirect' in response_data assert 'geant_ip' in response_data @pytest.mark.parametrize('ifIndex,expected_oid', [ (595, '1.3.6.1.4.1.2636.3.6.2.1.5.595.1.6.100.119.115.45.105.110'), (607, '1.3.6.1.4.1.2636.3.6.2.1.5.607.1.6.100.119.115.45.105.110'), (1135, '1.3.6.1.4.1.2636.3.6.2.1.5.1135.1.6.100.119.115.45.105.110') ]) def test_dcu_oid_values(ifIndex, expected_oid): assert poller._jnx_dcu_byte_count_oid(ifIndex) == expected_oid @pytest.mark.parametrize('customer, interface_name, expected_oid', [ ('ASREN', 'ae17.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.65.83.82.69.78.95.79.85.84.45.97.101.49.55.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.55.46.51.51.51.45.111.2'), # noqa: E501 ('FCCN', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.28.110.114.101.110.95.73.65.83.95.70.67.67.78.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: E501 ('GRNET', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: #E501 ('GRNET', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: #E501 ('ULAKBIM', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.31.110.114.101.110.95.73.65.83.95.85.76.65.75.66.73.77.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: E501 ('UOM', 'xe-11/0/0.333', '1.3.6.1.4.1.2636.3.5.2.1.5.32.110.114.101.110.95.73.65.83.95.85.79.77.95.79.85.84.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.23.68.87.83.45.111.117.116.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.2'), # noqa: E501 ('LITNET', 'xe-0/1/1.333', '1.3.6.1.4.1.2636.3.5.2.1.5.34.110.114.101.110.95.73.65.83.95.76.73.84.78.69.84.95.79.85.84.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.22.68.87.83.45.111.117.116.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.2'), # noqa: E501 ]) def test_fw_counter_bytes_oid_values(customer, interface_name, expected_oid): assert poller._jnx_fw_counter_bytes_oid( customer, interface_name) == expected_oid @pytest.mark.parametrize('description,expected_dashboards', [ ('SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786', ['IAS_CUSTOMER', 'NREN']), ('SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ', # noqa: E501 ['L2_CIRCUIT']), ('SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221', ['LHCONE', 'LHCONE_CUST', 'NREN']), ('SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone', ['IAS_PEERS', 'IAS_PRIVATE']), ('SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ', ['CLS', 'CLS_PEERS']), ('SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |', ['IAS_PEERS', 'IAS_PUBLIC']), ('LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por', ['INFRASTRUCTURE_BACKBONE']), ('SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001 | bil-por', ['INFRASTRUCTURE_BACKBONE']), ('SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED', # noqa: E501 ['GCS']), ('PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863', ['GWS_PHY_UPSTREAM']), ('SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174', ['IAS_UPSTREAM']), ('SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750', ['LHCONE', 'LHCONE_PEER', 'RE_PEER']), ('SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ', # noqa: E501 ['MDVPN_CUSTOMERS', 'NREN']), ('SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |', # noqa: E501 ['L2_CIRCUIT']), ('PHY CUSTOMER_GEO TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |', ['GEANTOPEN']), ('SRV_L3VPN RE_INTERCONNECT REDCLARA #REDCLARA-MAD-COPERNICUS | ASN27750', ['COPERNICUS', 'RE_PEER']), ('SRV_10GGBS CUSTOMER REDIRIS CERN #gen-mad-LHC-CERN-REDIRIS-07003 |', ['GBS_10G']) ]) def test_interface_dashboard_mapping(description, expected_dashboards): interface = { 'router': '', 'name': '', 'description': description } dashboards = poller._get_dashboards(interface) assert set(d.name for d in dashboards) == set(expected_dashboards) @pytest.mark.parametrize( "router, interface, exp_dashboards", [ ("mx1.lon.uk.geant.net", "ae12.123", ["CAE1"]), ("mx1.lon.uk.geant.net", "ae12", []), # POL1-704 ("rt1.mar.fr.geant.net", "ae12.123", ["IC1"]), # POL1_703 ("rt1.mar.fr.geant.net", "ae12", []), # POL1-704 ], ) def test_ae12_aggregate_dashboards(router, interface, exp_dashboards): interface = {"router": router, "name": interface, "description": ""} dashboards = poller._get_dashboards(interface) assert [d.name for d in dashboards] == exp_dashboards @pytest.mark.parametrize( "description, has_ana", [ ("PHY GA-1 ANA-100", True), ("PHY ANA-100", False), ("PHY GA-1", False), ("PHY", False), ], ) def test_ana_aggregate_dashboard(description, has_ana): interface = { "router": "some.router", "name": "ifc-1", "description": description, } dashboards = poller._get_dashboards(interface) expected = ["ANA"] if has_ana else [] assert [d.name for d in dashboards] == expected @pytest.mark.parametrize('interface,customers,dashboard_info', [ ({ 'description': 'SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786', 'dashboards': ['IAS_CUSTOMER', 'NREN'] }, [{'name': 'JISC'}], {'name': 'JISC', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ', # noqa: E501 'dashboards': ['L2_CIRCUIT'] }, [{'name': 'JISC'}], {'name': 'JISC', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221', 'dashboards': ['LHCONE', 'LHCONE_CUST', 'NREN'] }, [{'name': 'EENET'}], {'name': 'EENET', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone', # noqa: E501 'dashboards': ['IAS_PEERS', 'IAS_PRIVATE'] }, [{'name': 'OPTIMA-TELEKOM'}], {'name': 'OPTIMA-TELEKOM', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ', 'dashboards': ['CLS', 'CLS_PEERS'] }, [{'name': 'AWS'}], {'name': 'AWS', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |', 'dashboards': ['IAS_PEERS', 'IAS_PUBLIC'] }, [{'name': 'MIX'}], {'name': 'MIX', 'interface_type': 'LOGICAL'}), ({ 'description': 'LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por', 'dashboards': ['INFRASTRUCTURE_BACKBONE'] }, [{'name': 'bil-por'}], {'name': 'bil-por', 'interface_type': 'AGGREGATE'}), # noqa: E501 ({ 'description': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001 | bil-por', # noqa: E501 'dashboards': ['INFRASTRUCTURE_BACKBONE'] }, [{'name': 'bil-por'}], {'name': 'bil-por', 'interface_type': 'LOGICAL'}), # noqa: E501 ({ 'description': 'SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED', # noqa: E501 'dashboards': ['GCS'] }, [{'name': 'FCCN'}], {'name': 'FCCN', 'interface_type': 'LOGICAL'}), ({ 'description': 'PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863', 'router': 'mx1.bud.hu.geant.net', 'dashboards': ['GWS_PHY_UPSTREAM'] }, [{'name': 'TELIA - BUD'}], {'name': 'TELIA - BUD', 'interface_type': 'PHYSICAL'}), # noqa: E501 ({ 'description': 'SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174', 'dashboards': ['IAS_UPSTREAM'] }, [{'name': 'COGENT'}], {'name': 'COGENT', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750', # noqa: E501 'dashboards': ['LHCONE', 'LHCONE_PEER', 'RE_PEER'] }, [{'name': 'CLARA'}], {'name': 'CLARA', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ', # noqa: E501 'dashboards': ['MDVPN_CUSTOMERS', 'NREN'] }, [{'name': 'REDIRIS'}], {'name': 'REDIRIS', 'interface_type': 'LOGICAL'}), # noqa: E501 ({ 'description': 'SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |', # noqa: E501 'dashboards': ['GEANTOPEN', 'L2_CIRCUIT'] }, [{'name': 'TENET'}, {'name': 'PSNC'}], {'name': 'TENET', 'interface_type': 'LOGICAL'}), # noqa: E501 ({ 'description': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001', 'dashboards': ['INFRASTRUCTURE_BACKBONE'] }, [{'name': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001'}], {'name': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001', 'interface_type': 'LOGICAL'}), # noqa: E501 ({ 'description': 'SRV_MDVPN CUSTOMER', 'dashboards': ['MDVPN_CUSTOMERS', 'NREN'] }, [{'name': 'SRV_MDVPN CUSTOMER'}], {'name': 'SRV_MDVPN CUSTOMER', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_MDVPN CUSTOMER', 'dashboards': ['MDVPN_CUSTOMERS', 'NREN'] }, [{'name': 'SRV_MDVPN CUSTOMER'}], {'name': 'SRV_MDVPN CUSTOMER', 'interface_type': 'LOGICAL'}), ({ 'description': 'SRV_L3VPN RE_INTERCONNECT REDCLARA #REDCLARA-MAD-COPERNICUS | ASN27750', # noqa: E501 'dashboards': ['COPERNICUS'] }, [{'name': 'REDCLARA'}], {'name': 'REDCLARA', 'interface_type': 'LOGICAL'}), # noqa: E501 ]) def test_description_dashboard_parsing( interface, customers, dashboard_info): updated = poller._get_dashboard_data(interface, customers) info = updated['dashboard_info'] assert info == dashboard_info dashboards_info = updated['dashboards_info'] expected_names = [c['name'] for c in customers] assert set(expected_names) == {d['name'] for d in dashboards_info} @pytest.mark.parametrize('description,expected_port_type', [ ('SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786', 'UNKNOWN'), ('SRV_GLOBAL CUSTOMER RENATER #RENATER-AP1 $GS-00505 | ASN2200 |', 'SERVICE'), # noqa: E501 ('PHY CUSTOMER AZSCIENCENET SRF19095 $GA-01599 | GEANT-10G-Baku-CO-Interxion|', 'ACCESS') # noqa: E501 ]) def test_description_port_type_parsing( description, expected_port_type): _port_type = poller._get_port_type(description) assert _port_type == expected_port_type def test_gws_config_json(client): rv = client.get( '/poller/gws/direct-config', headers={'Accept': ['application/json']}) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) # just a sanity check, no validation # ... for now, this isn't an important interface assert response_data def test_gws_config_html(client): rv = client.get( '/poller/gws/direct-config?format=html', headers={'Accept': ['text/html']}) assert rv.status_code == 200 response_data = rv.data.decode('utf-8') # just a sanity check, no validation # ... for now, this isn't an important interface assert response_data.endswith('</html>') def test_get_all_error_report_interfaces(client): rv = client.get('/poller/error-report-interfaces', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data) jsonschema.validate(response_data, poller.ERROR_REPORT_INTERFACE_LIST_SCHEMA) response_routers = {ifc['router'] for ifc in response_data} assert len(response_routers) > 1, 'there should data from be lots of routers' def test_error_report_interfaces_has_no_logical_interfaces(client): rv = client.get("/poller/error-report-interfaces", headers=DEFAULT_REQUEST_HEADERS) assert not [i["name"] for i in rv.json if i["name"].endswith(".1")] def test_get_single_router_error_report_interfaces(client): rv = client.get( '/poller/error-report-interfaces/mx1.ams.nl.geant.net', headers=DEFAULT_REQUEST_HEADERS, ) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data) jsonschema.validate(response_data, poller.ERROR_REPORT_INTERFACE_LIST_SCHEMA) assert {ifc['router'] for ifc in response_data} == {"mx1.ams.nl.geant.net"} def test_get_nren_regions(client): rv = client.get("/poller/regions", headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data) jsonschema.validate(response_data, poller.NREN_REGION_LIST_SCHEMA) assert {nren_region['region'] for nren_region in response_data} # no values should be empty strings