diff --git a/inventory_provider/routes/msr.py b/inventory_provider/routes/msr.py index 8fcb8bad96a3e2e08bc57762ebfcf580308d9a01..9a515c95ff048398f253fd032758be51d3ba25e1 100644 --- a/inventory_provider/routes/msr.py +++ b/inventory_provider/routes/msr.py @@ -75,8 +75,6 @@ from flask import Blueprint, Response, request, current_app, jsonify import jsonschema from inventory_provider.routes import common -from inventory_provider.routes.classifier_schema \ - import _ipaddresses_definitions from inventory_provider.routes.classifier import \ get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc from inventory_provider.routes.common import _ignore_cache_or_retrieve @@ -416,6 +414,31 @@ def _get_subnet_interfaces(address, r): return [] +def _get_peer_address_services(address: str, r): + for ifc_info in _get_subnet_interfaces(address, r): + ims_source_equipment = get_ims_equipment_name( + ifc_info['router'], r) + ims_interface = get_ims_interface(ifc_info['interface name']) + service_info = get_interface_services_and_loc( + ims_source_equipment, ims_interface, r) + + # make a dict to de-dup the services list + services_dict = {} + for s in service_info.get('services', []): + services_dict[s['id']] = { + 'name': s['name'], + 'type': s['service_type'], + 'status': s['status'] + } + + yield { + 'address': address, + 'hostname': ifc_info['router'], + 'interface': ifc_info['interface name'], + 'services': list(services_dict.values()) + } + + def _load_address_services_proc(address_queue, results_queue, config_params): """ create a local redis connection with the current db index, @@ -441,30 +464,8 @@ def _load_address_services_proc(address_queue, results_queue, config_params): if not address: break - for ifc_info in _get_subnet_interfaces(address, r): - ims_source_equipment = get_ims_equipment_name( - ifc_info['router'], r) - ims_interface = get_ims_interface(ifc_info['interface name']) - service_info = get_interface_services_and_loc( - ims_source_equipment, ims_interface, r) - - # make a dict to de-dup the services list - services_dict = {} - for s in service_info.get('services', []): - services_dict[s['id']] = { - 'name': s['name'], - 'type': s['service_type'], - 'status': s['status'] - } - - address_info = { - 'address': address, - 'hostname': ifc_info['router'], - 'interface': ifc_info['interface name'], - 'services': list(services_dict.values()) - } - - results_queue.put(address_info) + for service_info in _get_peer_address_services(address, r): + results_queue.put(service_info) except json.JSONDecodeError: logger.exception(f'error decoding redis entry for {address}') @@ -475,29 +476,23 @@ def _load_address_services_proc(address_queue, results_queue, config_params): results_queue.put(None) -@routes.route('/bgp/peering-services', methods=['POST']) -@common.require_accepts_json -def get_peering_services(): +def _get_peering_services_multi_thread(addresses): """ - Handler for `/msr/bgp/peering-services` + normal handler for `/msr/bgp/peering-services` - Takes a json-formatted payload with the following schema: + this one does the lookups in multiple threads, each with its own + redis connection - :return: - """ - config_params = current_app.config['INVENTORY_PROVIDER_CONFIG'] - addresses = request.json - jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA) + (cf. _get_peering_services_single_thread) - addresses = set(addresses) # remove duplicates - - # validate addresses, to decrease chances of dying in a worker thread - for a in addresses: - assert ipaddress.ip_address(a) + :param addresses: iterable of address strings + :return: yields dicts returned from _get_peer_address_services + """ response_queue = queue.Queue() threads = [] + config_params = current_app.config['INVENTORY_PROVIDER_CONFIG'] for _ in range(min(len(addresses), 10)): q = queue.Queue() t = threading.Thread( @@ -514,23 +509,72 @@ def get_peering_services(): for t in threads: t['queue'].put(None) - response = [] - num_finished = 0 # read values from response_queue until we receive # None len(threads) times while num_finished < len(threads): value = response_queue.get() if not value: + # contract is that thread returns None when done num_finished += 1 logger.debug('one worker thread finished') continue - response.append(value) + yield value # cleanup like we're supposed to, even though it's python for t in threads: t['thread'].join(timeout=0.5) # timeout, for sanity + +def _get_peering_services_single_thread(addresses): + """ + used by `/msr/bgp/peering-services` + + this one does the lookups serially, in the current thread and a single + redis connection + + (cf. _get_peering_services_multi_thread) + + :param addresses: iterable of address strings + :return: yields dicts returned from _get_peer_address_services + """ + r = common.get_current_redis() + for a in addresses: + yield from _get_peer_address_services(a, r) + + +@routes.route('/bgp/peering-services', methods=['POST']) +@common.require_accepts_json +def get_peering_services(): + """ + Handler for `/msr/bgp/peering-services` + + Takes a json-formatted payload with the following schema: + + + A parameter `no-threads` can be given. If its truthiness + value evaluates to True, then the lookups are done in a single thread. + (This functionality is mainly for testing/debugging - it's not + expected to be used in production.) + + :return: + """ + addresses = request.json + jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA) + + addresses = set(addresses) # remove duplicates + + # validate addresses, to decrease chances of dying in a worker thread + for a in addresses: + assert ipaddress.ip_address(a) + + no_threads = common.get_bool_request_arg('no-threads', False) + if no_threads: + response = _get_peering_services_single_thread(addresses) + else: + response = _get_peering_services_multi_thread(addresses) + + response = list(response) if not response: return Response( response='no interfaces found', diff --git a/test/test_msr_routes.py b/test/test_msr_routes.py index 4babd97d6089f8818b943174722e8d3b0f147dc0..ea1cc2dff3707ec10eae18011cf74c769af05d82 100644 --- a/test/test_msr_routes.py +++ b/test/test_msr_routes.py @@ -137,118 +137,134 @@ def test_peerings_group_list(client, uri): assert response_data # test data is non-empty -def test_peering_services(client): +# taken from a sample splunk query +_OUTAGE_PEER_ADDRESSES = [ + '83.97.93.247', + '146.48.78.13', + '185.6.36.40', + '2a00:1620:c0:4e:146:48:78:13', + '62.40.125.102', + '62.40.126.11', + '62.40.127.141', + '62.40.98.11', + '62.40.102.19', + '202.179.249.33', + '202.179.249.209', + '138.44.226.6', + '138.44.226.8', + '203.30.38.92', + '195.66.226.140', + '195.66.224.122', + '62.40.124.226', + '80.81.194.138', + '62.40.127.139', + '80.81.195.40', + '80.81.194.152', + '62.40.125.154', + '62.40.124.147', + '62.40.100.39', + '62.40.126.222', + '83.97.88.197', + '83.97.88.166', + '91.210.16.114', + '62.40.126.146', + '62.40.100.45', + '62.40.125.198', + '2400:4500::1:0:32', + '62.40.126.230', + '117.103.111.142', + '91.210.16.5', + '195.66.227.152', + '91.210.16.63', + '62.40.109.146', + '64.57.30.209', + '198.124.80.29', + '62.40.125.78', + '192.84.8.13', + '62.40.124.242', + '185.1.47.55', + '83.97.89.222', + '185.1.47.54', + '83.97.88.228', + '83.97.88.229', + '83.97.90.2', + '195.66.224.207', + '83.97.89.242', + '62.40.124.249', + '62.40.126.37', + '195.66.226.230', + '62.40.124.222', + '185.6.36.75', + '80.249.210.1', + '62.40.124.230', + '198.124.80.9', + '83.97.88.162', + '62.40.125.130', + '195.66.225.63', + '80.81.195.189', + '195.66.225.121', + '62.40.125.167', + '195.66.225.142', + '62.40.125.18', + '91.210.16.113', + '80.249.210.95', + '80.249.209.34', + '62.40.125.238', + '83.97.88.102', + '80.81.194.165', + '62.40.125.254', + '83.97.88.106', + '91.210.16.202', + '80.249.208.164', + '185.1.192.59', + '195.66.226.180', + '62.40.125.134', + '83.97.89.17', + '62.40.126.3', + '80.249.209.232', + '83.97.88.34', + '185.1.47.48', + '83.97.89.250', + '83.97.88.185', + '80.249.209.53', + '62.40.125.174', + '205.189.32.76', + '185.6.36.55', + '185.6.36.28', + '80.81.195.168', + '62.40.125.42', + '80.81.192.43', + '83.97.88.206', + '62.40.100.59', + '62.40.125.118', + '62.40.124.150', + '83.97.88.46' +] - # sample splunk output - payload = [ - '83.97.93.247', - '146.48.78.13', - '185.6.36.40', - '2a00:1620:c0:4e:146:48:78:13', - '62.40.125.102', - '62.40.126.11', - '62.40.127.141', - '62.40.98.11', - '62.40.102.19', - '202.179.249.33', - '202.179.249.209', - '138.44.226.6', - '138.44.226.8', - '203.30.38.92', - '195.66.226.140', - '195.66.224.122', - '62.40.124.226', - '80.81.194.138', - '62.40.127.139', - '80.81.195.40', - '80.81.194.152', - '62.40.125.154', - '62.40.124.147', - '62.40.100.39', - '62.40.126.222', - '83.97.88.197', - '83.97.88.166', - '91.210.16.114', - '62.40.126.146', - '62.40.100.45', - '62.40.125.198', - '2400:4500::1:0:32', - '62.40.126.230', - '117.103.111.142', - '91.210.16.5', - '195.66.227.152', - '91.210.16.63', - '62.40.109.146', - '64.57.30.209', - '198.124.80.29', - '62.40.125.78', - '192.84.8.13', - '62.40.124.242', - '185.1.47.55', - '83.97.89.222', - '185.1.47.54', - '83.97.88.228', - '83.97.88.229', - '83.97.90.2', - '195.66.224.207', - '83.97.89.242', - '62.40.124.249', - '62.40.126.37', - '195.66.226.230', - '62.40.124.222', - '185.6.36.75', - '80.249.210.1', - '62.40.124.230', - '198.124.80.9', - '83.97.88.162', - '62.40.125.130', - '195.66.225.63', - '80.81.195.189', - '195.66.225.121', - '62.40.125.167', - '195.66.225.142', - '62.40.125.18', - '91.210.16.113', - '80.249.210.95', - '80.249.209.34', - '62.40.125.238', - '83.97.88.102', - '80.81.194.165', - '62.40.125.254', - '83.97.88.106', - '91.210.16.202', - '80.249.208.164', - '185.1.192.59', - '195.66.226.180', - '62.40.125.134', - '83.97.89.17', - '62.40.126.3', - '80.249.209.232', - '83.97.88.34', - '185.1.47.48', - '83.97.89.250', - '83.97.88.185', - '80.249.209.53', - '62.40.125.174', - '205.189.32.76', - '185.6.36.55', - '185.6.36.28', - '80.81.195.168', - '62.40.125.42', - '80.81.192.43', - '83.97.88.206', - '62.40.100.59', - '62.40.125.118', - '62.40.124.150', - '83.97.88.46' - ] +def test_peering_services(client): headers = {'Content-Type': 'application/json'} headers.update(DEFAULT_REQUEST_HEADERS) rv = client.post( '/msr/bgp/peering-services', headers=headers, - data=json.dumps(payload)) + data=json.dumps(_OUTAGE_PEER_ADDRESSES)) + assert rv.status_code == 200 + assert rv.is_json + response_data = json.loads(rv.data.decode('utf-8')) + # jsonschema.validate(response_data, IP_ADDRESS_LIST_SCHEMA) + + assert response_data # test data is non-empty + + +def test_peering_services_single_threaded(client): + # this functionality is mainly for testing/debugging + headers = {'Content-Type': 'application/json'} + headers.update(DEFAULT_REQUEST_HEADERS) + rv = client.post( + '/msr/bgp/peering-services?no-threads=1', + headers=headers, + data=json.dumps(_OUTAGE_PEER_ADDRESSES)) assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8'))