# test_general_poller_routes.py 8.24 KiB
import json
import jsonschema
import pytest
from inventory_provider.routes import poller
# Headers sent with every request issued by the tests in this module.
DEFAULT_REQUEST_HEADERS = {
    "Content-type": "application/json",
    # the accepted types are given as a list
    "Accept": ["application/json"],
}
def test_get_all_interfaces(client):
    """Check the response of a GET request to /poller/interfaces.

    The response must be JSON, validate against
    ``poller.INTERFACE_LIST_SCHEMA`` and name more than one router.

    :param client: test client fixture (defined outside this file)
    """
    rv = client.get(
        '/poller/interfaces',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, poller.INTERFACE_LIST_SCHEMA)

    # collect the distinct router names, so that a response which only
    # covers a single router is reported as a failure
    response_routers = {ifc['router'] for ifc in response_data}
    assert len(response_routers) > 1, \
        'there should be data from lots of routers'
def test_all_router_interface_speeds(client):
    """Check the response of a POST request to /poller/speeds.

    The response must validate against
    ``poller.INTERFACE_SPEED_LIST_SCHEMA``, be non-empty and name more
    than one router.

    :param client: test client fixture (defined outside this file)
    """
    rv = client.post(
        '/poller/speeds',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    response = json.loads(rv.data.decode("utf-8"))
    jsonschema.validate(response, poller.INTERFACE_SPEED_LIST_SCHEMA)
    assert response  # at least shouldn't be empty

    # collect the distinct router names, so that a response which only
    # covers a single router is reported as a failure
    response_routers = {ifc['router'] for ifc in response}
    assert len(response_routers) > 1, \
        'there should be data from lots of routers'
def test_eumetsat_multicast(mocker, client):
    """Check the response of a GET request to /poller/eumetsat-multicast.

    The response must be JSON, validate against
    ``poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA`` and be non-empty.

    :param mocker: fixture used to patch the snmp community string lookup
    :param client: test client fixture (defined outside this file)
    """
    # routers don't have snmp acl's for us, so the community string
    # lookup is replaced with a mock returning a fixed value
    community_mock = mocker.patch(
        'inventory_provider.juniper.snmp_community_string')
    community_mock.return_value = 'blah'

    response = client.get(
        '/poller/eumetsat-multicast', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    subscriptions = json.loads(body_text)
    jsonschema.validate(
        subscriptions, poller.MULTICAST_SUBSCRIPTION_LIST_SCHEMA)
    assert subscriptions, "the subscription list shouldn't be empty"
def test_gws_direct(client):
    """Check the response of a GET request to /poller/gws/direct.

    The response must be JSON, validate against
    ``poller.GWS_DIRECT_DATA_SCHEMA`` and be non-empty.

    :param client: test client fixture (defined outside this file)
    """
    rv = client.get(
        '/poller/gws/direct',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, poller.GWS_DIRECT_DATA_SCHEMA)
    # the message names the data this endpoint actually returns
    assert response_data, "the gws direct data shouldn't be empty"
def test_gws_indirect(client):
    """Check the response of a GET request to /poller/gws/indirect.

    The response must be JSON, validate against
    ``poller.SERVICES_LIST_SCHEMA``, be non-empty and contain only
    services of type 'GWS - INDIRECT'.

    :param client: test client fixture (defined outside this file)
    """
    response = client.get(
        '/poller/gws/indirect', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)

    assert services  # test data is non-empty
    for service in services:
        assert service['type'] == 'GWS - INDIRECT'
def test_gws_indirect_snmp(client):
    """Check /poller/services/gws_indirect?snmp=1, including counters.

    Makes the same checks as test_services_snmp, and additionally
    verifies that the snmp data of every service contains counters.

    :param client: test client fixture (defined outside this file)
    """
    response = client.get(
        '/poller/services/gws_indirect?snmp=1',
        headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)
    assert services  # test data is non-empty

    # every returned service is expected to carry snmp info ...
    for service in services:
        assert 'snmp' in service
    # ... and that snmp info is expected to contain counters
    for service in services:
        assert 'counters' in service['snmp']
def test_all_services_default(client):
    """Check the response of a GET request to /poller/services.

    The response must be JSON, validate against
    ``poller.SERVICES_LIST_SCHEMA``, be non-empty and contain only
    services whose status is 'operational'.

    :param client: test client fixture (defined outside this file)
    """
    response = client.get(
        '/poller/services', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)

    assert services  # test data is non-empty
    for service in services:
        assert service['status'] == 'operational'
def test_all_services_all(client):
    """Check the response of a GET request to /poller/services?all=1.

    The response must be JSON, validate against
    ``poller.SERVICES_LIST_SCHEMA``, be non-empty, contain at least one
    service that is not 'operational', and contain no snmp info.

    :param client: test client fixture (defined outside this file)
    """
    response = client.get(
        '/poller/services?all=1', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)
    assert services  # test data is non-empty

    # at least one returned service must have a non-operational status
    assert any(svc['status'] != 'operational' for svc in services)
    # snmp info was not requested, so no service may include it
    for svc in services:
        assert 'snmp' not in svc
# services with these types should all have snmp configs
@pytest.mark.parametrize(
    'service_type', ['gws_indirect', 'geant_ip', 'l3_vpn'])
def test_services_snmp(client, service_type):
    """Check /poller/services/<service_type>?snmp=1 for each type.

    The response must be JSON, validate against
    ``poller.SERVICES_LIST_SCHEMA``, be non-empty and include snmp info
    for every service.

    :param client: test client fixture (defined outside this file)
    :param service_type: service type name used in the request uri
    """
    uri = f'/poller/services/{service_type}?snmp=1'
    response = client.get(uri, headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)
    assert services  # test data is non-empty

    # every returned service is expected to carry snmp info
    for service in services:
        assert 'snmp' in service
@pytest.mark.parametrize(
    'uri_type,expected_type',
    [
        ('gws_indirect', 'GWS - INDIRECT'),
        ('gws_upstream', 'GWS - UPSTREAM'),
        ('geant_ip', 'GEANT IP'),
        ('geant_lambda', 'GEANT LAMBDA'),
        # ('gts', 'GTS'),  # these are non-monitored (TODO: make a flag?)
        ('md_vpn_native', 'MD-VPN (NATIVE)'),
        ('md_vpn_proxy', 'MD-VPN (PROXY)'),
        ('cbl1', 'CBL1'),
        ('l3_vpn', 'L3-VPN'),
    ])
def test_all_services_by_type(client, uri_type, expected_type):
    """Check that /poller/services/<uri_type> only returns one type.

    The response must be JSON, validate against
    ``poller.SERVICES_LIST_SCHEMA``, be non-empty and contain only
    services whose type equals ``expected_type``.

    :param client: test client fixture (defined outside this file)
    :param uri_type: service type name used in the request uri
    :param expected_type: type value expected on each returned service
    """
    uri = f'/poller/services/{uri_type}'
    response = client.get(uri, headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    services = json.loads(body_text)
    jsonschema.validate(services, poller.SERVICES_LIST_SCHEMA)

    assert services  # test data is non-empty
    for service in services:
        assert service['type'] == expected_type
def test_get_service_types(client):
    """Check the response of a GET request to /poller/service-types.

    The response must be JSON, validate against
    ``poller.STRING_LIST_SCHEMA``, be non-empty and contain a couple of
    type names expected from the test data.

    :param client: test client fixture (defined outside this file)
    """
    response = client.get(
        '/poller/service-types', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200
    assert response.is_json

    body_text = response.data.decode('utf-8')
    service_types = json.loads(body_text)
    jsonschema.validate(service_types, poller.STRING_LIST_SCHEMA)
    assert service_types  # test data is non-empty

    # some hard-coded values we expect to see from the test data ...
    for expected_name in ('gws_indirect', 'geant_ip'):
        assert expected_name in service_types
@pytest.mark.parametrize(
    'ifIndex,expected_oid',
    [
        (595, '1.3.6.1.4.1.2636.3.6.2.1.5.595.1.6.100.119.115.45.105.110'),
        (607, '1.3.6.1.4.1.2636.3.6.2.1.5.607.1.6.100.119.115.45.105.110'),
        (1135, '1.3.6.1.4.1.2636.3.6.2.1.5.1135.1.6.100.119.115.45.105.110'),
    ])
def test_dcu_oid_values(ifIndex, expected_oid):
    """Compare poller._jnx_dcu_byte_count_oid with hard-coded OIDs.

    :param ifIndex: interface index passed to the function under test
    :param expected_oid: OID string the function is expected to return
    """
    computed_oid = poller._jnx_dcu_byte_count_oid(ifIndex)
    assert computed_oid == expected_oid
# The expected OID strings are far longer than the line limit, so each
# row carries a well-formed ``# noqa: E501`` marker.
@pytest.mark.parametrize('customer, interface_name, expected_oid', [
    ('ASREN', 'ae17.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.65.83.82.69.78.95.79.85.84.45.97.101.49.55.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.55.46.51.51.51.45.111.2'),  # noqa: E501
    ('FCCN', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.28.110.114.101.110.95.73.65.83.95.70.67.67.78.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'),  # noqa: E501
    ('GRNET', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'),  # noqa: E501
    ('GRNET', 'ae10.333', '1.3.6.1.4.1.2636.3.5.2.1.5.29.110.114.101.110.95.73.65.83.95.71.82.78.69.84.95.79.85.84.45.97.101.49.48.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.48.46.51.51.51.45.111.2'),  # noqa: E501
    ('ULAKBIM', 'ae11.333', '1.3.6.1.4.1.2636.3.5.2.1.5.31.110.114.101.110.95.73.65.83.95.85.76.65.75.66.73.77.95.79.85.84.45.97.101.49.49.46.51.51.51.45.111.18.68.87.83.45.111.117.116.45.97.101.49.49.46.51.51.51.45.111.2'),  # noqa: E501
    ('UOM', 'xe-11/0/0.333', '1.3.6.1.4.1.2636.3.5.2.1.5.32.110.114.101.110.95.73.65.83.95.85.79.77.95.79.85.84.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.23.68.87.83.45.111.117.116.45.120.101.45.49.49.47.48.47.48.46.51.51.51.45.111.2'),  # noqa: E501
    ('LITNET', 'xe-0/1/1.333', '1.3.6.1.4.1.2636.3.5.2.1.5.34.110.114.101.110.95.73.65.83.95.76.73.84.78.69.84.95.79.85.84.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.22.68.87.83.45.111.117.116.45.120.101.45.48.47.49.47.49.46.51.51.51.45.111.2'),  # noqa: E501
])
def test_fw_counter_bytes_oid_values(customer, interface_name, expected_oid):
    """Compare poller._jnx_fw_counter_bytes_oid with hard-coded OIDs.

    :param customer: customer name passed to the function under test
    :param interface_name: interface name passed to the function under test
    :param expected_oid: OID string the function is expected to return
    """
    assert poller._jnx_fw_counter_bytes_oid(
        customer, interface_name) == expected_oid