diff --git a/inventory_provider/routes/classifier.py b/inventory_provider/routes/classifier.py
index d2b46d342a1793562e52aa5cd04ca265c88a480c..732d614b02cdc65b85a01f94b6876e6d106a379f 100644
--- a/inventory_provider/routes/classifier.py
+++ b/inventory_provider/routes/classifier.py
@@ -175,12 +175,15 @@ def get_interface_services_and_loc(ims_source_equipment, ims_interface, redis):
         keys_to_remove = set(_s.keys()) - keys
         for k in keys_to_remove:
             _s.pop(k)
+
+    result = {}
+
     raw_services = redis.get(
        f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
-    result = {}
     if raw_services:
-        result['services'] = []
+        # result['services'] = []
         related_services = {}
+        services = {}
         contacts = set()
         for s in json.loads(raw_services.decode('utf-8')):
             related_services.update(
@@ -188,9 +191,16 @@ def get_interface_services_and_loc(ims_source_equipment, ims_interface, redis):
             contacts.update(set(s.pop('contacts', set())))
             if s['circuit_type'] == 'service':
                 _format_service(s)
-                result['services'].append(s)
+                services[s['id']] = s
+
+        def _sorted_by_name(things_with_name):
+            return sorted(
+                list(things_with_name),
+                key=lambda x: x['name'])
+
         result['contacts'] = sorted(list(contacts))
-        result['related-services'] = list(related_services.values())
+        result['services'] = _sorted_by_name(services.values())
+        result['related-services'] = _sorted_by_name(related_services.values())
 
         if not result['services']:
             result.pop('services', None)
diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py
index 8213ad6ba18a635f3f17c3324f4d29e3d955234c..6f9f072a8e931c60ac901720335abbfcefbebd39 100644
--- a/inventory_provider/routes/common.py
+++ b/inventory_provider/routes/common.py
@@ -275,6 +275,67 @@ def load_snmp_indexes(hostname=None):
 
     return result
 
+
+def distribute_jobs_across_workers(
+        worker_proc, jobs, input_ctx, num_threads=10):
+    """
+    Launch `num_threads` threads with worker_proc and distribute
+    jobs across them. Then return the results from all workers.
+
+    (generic version of _load_redis_docs)
+
+    worker_proc should be a function that takes args:
+      - input queue (items from `jobs` are written here)
+      - output queue (results from each input item are to be written here)
+      - input_ctx (some worker-specific data)
+
+    worker contract is:
+      - None is written to input queue iff there are no more items coming
+      - the worker writes None to the output queue when it exits
+
+    :param worker_proc: worker proc, as above
+    :param jobs: an iterable of things to put in the input queue
+    :param input_ctx: some data to pass when starting worker proc
+    :param num_threads: number of worker threads to start
+    :return: yields all values computed by worker procs
+    """
+    assert isinstance(num_threads, int) and num_threads > 0  # sanity
+
+    response_queue = queue.Queue()
+
+    threads = []
+    for _ in range(num_threads):
+        q = queue.Queue()
+        t = threading.Thread(
+            target=worker_proc,
+            args=[q, response_queue, input_ctx])
+        t.start()
+        threads.append({'thread': t, 'queue': q})
+
+    for job_data in jobs:
+        t = random.choice(threads)
+        t['queue'].put(job_data)
+
+    # tell all threads there are no more jobs coming
+    for t in threads:
+        t['queue'].put(None)
+
+    num_finished = 0
+    # read values from response_queue until we receive
+    # None len(threads) times
+    while num_finished < len(threads):
+        job_result = response_queue.get()
+        if not job_result:
+            # contract is that thread returns None when done
+            num_finished += 1
+            logger.debug('one worker thread finished')
+            continue
+        yield job_result
+
+    # cleanup like we're supposed to, even though it's python
+    for t in threads:
+        t['thread'].join(timeout=0.5)  # timeout, for sanity
+
+
 def ims_equipment_to_hostname(equipment):
     """
     changes names like MX1.AMS.NL to mx1.ams.nl.geant.net
diff --git a/inventory_provider/routes/msr.py b/inventory_provider/routes/msr.py
index 0bb6a65c68f23bf45ceac35c979f391c85ef563d..00cb88c7108ba2d80248265434614c7dfc8834f7 100644
--- a/inventory_provider/routes/msr.py
+++ b/inventory_provider/routes/msr.py
@@ -25,6 +25,12 @@ These endpoints are intended for use by MSR.
 
 .. autofunction:: inventory_provider.routes.msr.logical_system_peerings
 
+/msr/bgp/peering-services
+-------------------------------------
+
+.. autofunction:: inventory_provider.routes.msr.get_peering_services
+
+
 /msr/bgp/groups
 -------------------------------------
 
@@ -56,17 +62,29 @@ helpers
 
 .. autofunction:: inventory_provider.routes.msr._handle_peering_group_request
 """  # noqa E501
+import binascii
+import functools
+import hashlib
 import itertools
 import json
+import ipaddress
+import logging
+import re
+import threading
 
-from flask import Blueprint, Response, request
+from flask import Blueprint, Response, request, current_app
+import jsonschema
 
 from inventory_provider.routes import common
+from inventory_provider.routes.classifier import \
+    get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc
 from inventory_provider.routes.common import _ignore_cache_or_retrieve
 from inventory_provider.routes.poller import get_services
+from inventory_provider.tasks import common as tasks_common
 
 routes = Blueprint('msr-query-routes', __name__)
-
+logger = logging.getLogger(__name__)
+_subnet_lookup_semaphore = threading.Semaphore()
 
 PEERING_GROUP_LIST_SCHEMA = {
     '$schema': 'http://json-schema.org/draft-07/schema#',
@@ -106,6 +124,55 @@ PEERING_LIST_SCHEMA = {
 }
 
 
+IP_ADDRESS_LIST_SCHEMA = {
+    '$schema': 'http://json-schema.org/draft-07/schema#',
+    'definitions': {
+        "ip-address": {
+            "type": "string",
+            "oneOf": [
+                {"pattern": r'^(\d+\.){3}\d+$'},
+                {"pattern": r'^[a-f0-9:]+$'}
+            ]
+        }
+    },
+
+    'type': 'array',
+    'items': {'$ref': '#/definitions/ip-address'},
+    'minItems': 1
+}
+
+
+PEERING_ADDRESS_SERVICES_LIST = {
+    '$schema': 'http://json-schema.org/draft-07/schema#',
+    'definitions': {
+        'service': {
+            'properties': {
+                'id': {'type': 'integer'},
+                'name': {'type': 'string'},
+                'type': {'type': 'string'},
+                'status': {'type': 'string'}
+            },
+            'required': ['name', 'type', 'status'],
+            'additionalProperties': False
+        },
+        'address-service-info': {
+            'properties': {
+                'address': {'type': 'string'},
+                'hostname': {'type': 'string'},
+                'interface': {'type': 'string'},
+                'services': {
+                    'type': 'array',
+                    'items': {'$ref': '#/definitions/service'}
+                }
+            },
+            'required': ['address', 'hostname', 'interface', 'services'],
+            'additionalProperties': False
+        }
+    },
+    'type': 'array',
+    'items': {'$ref': '#/definitions/address-service-info'}
+}
+
+
 @routes.after_request
 def after_request(resp):
     return common.after_request(resp)
@@ -327,3 +394,264 @@ def get_access_services():
     cf. :meth:`inventory_provider.routes.poller.get_services`
     """
     return get_services(service_type='geant_ip')
+
+
+def _find_subnet_keys(addresses):
+    """
+    yields pairs like:
+      (redis key [str], address [str])
+
+    we search to the end of the list in case of network config
+    errors (same address in multiple subnets)
+
+    :param addresses: iterable of strings (like PEER_ADDRESS_LIST)
+    :return: as above
+    """
+    # make a dict & remove duplicates
+    # will raise in case of invalid addresses
+    remaining_addresses = {
+        a: ipaddress.ip_address(a)
+        for a in set(addresses)
+    }
+
+    r = common.get_current_redis()
+    # scan with bigger batches, to mitigate network latency effects
+    for k in r.scan_iter('subnets:*', count=1000):
+
+        if not remaining_addresses:
+            break
+
+        k = k.decode('utf-8')
+        m = re.match(r'^subnets:(.*)$', k)
+        assert m, 'sanity failure: redis returned an invalid key name'
+
+        interface = ipaddress.ip_interface(m.group(1))
+        try:
+            matched_address = next(
+                a for a, v
+                in remaining_addresses.items()
+                if v == interface.ip)
+            del remaining_addresses[matched_address]
+            yield k, matched_address
+        except StopIteration:
+            # no match
+            continue
+
+
+@functools.lru_cache(100)
+def _get_subnets(r):
+    result = {}
+    for k in r.scan_iter('subnets:*', count=1000):
+        k = k.decode('utf-8')
+        m = re.match(r'^subnets:(.+)$', k)
+        assert m
+        result[k] = ipaddress.ip_interface(m.group(1)).network
+    return result
+
+
+def _get_subnet_interfaces(address, r):
+
+    # synchronize calls to _get_subnets, so we don't
+    # call it many times together when running in
+    # multi-thread mode
+    _subnet_lookup_semaphore.acquire()
+    try:
+        all_subnets = _get_subnets(r)
+    except Exception:
+        logger.exception('error looking up subnets')
+        all_subnets = {}
+    finally:
+        _subnet_lookup_semaphore.release()
+
+    address = ipaddress.ip_address(address)
+    for key, network in all_subnets.items():
+        if address not in network:
+            continue
+        value = r.get(key)
+        if not value:
+            logger.error(f'no value for redis key "{key}"')
+            continue
+
+        yield from json.loads(value.decode('utf-8'))
+
+
+def _get_services_for_address(address: str, r):
+    """
+    match this address against all interfaces, then look up
+    any known services for that port
+
+    address is assumed to be in a valid v4/v6 format (it's used to
+    construct an ipaddress.ip_address object without try/except)
+
+    :param address: ip address string
+    :param r: a Redis instance
+    :return: yields PEERING_ADDRESS_SERVICES_LIST elements
+    """
+    def _formatted_service(s):
+        return {
+            'id': s['id'],
+            'name': s['name'],
+            'type': s['service_type'],
+            'status': s['status']
+        }
+
+    for ifc_info in _get_subnet_interfaces(address, r):
+        ims_source_equipment = get_ims_equipment_name(
+            ifc_info['router'], r)
+        ims_interface = get_ims_interface(ifc_info['interface name'])
+        service_info = get_interface_services_and_loc(
+            ims_source_equipment, ims_interface, r)
+
+        services = service_info.get('services', [])
+        services = map(_formatted_service, services)
+        services = sorted(services, key=lambda x: x['name'])
+
+        yield {
+            'address': address,
+            'hostname': ifc_info['router'],
+            'interface': ifc_info['interface name'],
+            'services': list(services)
+        }
+
+
+def _load_address_services_proc(address_queue, results_queue, config_params):
+    """
+    create a local redis connection with the current db index,
+    look up the services for each address that arrives on address_queue
+    and put the results on results_queue
+
+    i/o contract:
+      - None arriving on address_queue means no more addresses are coming
+      - None is put on results_queue when this worker is finished
+
+    :param address_queue: queue of ip address strings to look up
+    :param results_queue: queue where result dicts are written
+    :param config_params: app config
+    :return: nothing
+    """
+    try:
+        r = tasks_common.get_current_redis(config_params)
+        while True:
+            address = address_queue.get()
+
+            # contract is that None means no more addresses
+            if not address:
+                break
+
+            for service_info in _get_services_for_address(address, r):
+                results_queue.put(service_info)
+
+    except json.JSONDecodeError:
+        logger.exception(f'error decoding redis entry for {address}')
+    except Exception:
+        # just log info about this error (for debugging only)
+        # ... and quit (i.e. let finally cleanup)
+        logger.exception(f'error looking up service info for {address}')
+    finally:
+        # contract is to return None when finished
+        results_queue.put(None)
+
+
+def _get_peering_services_multi_thread(addresses):
+    """
+    normal handler for `/msr/bgp/peering-services`
+
+    this one does the lookups in multiple threads, each with its own
+    redis connection
+
+    (cf. _get_peering_services_single_thread)
+
+    :param addresses: iterable of address strings
+    :return: yields dicts returned from _get_services_for_address
+    """
+    yield from common.distribute_jobs_across_workers(
+        worker_proc=_load_address_services_proc,
+        jobs=addresses,
+        input_ctx=current_app.config['INVENTORY_PROVIDER_CONFIG'],
+        num_threads=min(len(addresses), 10))
+
+
+def _get_peering_services_single_thread(addresses):
+    """
+    used by `/msr/bgp/peering-services`
+
+    this one does the lookups serially, in the current thread, with a
+    single redis connection
+
+    (cf. _get_peering_services_multi_thread)
+
+    :param addresses: iterable of address strings
+    :return: yields dicts returned from _get_services_for_address
+    """
+    r = common.get_current_redis()
+    for a in addresses:
+        yield from _get_services_for_address(a, r)
+
+
+def _obj_key(o):
+    m = hashlib.sha256()
+    m.update(json.dumps(json.dumps(o)).encode('utf-8'))
+    digest = binascii.b2a_hex(m.digest()).decode('utf-8')
+    return digest.upper()[-4:]
+
+
+@routes.route('/bgp/peering-services', methods=['POST'])
+@common.require_accepts_json
+def get_peering_services():
+    """
+    Handler for `/msr/bgp/peering-services`
+
+    This method must be called with the POST method, and the payload
+    should be a json-formatted list of addresses (strings), which will
+    be validated against the following schema:
+
+    .. asjson::
+        inventory_provider.routes.msr.IP_ADDRESS_LIST_SCHEMA
+
+    The response will be formatted as follows:
+
+    .. asjson::
+        inventory_provider.routes.msr.PEERING_ADDRESS_SERVICES_LIST
+
+    A `no-threads` parameter can also be given.  If its truthiness
+    value evaluates to True, then the lookups are done in a single thread.
+    (This functionality is mainly for testing/debugging - it's not
+    expected to be used in production.)
+
+    :return:
+    """
+    addresses = request.json
+    jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA)
+
+    addresses = set(addresses)  # remove duplicates
+
+    input_data_key = _obj_key(sorted(list(addresses)))
+    cache_key = f'classifier-cache:msr:peering-services:{input_data_key}'
+
+    r = common.get_current_redis()
+    response = _ignore_cache_or_retrieve(request, cache_key, r)
+
+    if not response:
+        # validate addresses, to decrease chances of dying in a worker thread
+        for a in addresses:
+            assert ipaddress.ip_address(a)
+
+        no_threads = common.get_bool_request_arg('no-threads', False)
+        if no_threads:
+            response = _get_peering_services_single_thread(addresses)
+        else:
+            response = _get_peering_services_multi_thread(addresses)
+
+        response = list(response)
+        if response:
+            response = json.dumps(response)
+            r.set(cache_key, response.encode('utf-8'))
+
+    if not response:
+        return Response(
+            response='no interfaces found',
+            status=404,
+            mimetype="text/html")
+
+    return Response(response, mimetype="application/json")
diff --git a/test/conftest.py b/test/conftest.py
index d6066602f9026880655075df31d45f690fa61460..bdfe95676907fa170cde88260eadc5385ffdb9a4 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -5,6 +5,7 @@ import netifaces
 import os
 import re
 import tempfile
+import threading
 
 from lxml import etree
 import pytest
@@ -19,6 +20,8 @@ TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
     "test",
     "data"))
 
+_bootstrap_semaphore = threading.Semaphore()
+
 
 @pytest.fixture
 def data_config_filename():
@@ -85,20 +88,17 @@ def data_config(data_config_filename):
         return config.load(f)
 
 
-TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
-    inventory_provider.__path__[0],
-    "..",
-    "test",
-    "data"))
-
-
 class MockedRedis(object):
 
     db = None
 
     def __init__(self, *args, **kwargs):
-        if MockedRedis.db is None:
-            MockedRedis.prep()
+        _bootstrap_semaphore.acquire()
+        try:
+            if MockedRedis.db is None:
+                MockedRedis.prep()
+        finally:
+            _bootstrap_semaphore.release()
 
     # allows us to create other mocks using a different data source file
     @staticmethod
diff --git a/test/test_msr_routes.py b/test/test_msr_routes.py
index 3788bdeb08a03adcb7e1dba504363bffcaab9b99..ba10ce0fbe404a78c29985baf96179dc986cdf92 100644
--- a/test/test_msr_routes.py
+++ b/test/test_msr_routes.py
@@ -4,11 +4,13 @@ import jsonschema
 import pytest
 
 from inventory_provider.routes.msr import PEERING_LIST_SCHEMA, \
-    PEERING_GROUP_LIST_SCHEMA
+    PEERING_GROUP_LIST_SCHEMA, PEERING_ADDRESS_SERVICES_LIST, \
+    _get_services_for_address
 from inventory_provider.routes.poller import SERVICES_LIST_SCHEMA
 
+from inventory_provider.tasks.common import _get_redis
+
 DEFAULT_REQUEST_HEADERS = {
-    "Content-type": "application/json",
     "Accept": ["application/json"]
 }
 
@@ -135,3 +137,168 @@ def test_peerings_group_list(client, uri):
     jsonschema.validate(response_data, PEERING_GROUP_LIST_SCHEMA)
 
     assert response_data  # test data is non-empty
+
+
+@pytest.mark.parametrize('address', [
+    # hand-chosen, should have services
+    '62.40.127.141',  # from sample peering outage response
+    '62.40.127.139',  # from sample peering outage response
+    '62.40.98.201',  # AMS-AMS IP TRUNK (mx1.ams/ae0.1)
+    '62.40.127.134',  # BELNET AP3 (mx1.ams/ae13.2)
+    '62.40.124.38',  # SURF AP1 (mx1.ams/ae15.1103)
+    '62.40.125.57',  # JISC AP2 (mx1.lon2/ae12.0)
+    '2001:0798:0099:0001:0000:0000:0000:0026',  # v6 peering with Internet2
+    '2001:0798:0099:0001:0000:0000:0000:0056',  # v6 peering with Canarie
+    '2001:0798:0018:10aa:0000:0000:0000:0016',  # v6 peering with HEANET
+])
+def test_lookup_services_for_address(address, mocked_redis):
+    _redis_instance = _get_redis({
+        'redis': {
+            'hostname': None,
+            'port': None
+        },
+        'redis-databases': [9, 7, 5]
+    })
+
+    info = list(_get_services_for_address(address, r=_redis_instance))
+    jsonschema.validate(info, PEERING_ADDRESS_SERVICES_LIST)
+
+    # sanity check to be sure we have interesting test data
+    assert info
+    assert any(x['services'] for x in info)
+
+
+_OUTAGE_PEER_ADDRESSES = [
+    # taken from a sample splunk query result
+    '83.97.93.247',
+    '146.48.78.13',
+    '185.6.36.40',
+    '2a00:1620:c0:4e:146:48:78:13',
+    '62.40.125.102',
+    '62.40.126.11',
+    '62.40.127.141',
+    '62.40.98.11',
+    '62.40.102.19',
+    '202.179.249.33',
+    '202.179.249.209',
+    '138.44.226.6',
+    '138.44.226.8',
+    '203.30.38.92',
+    '195.66.226.140',
+    '195.66.224.122',
+    '62.40.124.226',
+    '80.81.194.138',
+    '62.40.127.139',
+    '80.81.195.40',
+    '80.81.194.152',
+    '62.40.125.154',
+    '62.40.124.147',
+    '62.40.100.39',
+    '62.40.126.222',
+    '83.97.88.197',
+    '83.97.88.166',
+    '91.210.16.114',
+    '62.40.126.146',
+    '62.40.100.45',
+    '62.40.125.198',
+    '2400:4500::1:0:32',
+    '62.40.126.230',
+    '117.103.111.142',
+    '91.210.16.5',
+    '195.66.227.152',
+    '91.210.16.63',
+    '62.40.109.146',
+    '64.57.30.209',
+    '198.124.80.29',
+    '62.40.125.78',
+    '192.84.8.13',
+    '62.40.124.242',
+    '185.1.47.55',
+    '83.97.89.222',
+    '185.1.47.54',
+    '83.97.88.228',
+    '83.97.88.229',
+    '83.97.90.2',
+    '195.66.224.207',
+    '83.97.89.242',
+    '62.40.124.249',
+    '62.40.126.37',
+    '195.66.226.230',
+    '62.40.124.222',
+    '185.6.36.75',
+    '80.249.210.1',
+    '62.40.124.230',
+    '198.124.80.9',
+    '83.97.88.162',
+    '62.40.125.130',
+    '195.66.225.63',
+    '80.81.195.189',
+    '195.66.225.121',
+    '62.40.125.167',
+    '195.66.225.142',
+    '62.40.125.18',
+    '91.210.16.113',
+    '80.249.210.95',
+    '80.249.209.34',
+    '62.40.125.238',
+    '83.97.88.102',
+    '80.81.194.165',
+    '62.40.125.254',
+    '83.97.88.106',
+    '91.210.16.202',
+    '80.249.208.164',
+    '185.1.192.59',
+    '195.66.226.180',
+    '62.40.125.134',
+    '83.97.89.17',
+    '62.40.126.3',
+    '80.249.209.232',
+    '83.97.88.34',
+    '185.1.47.48',
+    '83.97.89.250',
+    '83.97.88.185',
+    '80.249.209.53',
+    '62.40.125.174',
+    '205.189.32.76',
+    '185.6.36.55',
+    '185.6.36.28',
+    '80.81.195.168',
+    '62.40.125.42',
+    '80.81.192.43',
+    '83.97.88.206',
+    '62.40.100.59',
+    '62.40.125.118',
+    '62.40.124.150',
+    '83.97.88.46'
+]
+
+
+def test_peering_services(client):
+    headers = {'Content-Type': 'application/json'}
+    headers.update(DEFAULT_REQUEST_HEADERS)
+    rv = client.post(
+        '/msr/bgp/peering-services',
+        headers=headers,
+        data=json.dumps(_OUTAGE_PEER_ADDRESSES))
+    assert rv.status_code == 200
+    assert rv.is_json
+    response_data = json.loads(rv.data.decode('utf-8'))
+
+    assert response_data  # test data is non-empty
+    jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)
+
+
+def test_peering_services_single_threaded(client):
+    # this functionality is mainly for testing/debugging
+    headers = {'Content-Type': 'application/json'}
+    headers.update(DEFAULT_REQUEST_HEADERS)
+    rv = client.post(
+        '/msr/bgp/peering-services?no-threads=1',
+        headers=headers,
+        data=json.dumps(_OUTAGE_PEER_ADDRESSES))
+    assert rv.status_code == 200
+    assert rv.is_json
+    response_data = json.loads(rv.data.decode('utf-8'))
+
+    assert response_data  # test data is non-empty
+    jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)
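
Note: the sketch below illustrates the worker contract expected by distribute_jobs_across_workers (added to inventory_provider/routes/common.py above). It is not part of the patch; the job values and the upper-casing logic are purely illustrative.

    from inventory_provider.routes.common import distribute_jobs_across_workers


    def _example_worker(input_queue, output_queue, input_ctx):
        # toy worker: upper-case each incoming string (input_ctx is unused here)
        try:
            while True:
                job = input_queue.get()
                if job is None:
                    # contract: None means no more jobs are coming
                    break
                output_queue.put(job.upper())
        finally:
            # contract: write None to the output queue when exiting
            output_queue.put(None)


    # results arrive in arbitrary order, one result per job
    results = list(distribute_jobs_across_workers(
        worker_proc=_example_worker,
        jobs=['ams', 'lon', 'par', 'fra'],
        input_ctx=None,
        num_threads=2))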
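
Note: a minimal client-side sketch of calling the new /msr/bgp/peering-services endpoint, mirroring the tests above. The base URL is an assumption (a locally running inventory provider); the two addresses are taken from the patch's own test data.

    import json
    import requests

    addresses = ['62.40.127.141', '62.40.127.139']
    rv = requests.post(
        'http://localhost:8080/msr/bgp/peering-services',  # assumed base url
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json'},
        data=json.dumps(addresses))
    rv.raise_for_status()
    for entry in rv.json():
        # each entry matches an item of PEERING_ADDRESS_SERVICES_LIST
        print(entry['address'], entry['hostname'], entry['interface'],
              [s['name'] for s in entry['services']])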