-
Sam Roberts authoredSam Roberts authored
test_general_poller_routes.py 21.89 KiB
import json
import re
import jsonschema
import pytest
from inventory_provider.routes import poller
DEFAULT_REQUEST_HEADERS = {
'Accept': ['application/json']
}
def test_get_all_interfaces(client):
rv = client.get(
'/poller/interfaces',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA)
response_routers = {ifc['router'] for ifc in response_data}
assert len(response_routers) > 1, \
'there should data from be lots of routers'
assert any('.lab.' in name for name in response_routers)
def test_get_all_interfaces_no_lab(client):
rv = client.get(
'/poller/interfaces?no-lab=1',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA)
response_routers = {ifc['router'] for ifc in response_data}
assert len(response_routers) > 1, \
'there should data from be lots of routers'
assert all('.lab.' not in name for name in response_routers)
def test_get_interfaces_returns_only_relevant_interfaces(client):
rv = client.get(
"/poller/interfaces",
headers=DEFAULT_REQUEST_HEADERS,
)
assert rv.status_code == 200
assert rv.is_json
response_data = set(ifc["name"] for ifc in rv.json)
# This check for "good" interfaces is for juniper only. Either delete or update
# once nokia routers are included in the test data
assert all(
ifc for ifc in response_data if re.match(r"^(xe|ge|et|ae|irb|gr).+", ifc)
)
assert not any(
ifc for ifc in response_data if re.match(r"^(lt-|so-|dsc\.|fxp\d|lo\d).*", ifc)
)
def test_all_router_interface_speeds(client):
rv = client.get(
'/poller/speeds',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
response = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response, poller.INTERFACE_SPEED_LIST_SCHEMA)
assert response # at least shouldn't be empty
response_routers = {ifc['router'] for ifc in response}
assert len(response_routers) > 1, \
'there should data from be lots of routers'
def test_eumetsat_multicast_all(mocker, client):
# routers don't have snmp acl's for us
mocker.patch('inventory_provider.juniper.snmp_community_string') \
.return_value = 'blah'
rv = client.get(
'/poller/eumetsat-multicast',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(
response_data, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
assert response_data, "the subscription list shouldn't be empty"
# only 'mx*' routers are returned, by default
assert all([s['router'].startswith('mx') for s in response_data])
@pytest.mark.parametrize('hostname', [
'mx1.ams.nl.geant.net',
'mx1.ams',
'qfx.fra.de.geant.net' # expect to be able to explicitly select others
])
def test_eumetsat_multicast_hostname(mocker, client, hostname):
# routers don't have snmp acl's for us
mocker.patch('inventory_provider.juniper.snmp_community_string') \
.return_value = 'blah'
rv = client.get(
f'/poller/eumetsat-multicast/{hostname}',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(
response_data, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
assert response_data, "the subscription list shouldn't be empty"
# only 'mx*' routers are returned, by default
assert all([s['router'].startswith(hostname) for s in response_data])
def test_eumetsat_multicast_404(mocker, client):
# routers don't have snmp acl's for us
mocker.patch('inventory_provider.juniper.snmp_community_string') \
.return_value = 'blah'
rv = client.get(
'/poller/eumetsat-multicast/XYZ123',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 404
def test_eumetsat_multicast_forbidden(mocker, client):
# routers don't have snmp acl's for us
mocker.patch('inventory_provider.juniper.snmp_community_string') \
.return_value = ''
rv = client.get(
'/poller/eumetsat-multicast',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 403
def test_gws_direct(client):
rv = client.get(
'/poller/gws/direct',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.GWS_DIRECT_DATA_SCHEMA)
assert response_data, "the service list shouldn't be empty"
def test_gws_indirect(client):
rv = client.get(
'/poller/gws/indirect',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # test data is non-empty
assert all(s['type'] == 'GWS - INDIRECT' for s in response_data)
def test_gws_indirect_snmp(client):
# same as test_services_snmp, but also verify that
# the snmp data in the response contains counters
rv = client.get(
'/poller/services/gws_indirect?snmp=1',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # sanity: test data is non-empty
# all operational access services should have snmp info
operational = list(filter(
lambda s: s['status'] == 'operational', response_data))
assert operational # sanity: test data contains operational services
assert all('snmp' in s for s in operational)
assert all('counters' in s['snmp'] for s in operational)
def test_get_services_default(client):
rv = client.get(
'/poller/services',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # test data is non-empty
assert all('snmp' not in s for s in response_data)
def test_get_services_operational(client):
rv = client.get(
'/poller/services?all=0',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # test data is non-empty
assert all(s['status'] == 'operational' for s in response_data)
@pytest.mark.parametrize('service_type', [
# services with these types should all have snmp configs
'gws_indirect', 'geant_ip'])
# 'gws_indirect', 'geant_ip', 'l3_vpn']) # TODO: something changed, check
def test_services_snmp(client, service_type):
rv = client.get(
f'/poller/services/{service_type}?snmp=1',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # sanity: test data is non-empty
# operational services should have snmp info
operational = list(filter(
lambda s: s['status'] == 'operational', response_data))
assert operational # sanity: test data contains operational services
assert all('snmp' in s for s in operational)
@pytest.mark.parametrize('uri_type,expected_type', [
('gws_indirect', 'GWS - INDIRECT'),
('gws_upstream', 'GWS - UPSTREAM'),
('geant_ip', 'GEANT IP'),
('geant_lambda', 'GEANT LAMBDA'),
# ('gts', 'GTS'), # these are non-monitored (TODO: make a flag?)
('md_vpn_native', 'MD-VPN (NATIVE)'),
('l3_vpn', 'L3-VPN')
])
def test_all_services_by_type(client, uri_type, expected_type):
rv = client.get(
f'/poller/services/{uri_type}',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA)
assert response_data # test data is non-empty
assert all(s['type'] == expected_type for s in response_data)
def test_get_service_types(client):
rv = client.get(
'/poller/service-types',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(response_data, poller.STRING_LIST_SCHEMA)
assert response_data # test data is non-empty
# some hard-coded values we expect to see from the test data ...
assert 'gws_indirect' in response_data
assert 'geant_ip' in response_data
@pytest.mark.parametrize('ifIndex,expected_oid', [
(595, '1.3.6.1.4.1.2636.3.6.2.1.5.595.1.6.100.119.115.45.105.110'),
(607, '1.3.6.1.4.1.2636.3.6.2.1.5.607.1.6.100.119.115.45.105.110'),
(1135, '1.3.6.1.4.1.2636.3.6.2.1.5.1135.1.6.100.119.115.45.105.110')
])
def test_dcu_oid_values(ifIndex, expected_oid):
assert poller._jnx_dcu_byte_count_oid(ifIndex) == expected_oid
@pytest.mark.parametrize('customer, interface_name, expected_oid', [
('ASREN', 'ae17.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.65.83.82.69.78.95.79.85.84.45.97.101.49.55.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.55.46.51.51.51.45.111.2'), # noqa: E501
('FCCN', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.28.110.114.101.110.95.73.65.83.95.70.67.67.78.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: E501
('GRNET', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: #E501
('GRNET', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: #E501
('ULAKBIM', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.31.110.114.101.110.95.73.65.83.95.85.76.65.75.66.73.77.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: E501
('UOM', 'xe-11/0/0.333', '1.3.6.1.4.1.2636.3.5.2.1.5.32.110.114.101.110.95.73.65.83.95.85.79.77.95.79.85.84.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.23.68.87.83.45.111.117.116.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.2'), # noqa: E501
('LITNET', 'xe-0/1/1.333', '1.3.6.1.4.1.2636.3.5.2.1.5.34.110.114.101.110.95.73.65.83.95.76.73.84.78.69.84.95.79.85.84.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.22.68.87.83.45.111.117.116.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.2'), # noqa: E501
])
def test_fw_counter_bytes_oid_values(customer, interface_name, expected_oid):
assert poller._jnx_fw_counter_bytes_oid(
customer, interface_name) == expected_oid
@pytest.mark.parametrize('description,expected_dashboards', [
('SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786',
['IAS_CUSTOMER', 'NREN']),
('SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ', # noqa: E501
['L2_CIRCUIT']),
('SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221',
['LHCONE', 'LHCONE_CUST', 'NREN']),
('SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone',
['IAS_PEERS', 'IAS_PRIVATE']),
('SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ',
['CLS', 'CLS_PEERS']),
('SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |',
['IAS_PEERS', 'IAS_PUBLIC']),
('LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por',
['INFRASTRUCTURE_BACKBONE']),
('SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001 | bil-por',
['INFRASTRUCTURE_BACKBONE']),
('SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED', # noqa: E501
['GCS']),
('PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863',
['GWS_PHY_UPSTREAM']),
('SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174',
['IAS_UPSTREAM']),
('SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750',
['LHCONE', 'LHCONE_PEER', 'RE_PEER']),
('SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ', # noqa: E501
['MDVPN_CUSTOMERS', 'NREN']),
('SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |', # noqa: E501
['L2_CIRCUIT']),
('PHY CUSTOMER_GEO TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |',
['GEANTOPEN']),
('SRV_L3VPN RE_INTERCONNECT REDCLARA #REDCLARA-MAD-COPERNICUS | ASN27750',
['COPERNICUS', 'RE_PEER']),
('SRV_10GGBS CUSTOMER REDIRIS CERN #gen-mad-LHC-CERN-REDIRIS-07003 |',
['GBS_10G'])
])
def test_interface_dashboard_mapping(description, expected_dashboards):
interface = {
'router': '',
'name': '',
'description': description
}
dashboards = poller._get_dashboards(interface)
assert set(d.name for d in dashboards) == set(expected_dashboards)
@pytest.mark.parametrize(
"router, interface, exp_dashboards",
[
("mx1.lon.uk.geant.net", "ae12.123", ["CAE1"]),
("mx1.lon.uk.geant.net", "ae12", []), # POL1-704
("rt1.mar.fr.geant.net", "ae12.123", ["IC1"]), # POL1_703
("rt1.mar.fr.geant.net", "ae12", []), # POL1-704
],
)
def test_ae12_aggregate_dashboards(router, interface, exp_dashboards):
interface = {"router": router, "name": interface, "description": ""}
dashboards = poller._get_dashboards(interface)
assert [d.name for d in dashboards] == exp_dashboards
@pytest.mark.parametrize(
"description, has_ana",
[
("PHY GA-1 ANA-100", True),
("PHY ANA-100", False),
("PHY GA-1", False),
("PHY", False),
],
)
def test_ana_aggregate_dashboard(description, has_ana):
interface = {
"router": "some.router",
"name": "ifc-1",
"description": description,
}
dashboards = poller._get_dashboards(interface)
expected = ["ANA"] if has_ana else []
assert [d.name for d in dashboards] == expected
@pytest.mark.parametrize('interface,customers,dashboard_info', [
({
'description': 'SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786',
'dashboards': ['IAS_CUSTOMER', 'NREN']
}, [{'name': 'JISC'}], {'name': 'JISC', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_L2CIRCUIT CUSTOMER JISC JISC #DUB-LON-NRENBBEXT-JANET-13015 | backup for niran ', # noqa: E501
'dashboards': ['L2_CIRCUIT']
}, [{'name': 'JISC'}], {'name': 'JISC', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_L3VPN CUSTOMER EENET #EENET-AP2-LHCONE | ASN3221',
'dashboards': ['LHCONE', 'LHCONE_CUST', 'NREN']
}, [{'name': 'EENET'}], {'name': 'EENET', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_IAS PRIVATE OPTIMA-TELEKOM #HR-EduZone | For Eduzone', # noqa: E501
'dashboards': ['IAS_PEERS', 'IAS_PRIVATE']
},
[{'name': 'OPTIMA-TELEKOM'}],
{'name': 'OPTIMA-TELEKOM', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_CLS PRIVATE AWS #AT-AWS-CLS|ASN16509 | ',
'dashboards': ['CLS', 'CLS_PEERS']
}, [{'name': 'AWS'}], {'name': 'AWS', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_IAS PUBLIC MIX #IX_Peerings_in_MIX |',
'dashboards': ['IAS_PEERS', 'IAS_PUBLIC']
}, [{'name': 'MIX'}], {'name': 'MIX', 'interface_type': 'LOGICAL'}),
({
'description': 'LAG INFRASTRUCTURE BACKBONE SRF0000001 | bil-por',
'dashboards': ['INFRASTRUCTURE_BACKBONE']
}, [{'name': 'bil-por'}], {'name': 'bil-por', 'interface_type': 'AGGREGATE'}), # noqa: E501
({
'description': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001 | bil-por', # noqa: E501
'dashboards': ['INFRASTRUCTURE_BACKBONE']
}, [{'name': 'bil-por'}], {'name': 'bil-por', 'interface_type': 'LOGICAL'}), # noqa: E501
({
'description': 'SRV_GCS CUSTOMER FCCN MICROSOFT #FCCN_NoveSBE_ExpressRoute_Vlan1945 | UNIT CONFIGURATION HAS BEEN SYSTEM GENERATED', # noqa: E501
'dashboards': ['GCS']
}, [{'name': 'FCCN'}], {'name': 'FCCN', 'interface_type': 'LOGICAL'}),
({
'description': 'PHY UPSTREAM TELIA SRF9940473 | Telia ID: IC-326863',
'router': 'mx1.bud.hu.geant.net',
'dashboards': ['GWS_PHY_UPSTREAM']
}, [{'name': 'TELIA - BUD'}], {'name': 'TELIA - BUD', 'interface_type': 'PHYSICAL'}), # noqa: E501
({
'description': 'SRV_IAS UPSTREAM COGENT #COGENT_GWS_VIE | ASN174',
'dashboards': ['IAS_UPSTREAM']
}, [{'name': 'COGENT'}], {'name': 'COGENT', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_L3VPN RE_INTERCONNECT CLARA #REDCLARA-LIS-LHCONE | ASN27750', # noqa: E501
'dashboards': ['LHCONE', 'LHCONE_PEER', 'RE_PEER']
}, [{'name': 'CLARA'}], {'name': 'CLARA', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_MDVPN CUSTOMER REDIRIS #RedIRIS_AP1_BGP_LU_CoC_1 | MD VPN CoC-REDIRIS - ', # noqa: E501
'dashboards': ['MDVPN_CUSTOMERS', 'NREN']
}, [{'name': 'REDIRIS'}], {'name': 'REDIRIS', 'interface_type': 'LOGICAL'}), # noqa: E501
({
'description': 'SRV_L2CIRCUIT CUSTOMER TENET PSNC #lon-lon-GEANTOPEN-PSNC-TENET-18067 |', # noqa: E501
'dashboards': ['GEANTOPEN', 'L2_CIRCUIT']
}, [{'name': 'TENET'}, {'name': 'PSNC'}], {'name': 'TENET', 'interface_type': 'LOGICAL'}), # noqa: E501
({
'description': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001',
'dashboards': ['INFRASTRUCTURE_BACKBONE']
},
[{'name': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001'}],
{'name': 'SRV_GLOBAL INFRASTRUCTURE BACKBONE SRF0000001', 'interface_type': 'LOGICAL'}), # noqa: E501
({
'description': 'SRV_MDVPN CUSTOMER',
'dashboards': ['MDVPN_CUSTOMERS', 'NREN']
},
[{'name': 'SRV_MDVPN CUSTOMER'}],
{'name': 'SRV_MDVPN CUSTOMER', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_MDVPN CUSTOMER',
'dashboards': ['MDVPN_CUSTOMERS', 'NREN']
},
[{'name': 'SRV_MDVPN CUSTOMER'}],
{'name': 'SRV_MDVPN CUSTOMER', 'interface_type': 'LOGICAL'}),
({
'description': 'SRV_L3VPN RE_INTERCONNECT REDCLARA #REDCLARA-MAD-COPERNICUS | ASN27750', # noqa: E501
'dashboards': ['COPERNICUS']
}, [{'name': 'REDCLARA'}], {'name': 'REDCLARA', 'interface_type': 'LOGICAL'}), # noqa: E501
])
def test_description_dashboard_parsing(
interface, customers, dashboard_info):
updated = poller._get_dashboard_data(interface, customers)
info = updated['dashboard_info']
assert info == dashboard_info
dashboards_info = updated['dashboards_info']
expected_names = [c['name'] for c in customers]
assert set(expected_names) == {d['name'] for d in dashboards_info}
@pytest.mark.parametrize('description,expected_port_type', [
('SRV_IAS CUSTOMER JISC #JISC-AP1-IAS IASPS | ASN786', 'UNKNOWN'),
('SRV_GLOBAL CUSTOMER RENATER #RENATER-AP1 $GS-00505 | ASN2200 |', 'SERVICE'), # noqa: E501
('PHY CUSTOMER AZSCIENCENET SRF19095 $GA-01599 | GEANT-10G-Baku-CO-Interxion|', 'ACCESS') # noqa: E501
])
def test_description_port_type_parsing(
description, expected_port_type):
_port_type = poller._get_port_type(description)
assert _port_type == expected_port_type
def test_gws_config_json(client):
rv = client.get(
'/poller/gws/direct-config',
headers={'Accept': ['application/json']})
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
# just a sanity check, no validation
# ... for now, this isn't an important interface
assert response_data
def test_gws_config_html(client):
rv = client.get(
'/poller/gws/direct-config?format=html',
headers={'Accept': ['text/html']})
assert rv.status_code == 200
response_data = rv.data.decode('utf-8')
# just a sanity check, no validation
# ... for now, this isn't an important interface
assert response_data.endswith('</html>')
def test_get_all_error_report_interfaces(client):
rv = client.get('/poller/error-report-interfaces', headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data)
jsonschema.validate(response_data, poller.ERROR_REPORT_INTERFACE_LIST_SCHEMA)
response_routers = {ifc['router'] for ifc in response_data}
assert len(response_routers) > 1, 'there should data from be lots of routers'
def test_error_report_interfaces_has_no_logical_interfaces(client):
rv = client.get("/poller/error-report-interfaces", headers=DEFAULT_REQUEST_HEADERS)
assert not [i["name"] for i in rv.json if i["name"].endswith(".1")]
def test_get_single_router_error_report_interfaces(client):
rv = client.get(
'/poller/error-report-interfaces/mx1.ams.nl.geant.net',
headers=DEFAULT_REQUEST_HEADERS,
)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data)
jsonschema.validate(response_data, poller.ERROR_REPORT_INTERFACE_LIST_SCHEMA)
assert {ifc['router'] for ifc in response_data} == {"mx1.ams.nl.geant.net"}
def test_get_nren_regions(client):
rv = client.get("/poller/regions", headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data)
jsonschema.validate(response_data, poller.NREN_REGION_LIST_SCHEMA)
assert {nren_region['region'] for nren_region in response_data} # no values should be empty strings