diff --git a/inventory_provider/routes/msr.py b/inventory_provider/routes/msr.py index 0bb6a65c68f23bf45ceac35c979f391c85ef563d..8fcb8bad96a3e2e08bc57762ebfcf580308d9a01 100644 --- a/inventory_provider/routes/msr.py +++ b/inventory_provider/routes/msr.py @@ -25,6 +25,12 @@ These endpoints are intended for use by MSR. .. autofunction:: inventory_provider.routes.msr.logical_system_peerings +/msr/bgp/peering-services +------------------------------------- + +.. autofunction:: inventory_provider.routes.msr.get_peering_services + + /msr/bgp/groups ------------------------------------- @@ -58,15 +64,27 @@ helpers """ # noqa E501 import itertools import json +import ipaddress +import logging +import queue +import random +import re +import threading -from flask import Blueprint, Response, request +from flask import Blueprint, Response, request, current_app, jsonify +import jsonschema from inventory_provider.routes import common +from inventory_provider.routes.classifier_schema \ + import _ipaddresses_definitions +from inventory_provider.routes.classifier import \ + get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc from inventory_provider.routes.common import _ignore_cache_or_retrieve from inventory_provider.routes.poller import get_services +from inventory_provider.tasks import common as tasks_common routes = Blueprint('msr-query-routes', __name__) - +logger = logging.getLogger(__name__) PEERING_GROUP_LIST_SCHEMA = { '$schema': 'http://json-schema.org/draft-07/schema#', @@ -106,6 +124,24 @@ PEERING_LIST_SCHEMA = { } +IP_ADDRESS_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + 'definitions': { + "ip-address": { + "type": "string", + "oneOf": [ + {"pattern": r'^(\d+\.){3}\d+$'}, + {"pattern": r'^[a-f0-9:]+$'} + ] + } + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/ip-address'}, + 'minItems': 1 +} + + @routes.after_request def after_request(resp): return common.after_request(resp) @@ -327,3 +363,178 @@ def get_access_services(): cf. :meth:`inventory_provider.routes.poller.get_services` """ return get_services(service_type='geant_ip') + + +def _find_subnet_keys(addresses): + """ + yields pairs like: + (redis key [str], address [str]) + + we search to the end of the list in case of network config + errors (same address in multiple subnets) + + :param addresses: iterable of strings (like PEER_ADDRESS_LIST) + :return: as above + """ + # make a dict & remove duplicates + # will raise in case of invalid addresses + remaining_addresses = { + a: ipaddress.ip_address(a) + for a in set(addresses) + } + + r = common.get_current_redis() + # scan with bigger batches, to mitigate network latency effects + for k in r.scan_iter('subnets:*', count=1000): + + if not remaining_addresses: + break + + k = k.decode('utf-8') + m = re.match(r'^subnets:(.*)$', k) + assert m, 'sanity failure: redis returned an invalid key name' + + interface = ipaddress.ip_interface(m.group(1)) + try: + matched_address = next(a for a, v in remaining_addresses.items() + if v == interface.ip) + del remaining_addresses[matched_address] + yield k, matched_address + except StopIteration: + # no match + continue + + +def _get_subnet_interfaces(address, r): + exploded = ipaddress.ip_address(address).exploded + for k in r.scan_iter(f'subnets:{exploded}/*', count=1000): + value = r.get(k.decode('utf-8')) + if not value: + return None + value = value.decode('utf-8') + return json.loads(value) + return [] + + +def _load_address_services_proc(address_queue, results_queue, config_params): + """ + create a local redis connection with the current db index, + lookup the values of the keys that come from key_queue + and put them on value_queue + + i/o contract: + None arriving on key_queue means no more keys are coming + put None in value_queue means we are finished + + :param key_queue: + :param value_queue: + :param config_params: app config + :param doc_type: decoding type to do (xml or json) + :return: nothing + """ + try: + r = tasks_common.get_current_redis(config_params) + while True: + address = address_queue.get() + + # contract is that None means no more addresses + if not address: + break + + for ifc_info in _get_subnet_interfaces(address, r): + ims_source_equipment = get_ims_equipment_name( + ifc_info['router'], r) + ims_interface = get_ims_interface(ifc_info['interface name']) + service_info = get_interface_services_and_loc( + ims_source_equipment, ims_interface, r) + + # make a dict to de-dup the services list + services_dict = {} + for s in service_info.get('services', []): + services_dict[s['id']] = { + 'name': s['name'], + 'type': s['service_type'], + 'status': s['status'] + } + + address_info = { + 'address': address, + 'hostname': ifc_info['router'], + 'interface': ifc_info['interface name'], + 'services': list(services_dict.values()) + } + + results_queue.put(address_info) + + except json.JSONDecodeError: + logger.exception(f'error decoding redis entry for {address}') + except: + logger.exception(f'error looking up service info for {address}') + finally: + # contract is to return None when finished + results_queue.put(None) + + +@routes.route('/bgp/peering-services', methods=['POST']) +@common.require_accepts_json +def get_peering_services(): + """ + Handler for `/msr/bgp/peering-services` + + Takes a json-formatted payload with the following schema: + + :return: + """ + config_params = current_app.config['INVENTORY_PROVIDER_CONFIG'] + addresses = request.json + jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA) + + addresses = set(addresses) # remove duplicates + + # validate addresses, to decrease chances of dying in a worker thread + for a in addresses: + assert ipaddress.ip_address(a) + + response_queue = queue.Queue() + + threads = [] + for _ in range(min(len(addresses), 10)): + q = queue.Queue() + t = threading.Thread( + target=_load_address_services_proc, + args=[q, response_queue, config_params]) + t.start() + threads.append({'thread': t, 'queue': q}) + + for a in addresses: + t = random.choice(threads) + t['queue'].put(a) + + # tell all threads there are no more keys coming + for t in threads: + t['queue'].put(None) + + response = [] + + num_finished = 0 + # read values from response_queue until we receive + # None len(threads) times + while num_finished < len(threads): + value = response_queue.get() + if not value: + num_finished += 1 + logger.debug('one worker thread finished') + continue + response.append(value) + + # cleanup like we're supposed to, even though it's python + for t in threads: + t['thread'].join(timeout=0.5) # timeout, for sanity + + if not response: + return Response( + response='no interfaces found', + status=404, + mimetype="text/html") + + return jsonify(response)