Skip to content
Snippets Groups Projects
test_msr_routes.py 12.43 KiB
import ipaddress
import json
import jsonschema

import pytest

from inventory_provider.routes.msr import PEERING_LIST_SCHEMA, \
    PEERING_GROUP_LIST_SCHEMA, PEERING_ADDRESS_SERVICES_LIST, \
    SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA, _get_services_for_address, \
    MDVPN_LIST_SCHEMA, ASN_PEER_LIST_SCHEMA, \
    IP_SERVICES_LIST_SCHEMA, _dedupe
from inventory_provider.routes.poller import SERVICES_LIST_SCHEMA
from inventory_provider.tasks.common import _get_redis

DEFAULT_REQUEST_HEADERS = {'Accept': ['application/json']}


def test_access_services(client):
    rv = client.get(
        '/msr/access-services',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, SERVICES_LIST_SCHEMA)

    assert response_data  # test data is non-empty
    assert all(s['type'] == 'GEANT IP' for s in response_data)


def test_logical_system_peerings_all(client):
    rv = client.get(
        '/msr/bgp/logical-system-peerings',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)

    assert response_data  # test data is non-empty
    assert all('logical-system' in p for p in response_data)


@pytest.mark.parametrize('name', ['VRR'])
def test_logical_system_peerings_specific(client, name):
    rv = client.get(
        f'/msr/bgp/logical-system-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)

    assert response_data  # test data is non-empty
    assert all(p['logical-system'] == name for p in response_data)


@pytest.mark.parametrize('name', [
    'VRR1',
    'VPNPROXY',
    'vrr',
    ' vrr',
    'VPN PROXY'
])
def test_logical_system_peerings_404(client, name):
    rv = client.get(
        f'/msr/bgp/logical-system-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 404


def test_group_peerings_all(client):
    rv = client.get(
        '/msr/bgp/group-peerings',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)

    assert response_data  # test data is non-empty


@pytest.mark.parametrize('name', ['BGPLU', 'eGEANT', 'eGEANT-mcast'])
def test_group_peerings_specific(client, name):
    rv = client.get(
        f'/msr/bgp/group-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)

    assert response_data  # test data is non-empty
    assert all(p['group'] == name for p in response_data)


@pytest.mark.parametrize('name', ['EGEANT', 'eGEANT mcast'])
def test_group_peerings_404(client, name):
    rv = client.get(
        f'/msr/bgp/group-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 404


@pytest.mark.parametrize('name', ['mdvpn'])
def test_routing_instance_peerings_specific(client, name):
    rv = client.get(
        f'/msr/bgp/routing-instance-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)

    assert response_data  # test data is non-empty
    assert all(p['instance'] == name for p in response_data)


@pytest.mark.parametrize('name', ['xyz', 'MDVPN', 'PRACE'])
def test_routing_instance_peerings_404(client, name):
    rv = client.get(
        f'/msr/bgp/routing-instance-peerings/{name}',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 404


@pytest.mark.parametrize('uri', [
    '/msr/bgp/logical-systems',
    '/msr/bgp/groups',
    '/msr/bgp/routing-instances'])
def test_peerings_group_list(client, uri):
    rv = client.get(uri, headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_GROUP_LIST_SCHEMA)

    assert response_data  # test data is non-empty


@pytest.mark.parametrize('address', [
    # hand-chosen, should have services
    '62.40.127.141',  # from sample peering outage response
    '62.40.127.139',  # from sample peering outage response
    '62.40.98.201',  # AMS-AMS IP TRUNK (mx1.ams/ae0.1)
    '62.40.127.134',  # BELNET AP3 (mx1.ams/ae13.2)
    '62.40.124.38',  # SURF AP1 (mx1.ams/ae15.1103)
    '62.40.125.57',  # JISC AP2 (mx1.lon2/ae12.0)
    '2001:0798:0099:0001:0000:0000:0000:0026',  # v6 peering with Internet2
    '2001:0798:0099:0001:0000:0000:0000:0056',  # v6 peering with Canarie
    '2001:0798:0018:10aa:0000:0000:0000:0016',  # v6 peering with HEANET
])
def test_lookup_services_for_address(address, mocked_redis):
    _redis_instance = _get_redis({
        'redis': {
            'hostname': None,
            'port': None
        },
        'redis-databases': [9, 7, 5]
    })

    info = list(_get_services_for_address(address, r=_redis_instance))
    jsonschema.validate(info, PEERING_ADDRESS_SERVICES_LIST)

    # sanity check to be sure we have interesting test data
    assert info
    assert any(x['services'] for x in info)


_OUTAGE_PEER_ADDRESSES = [
    # taken from a sample splunk query result
    '83.97.93.247',
    '146.48.78.13',
    '185.6.36.40',
    '2a00:1620:c0:4e:146:48:78:13',
    '62.40.125.102',
    '62.40.126.11',
    '62.40.127.141',
    '62.40.98.11',
    '62.40.102.19',
    '202.179.249.33',
    '202.179.249.209',
    '138.44.226.6',
    '138.44.226.8',
    '203.30.38.92',
    '195.66.226.140',
    '195.66.224.122',
    '62.40.124.226',
    '80.81.194.138',
    '62.40.127.139',
    '80.81.195.40',
    '80.81.194.152',
    '62.40.125.154',
    '62.40.124.147',
    '62.40.100.39',
    '62.40.126.222',
    '83.97.88.197',
    '83.97.88.166',
    '91.210.16.114',
    '62.40.126.146',
    '62.40.100.45',
    '62.40.125.198',
    '2400:4500::1:0:32',
    '62.40.126.230',
    '117.103.111.142',
    '91.210.16.5',
    '195.66.227.152',
    '91.210.16.63',
    '62.40.109.146',
    '64.57.30.209',
    '198.124.80.29',
    '62.40.125.78',
    '192.84.8.13',
    '62.40.124.242',
    '185.1.47.55',
    '83.97.89.222',
    '185.1.47.54',
    '83.97.88.228',
    '83.97.88.229',
    '83.97.90.2',
    '195.66.224.207',
    '83.97.89.242',
    '62.40.124.249',
    '62.40.126.37',
    '195.66.226.230',
    '62.40.124.222',
    '185.6.36.75',
    '80.249.210.1',
    '62.40.124.230',
    '198.124.80.9',
    '83.97.88.162',
    '62.40.125.130',
    '195.66.225.63',
    '80.81.195.189',
    '195.66.225.121',
    '62.40.125.167',
    '195.66.225.142',
    '62.40.125.18',
    '91.210.16.113',
    '80.249.210.95',
    '80.249.209.34',
    '62.40.125.238',
    '83.97.88.102',
    '80.81.194.165',
    '62.40.125.254',
    '83.97.88.106',
    '91.210.16.202',
    '80.249.208.164',
    '185.1.192.59',
    '195.66.226.180',
    '62.40.125.134',
    '83.97.89.17',
    '62.40.126.3',
    '80.249.209.232',
    '83.97.88.34',
    '185.1.47.48',
    '83.97.89.250',
    '83.97.88.185',
    '80.249.209.53',
    '62.40.125.174',
    '205.189.32.76',
    '185.6.36.55',
    '185.6.36.28',
    '80.81.195.168',
    '62.40.125.42',
    '80.81.192.43',
    '83.97.88.206',
    '62.40.100.59',
    '62.40.125.118',
    '62.40.124.150',
    '83.97.88.46'
]


def test_peering_services(client):
    headers = {'Content-Type': 'application/json'}
    headers.update(DEFAULT_REQUEST_HEADERS)
    rv = client.post(
        '/msr/bgp/peering-services',
        headers=headers,
        data=json.dumps(_OUTAGE_PEER_ADDRESSES))
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))

    assert response_data  # test data is non-empty
    jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)


def test_peering_services_single_threaded(client):
    # this functionality is mainly for testing/debugging
    headers = {'Content-Type': 'application/json'}
    headers.update(DEFAULT_REQUEST_HEADERS)
    rv = client.post(
        '/msr/bgp/peering-services?no-threads=1',
        headers=headers,
        data=json.dumps(_OUTAGE_PEER_ADDRESSES))
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))

    assert response_data  # test data is non-empty
    jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)


def test_system_correlation_services(client):
    rv = client.get(
        '/msr/services',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, SYSTEM_CORRELATION_SERVICES_LIST_SCHEMA)
    assert response_data  # test data is non-empty


def test_get_all_peerings(client):
    rv = client.get(
        '/msr/bgp',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, PEERING_LIST_SCHEMA)
    assert response_data  # test data is non-empty


def test_get_mdvpn_peerings(client, mocked_redis):
    rv = client.get(
        '/msr/mdvpn',
        headers=DEFAULT_REQUEST_HEADERS
    )
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, MDVPN_LIST_SCHEMA)
    assert response_data  # test data is non-empty


@pytest.mark.parametrize('endpoint_variant', [
    "",  # default, no filter
    "/1853",
    "?group=IAS-NRENS",
    "?instance=IAS",
    "?group=IAS-NRENS&instance=IAS",
    "/1853?group=IAS-NRENS",
    "/1853?instance=IAS",
    "/1853?group=IAS-NRENS&instance=IAS"
])
def test_get_asn_peers_get(endpoint_variant, client, mocked_redis):
    rv = client.get(
        f'/msr/asn-peers{endpoint_variant}',
        headers=DEFAULT_REQUEST_HEADERS
    )
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, ASN_PEER_LIST_SCHEMA)
    assert response_data  # test data is non-empty


@pytest.mark.parametrize('endpoint_variant,post_body', [
    ("", '{}'),
    ("", '{"group": "IAS-NRENS"}'),
    ("", '{"instance": "IAS"}'),
    ("", '{"group": "IAS-NRENS", "instance": "IAS"}'),
    ("/1853", '{}'),
    ("/1853", '{"group": "IAS-NRENS"}'),
    ("/1853", '{"instance": "IAS"}'),
    ("/1853", '{"group": "IAS-NRENS", "instance": "IAS"}')
])
def test_get_asn_peers_post(endpoint_variant, post_body, client, mocked_redis):
    rv = client.post(
        f'/msr/asn-peers{endpoint_variant}',
        headers=DEFAULT_REQUEST_HEADERS,
        json=post_body
    )
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, ASN_PEER_LIST_SCHEMA)
    assert response_data  # test data is non-empty


def test_dedupe():
    data = [
        {'a': 1, 'b': {'a': 1, 'b': 2}, 'c': 3},
        {'a': 1, 'b': {'a': 1, 'b': 2}, 'c': 3, 'd': 4},
        {'a': 1, 'b': {'a': 1, 'b': 2}, 'c': 3},
        {'a': 1, 'b': {'a': 1, 'b': 2}, 'c': 3},
    ]
    result = list(_dedupe(data))
    assert len(result) == 2


def test_ip_services(client):

    rv = client.get(
        '/msr/ip-services',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    assert rv.is_json
    response_data = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(response_data, IP_SERVICES_LIST_SCHEMA)

    # sanity that there are some non-trivial elements in test data
    assert any(len(_x['peerings']) > 0 for _x in response_data)
    assert any(len(_x['services']) > 0 for _x in response_data)

    # sanity confirmation that peerings are with the correct host
    for _s in response_data:
        net = ipaddress.ip_interface(_s['address']).network
        for _p in _s['peerings']:
            assert ipaddress.ip_address(_p['address']) in net

    # sanity confirmation that no interface addresses are duplicated
    ifc_keys = set()
    for _s in response_data:
        key = f'{_s["hostname"]}:{_s["port"]}:{_s["address"]}'
        assert key not in ifc_keys
        ifc_keys.add(key)