import json import jsonschema import pytest from inventory_provider.routes import poller DEFAULT_REQUEST_HEADERS = { "Content-type": "application/json", "Accept": ["application/json"] } def test_get_all_interfaces(client): rv = client.get( '/poller/interfaces', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA) response_routers = {ifc['router'] for ifc in response_data} assert len(response_routers) > 1, \ 'there should data from be lots of routers' assert any('.lab.' in name for name in response_routers) def test_get_all_interfaces_no_lab(client): rv = client.get( '/poller/interfaces?no-lab=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA) response_routers = {ifc['router'] for ifc in response_data} assert len(response_routers) > 1, \ 'there should data from be lots of routers' assert all('.lab.' not in name for name in response_routers) def test_all_router_interface_speeds(client): rv = client.post( '/poller/speeds', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) jsonschema.validate(response, poller.INTERFACE_SPEED_LIST_SCHEMA) assert response # at least shouldn't be empty response_routers = {ifc['router'] for ifc in response} assert len(response_routers) > 1, \ 'there should data from be lots of routers' def test_eumetsat_multicast(mocker, client): # routers don't have snmp acl's for us mocker.patch('inventory_provider.juniper.snmp_community_string') \ .return_value = 'blah' rv = client.get( '/poller/eumetsat-multicast', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate( response_data, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA) assert response_data, "the subscription list shouldn't be empty" def test_gws_direct(client): rv = client.get( '/poller/gws/direct', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.GWS_DIRECT_DATA_SCHEMA) assert response_data, "the subscription list shouldn't be empty" def test_gws_indirect(client): rv = client.get( '/poller/gws/indirect', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['type'] == 'GWS - INDIRECT' for s in response_data) def test_gws_indirect_snmp(client): # same as test_services_snmp, but also verify that # the snmp data in the response contains counters rv = client.get( '/poller/services/gws_indirect?snmp=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty # all access services should have snmp info assert all('snmp' in s for s in response_data) assert all('counters' in s['snmp'] for s in response_data) def test_all_services_default(client): rv = client.get( '/poller/services', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['status'] == 'operational' for s in response_data) def test_all_services_all(client): rv = client.get( '/poller/services?all=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert any(s['status'] != 'operational' for s in response_data) assert all('snmp' not in s for s in response_data) @pytest.mark.parametrize('service_type', [ # services with these types should all have snmp configs 'gws_indirect', 'geant_ip', 'l3_vpn']) def test_services_snmp(client, service_type): rv = client.get( f'/poller/services/{service_type}?snmp=1', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty # all access services should have snmp info assert all('snmp' in s for s in response_data) @pytest.mark.parametrize('uri_type,expected_type', [ ('gws_indirect', 'GWS - INDIRECT'), ('gws_upstream', 'GWS - UPSTREAM'), ('geant_ip', 'GEANT IP'), ('geant_lambda', 'GEANT LAMBDA'), # ('gts', 'GTS'), # these are non-monitored (TODO: make a flag?) ('md_vpn_native', 'MD-VPN (NATIVE)'), ('md_vpn_proxy', 'MD-VPN (PROXY)'), ('cbl1', 'CBL1'), ('l3_vpn', 'L3-VPN') ]) def test_all_services_by_type(client, uri_type, expected_type): rv = client.get( f'/poller/services/{uri_type}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.SERVICES_LIST_SCHEMA) assert response_data # test data is non-empty assert all(s['type'] == expected_type for s in response_data) def test_get_service_types(client): rv = client.get( '/poller/service-types', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) jsonschema.validate(response_data, poller.STRING_LIST_SCHEMA) assert response_data # test data is non-empty # some hard-coded values we expect to see from the test data ... assert 'gws_indirect' in response_data assert 'geant_ip' in response_data @pytest.mark.parametrize('ifIndex,expected_oid', [ (595, '1.3.6.1.4.1.2636.3.6.2.1.5.595.1.6.100.119.115.45.105.110'), (607, '1.3.6.1.4.1.2636.3.6.2.1.5.607.1.6.100.119.115.45.105.110'), (1135, '1.3.6.1.4.1.2636.3.6.2.1.5.1135.1.6.100.119.115.45.105.110') ]) def test_dcu_oid_values(ifIndex, expected_oid): assert poller._jnx_dcu_byte_count_oid(ifIndex) == expected_oid @pytest.mark.parametrize('customer, interface_name, expected_oid', [ ('ASREN', 'ae17.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.65.83.82.69.78.95.79.85.84.45.97.101.49.55.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.55.46.51.51.51.45.111.2'), # noqa: E501 ('FCCN', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.28.110.114.101.110.95.73.65.83.95.70.67.67.78.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: E501 ('GRNET', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: #E501 ('GRNET', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'), # noqa: #E501 ('ULAKBIM', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.31.110.114.101.110.95.73.65.83.95.85.76.65.75.66.73.77.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'), # noqa: E501 ('UOM', 'xe-11/0/0.333', '1.3.6.1.4.1.2636.3.5.2.1.5.32.110.114.101.110.95.73.65.83.95.85.79.77.95.79.85.84.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.23.68.87.83.45.111.117.116.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.2'), # noqa: E501 ('LITNET', 'xe-0/1/1.333', '1.3.6.1.4.1.2636.3.5.2.1.5.34.110.114.101.110.95.73.65.83.95.76.73.84.78.69.84.95.79.85.84.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.22.68.87.83.45.111.117.116.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.2'), # noqa: E501 ]) def test_fw_counter_bytes_oid_values(customer, interface_name, expected_oid): assert poller._jnx_fw_counter_bytes_oid( customer, interface_name) == expected_oid