Skip to content
Snippets Groups Projects
Commit 834595eb authored by Erik Reid's avatar Erik Reid
Browse files

added single-threaded mode for better debugging

parent 13b712d5
Branches
Tags
No related merge requests found
...@@ -75,8 +75,6 @@ from flask import Blueprint, Response, request, current_app, jsonify ...@@ -75,8 +75,6 @@ from flask import Blueprint, Response, request, current_app, jsonify
import jsonschema import jsonschema
from inventory_provider.routes import common from inventory_provider.routes import common
from inventory_provider.routes.classifier_schema \
import _ipaddresses_definitions
from inventory_provider.routes.classifier import \ from inventory_provider.routes.classifier import \
get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc
from inventory_provider.routes.common import _ignore_cache_or_retrieve from inventory_provider.routes.common import _ignore_cache_or_retrieve
...@@ -416,6 +414,31 @@ def _get_subnet_interfaces(address, r): ...@@ -416,6 +414,31 @@ def _get_subnet_interfaces(address, r):
return [] return []
def _get_peer_address_services(address: str, r):
for ifc_info in _get_subnet_interfaces(address, r):
ims_source_equipment = get_ims_equipment_name(
ifc_info['router'], r)
ims_interface = get_ims_interface(ifc_info['interface name'])
service_info = get_interface_services_and_loc(
ims_source_equipment, ims_interface, r)
# make a dict to de-dup the services list
services_dict = {}
for s in service_info.get('services', []):
services_dict[s['id']] = {
'name': s['name'],
'type': s['service_type'],
'status': s['status']
}
yield {
'address': address,
'hostname': ifc_info['router'],
'interface': ifc_info['interface name'],
'services': list(services_dict.values())
}
def _load_address_services_proc(address_queue, results_queue, config_params): def _load_address_services_proc(address_queue, results_queue, config_params):
""" """
create a local redis connection with the current db index, create a local redis connection with the current db index,
...@@ -441,30 +464,8 @@ def _load_address_services_proc(address_queue, results_queue, config_params): ...@@ -441,30 +464,8 @@ def _load_address_services_proc(address_queue, results_queue, config_params):
if not address: if not address:
break break
for ifc_info in _get_subnet_interfaces(address, r): for service_info in _get_peer_address_services(address, r):
ims_source_equipment = get_ims_equipment_name( results_queue.put(service_info)
ifc_info['router'], r)
ims_interface = get_ims_interface(ifc_info['interface name'])
service_info = get_interface_services_and_loc(
ims_source_equipment, ims_interface, r)
# make a dict to de-dup the services list
services_dict = {}
for s in service_info.get('services', []):
services_dict[s['id']] = {
'name': s['name'],
'type': s['service_type'],
'status': s['status']
}
address_info = {
'address': address,
'hostname': ifc_info['router'],
'interface': ifc_info['interface name'],
'services': list(services_dict.values())
}
results_queue.put(address_info)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.exception(f'error decoding redis entry for {address}') logger.exception(f'error decoding redis entry for {address}')
...@@ -475,29 +476,23 @@ def _load_address_services_proc(address_queue, results_queue, config_params): ...@@ -475,29 +476,23 @@ def _load_address_services_proc(address_queue, results_queue, config_params):
results_queue.put(None) results_queue.put(None)
@routes.route('/bgp/peering-services', methods=['POST']) def _get_peering_services_multi_thread(addresses):
@common.require_accepts_json
def get_peering_services():
""" """
Handler for `/msr/bgp/peering-services` normal handler for `/msr/bgp/peering-services`
Takes a json-formatted payload with the following schema: this one does the lookups in multiple threads, each with its own
redis connection
:return: (cf. _get_peering_services_single_thread)
"""
config_params = current_app.config['INVENTORY_PROVIDER_CONFIG']
addresses = request.json
jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA)
addresses = set(addresses) # remove duplicates :param addresses: iterable of address strings
:return: yields dicts returned from _get_peer_address_services
# validate addresses, to decrease chances of dying in a worker thread """
for a in addresses:
assert ipaddress.ip_address(a)
response_queue = queue.Queue() response_queue = queue.Queue()
threads = [] threads = []
config_params = current_app.config['INVENTORY_PROVIDER_CONFIG']
for _ in range(min(len(addresses), 10)): for _ in range(min(len(addresses), 10)):
q = queue.Queue() q = queue.Queue()
t = threading.Thread( t = threading.Thread(
...@@ -514,23 +509,72 @@ def get_peering_services(): ...@@ -514,23 +509,72 @@ def get_peering_services():
for t in threads: for t in threads:
t['queue'].put(None) t['queue'].put(None)
response = []
num_finished = 0 num_finished = 0
# read values from response_queue until we receive # read values from response_queue until we receive
# None len(threads) times # None len(threads) times
while num_finished < len(threads): while num_finished < len(threads):
value = response_queue.get() value = response_queue.get()
if not value: if not value:
# contract is that thread returns None when done
num_finished += 1 num_finished += 1
logger.debug('one worker thread finished') logger.debug('one worker thread finished')
continue continue
response.append(value) yield value
# cleanup like we're supposed to, even though it's python # cleanup like we're supposed to, even though it's python
for t in threads: for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity t['thread'].join(timeout=0.5) # timeout, for sanity
def _get_peering_services_single_thread(addresses):
"""
used by `/msr/bgp/peering-services`
this one does the lookups serially, in the current thread and a single
redis connection
(cf. _get_peering_services_multi_thread)
:param addresses: iterable of address strings
:return: yields dicts returned from _get_peer_address_services
"""
r = common.get_current_redis()
for a in addresses:
yield from _get_peer_address_services(a, r)
@routes.route('/bgp/peering-services', methods=['POST'])
@common.require_accepts_json
def get_peering_services():
"""
Handler for `/msr/bgp/peering-services`
Takes a json-formatted payload with the following schema:
A parameter `no-threads` can be given. If its truthiness
value evaluates to True, then the lookups are done in a single thread.
(This functionality is mainly for testing/debugging - it's not
expected to be used in production.)
:return:
"""
addresses = request.json
jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA)
addresses = set(addresses) # remove duplicates
# validate addresses, to decrease chances of dying in a worker thread
for a in addresses:
assert ipaddress.ip_address(a)
no_threads = common.get_bool_request_arg('no-threads', False)
if no_threads:
response = _get_peering_services_single_thread(addresses)
else:
response = _get_peering_services_multi_thread(addresses)
response = list(response)
if not response: if not response:
return Response( return Response(
response='no interfaces found', response='no interfaces found',
......
...@@ -137,118 +137,134 @@ def test_peerings_group_list(client, uri): ...@@ -137,118 +137,134 @@ def test_peerings_group_list(client, uri):
assert response_data # test data is non-empty assert response_data # test data is non-empty
def test_peering_services(client): # taken from a sample splunk query
_OUTAGE_PEER_ADDRESSES = [
'83.97.93.247',
'146.48.78.13',
'185.6.36.40',
'2a00:1620:c0:4e:146:48:78:13',
'62.40.125.102',
'62.40.126.11',
'62.40.127.141',
'62.40.98.11',
'62.40.102.19',
'202.179.249.33',
'202.179.249.209',
'138.44.226.6',
'138.44.226.8',
'203.30.38.92',
'195.66.226.140',
'195.66.224.122',
'62.40.124.226',
'80.81.194.138',
'62.40.127.139',
'80.81.195.40',
'80.81.194.152',
'62.40.125.154',
'62.40.124.147',
'62.40.100.39',
'62.40.126.222',
'83.97.88.197',
'83.97.88.166',
'91.210.16.114',
'62.40.126.146',
'62.40.100.45',
'62.40.125.198',
'2400:4500::1:0:32',
'62.40.126.230',
'117.103.111.142',
'91.210.16.5',
'195.66.227.152',
'91.210.16.63',
'62.40.109.146',
'64.57.30.209',
'198.124.80.29',
'62.40.125.78',
'192.84.8.13',
'62.40.124.242',
'185.1.47.55',
'83.97.89.222',
'185.1.47.54',
'83.97.88.228',
'83.97.88.229',
'83.97.90.2',
'195.66.224.207',
'83.97.89.242',
'62.40.124.249',
'62.40.126.37',
'195.66.226.230',
'62.40.124.222',
'185.6.36.75',
'80.249.210.1',
'62.40.124.230',
'198.124.80.9',
'83.97.88.162',
'62.40.125.130',
'195.66.225.63',
'80.81.195.189',
'195.66.225.121',
'62.40.125.167',
'195.66.225.142',
'62.40.125.18',
'91.210.16.113',
'80.249.210.95',
'80.249.209.34',
'62.40.125.238',
'83.97.88.102',
'80.81.194.165',
'62.40.125.254',
'83.97.88.106',
'91.210.16.202',
'80.249.208.164',
'185.1.192.59',
'195.66.226.180',
'62.40.125.134',
'83.97.89.17',
'62.40.126.3',
'80.249.209.232',
'83.97.88.34',
'185.1.47.48',
'83.97.89.250',
'83.97.88.185',
'80.249.209.53',
'62.40.125.174',
'205.189.32.76',
'185.6.36.55',
'185.6.36.28',
'80.81.195.168',
'62.40.125.42',
'80.81.192.43',
'83.97.88.206',
'62.40.100.59',
'62.40.125.118',
'62.40.124.150',
'83.97.88.46'
]
# sample splunk output
payload = [
'83.97.93.247',
'146.48.78.13',
'185.6.36.40',
'2a00:1620:c0:4e:146:48:78:13',
'62.40.125.102',
'62.40.126.11',
'62.40.127.141',
'62.40.98.11',
'62.40.102.19',
'202.179.249.33',
'202.179.249.209',
'138.44.226.6',
'138.44.226.8',
'203.30.38.92',
'195.66.226.140',
'195.66.224.122',
'62.40.124.226',
'80.81.194.138',
'62.40.127.139',
'80.81.195.40',
'80.81.194.152',
'62.40.125.154',
'62.40.124.147',
'62.40.100.39',
'62.40.126.222',
'83.97.88.197',
'83.97.88.166',
'91.210.16.114',
'62.40.126.146',
'62.40.100.45',
'62.40.125.198',
'2400:4500::1:0:32',
'62.40.126.230',
'117.103.111.142',
'91.210.16.5',
'195.66.227.152',
'91.210.16.63',
'62.40.109.146',
'64.57.30.209',
'198.124.80.29',
'62.40.125.78',
'192.84.8.13',
'62.40.124.242',
'185.1.47.55',
'83.97.89.222',
'185.1.47.54',
'83.97.88.228',
'83.97.88.229',
'83.97.90.2',
'195.66.224.207',
'83.97.89.242',
'62.40.124.249',
'62.40.126.37',
'195.66.226.230',
'62.40.124.222',
'185.6.36.75',
'80.249.210.1',
'62.40.124.230',
'198.124.80.9',
'83.97.88.162',
'62.40.125.130',
'195.66.225.63',
'80.81.195.189',
'195.66.225.121',
'62.40.125.167',
'195.66.225.142',
'62.40.125.18',
'91.210.16.113',
'80.249.210.95',
'80.249.209.34',
'62.40.125.238',
'83.97.88.102',
'80.81.194.165',
'62.40.125.254',
'83.97.88.106',
'91.210.16.202',
'80.249.208.164',
'185.1.192.59',
'195.66.226.180',
'62.40.125.134',
'83.97.89.17',
'62.40.126.3',
'80.249.209.232',
'83.97.88.34',
'185.1.47.48',
'83.97.89.250',
'83.97.88.185',
'80.249.209.53',
'62.40.125.174',
'205.189.32.76',
'185.6.36.55',
'185.6.36.28',
'80.81.195.168',
'62.40.125.42',
'80.81.192.43',
'83.97.88.206',
'62.40.100.59',
'62.40.125.118',
'62.40.124.150',
'83.97.88.46'
]
def test_peering_services(client):
headers = {'Content-Type': 'application/json'} headers = {'Content-Type': 'application/json'}
headers.update(DEFAULT_REQUEST_HEADERS) headers.update(DEFAULT_REQUEST_HEADERS)
rv = client.post( rv = client.post(
'/msr/bgp/peering-services', '/msr/bgp/peering-services',
headers=headers, headers=headers,
data=json.dumps(payload)) data=json.dumps(_OUTAGE_PEER_ADDRESSES))
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
# jsonschema.validate(response_data, IP_ADDRESS_LIST_SCHEMA)
assert response_data # test data is non-empty
def test_peering_services_single_threaded(client):
# this functionality is mainly for testing/debugging
headers = {'Content-Type': 'application/json'}
headers.update(DEFAULT_REQUEST_HEADERS)
rv = client.post(
'/msr/bgp/peering-services?no-threads=1',
headers=headers,
data=json.dumps(_OUTAGE_PEER_ADDRESSES))
assert rv.status_code == 200 assert rv.status_code == 200
assert rv.is_json assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8')) response_data = json.loads(rv.data.decode('utf-8'))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment