Commit 5734acb1 authored by Erik Reid

working start on feature

parent 4c3e66c0
@@ -25,6 +25,12 @@ These endpoints are intended for use by MSR.
.. autofunction:: inventory_provider.routes.msr.logical_system_peerings
/msr/bgp/peering-services
-------------------------------------
.. autofunction:: inventory_provider.routes.msr.get_peering_services
/msr/bgp/groups
-------------------------------------
@@ -58,15 +64,27 @@ helpers
""" # noqa E501
import itertools
import json
import ipaddress
import logging
import queue
import random
import re
import threading
from flask import Blueprint, Response, request, current_app, jsonify
import jsonschema
from inventory_provider.routes import common from inventory_provider.routes import common
from inventory_provider.routes.classifier_schema \
import _ipaddresses_definitions
from inventory_provider.routes.classifier import \
get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc
from inventory_provider.routes.common import _ignore_cache_or_retrieve
from inventory_provider.routes.poller import get_services
from inventory_provider.tasks import common as tasks_common
routes = Blueprint('msr-query-routes', __name__)
logger = logging.getLogger(__name__)
PEERING_GROUP_LIST_SCHEMA = {
    '$schema': 'http://json-schema.org/draft-07/schema#',
@@ -106,6 +124,24 @@ PEERING_LIST_SCHEMA = {
}
IP_ADDRESS_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
"ip-address": {
"type": "string",
"oneOf": [
{"pattern": r'^(\d+\.){3}\d+$'},
{"pattern": r'^[a-f0-9:]+$'}
]
}
},
'type': 'array',
'items': {'$ref': '#/definitions/ip-address'},
'minItems': 1
}
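For illustration, a payload that satisfies this schema is just a
non-empty json list of IPv4/IPv6 address strings. A minimal validation
sketch (the addresses are illustrative, not from the source):

import jsonschema

# a mix of IPv4 and IPv6 addresses (example values)
example_payload = ['62.40.96.1', '2001:798:1::1']

# raises jsonschema.ValidationError on an empty list, or on any
# string that matches neither the IPv4 nor the IPv6 pattern
jsonschema.validate(example_payload, IP_ADDRESS_LIST_SCHEMA)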
@routes.after_request
def after_request(resp):
    return common.after_request(resp)
@@ -327,3 +363,178 @@ def get_access_services():
    cf. :meth:`inventory_provider.routes.poller.get_services`
    """
    return get_services(service_type='geant_ip')
def _find_subnet_keys(addresses):
    """
    yields pairs like:
        (redis key [str], address [str])
    we scan the complete subnet key space in case of network config
    errors (i.e. the same address appearing in multiple subnets)
    :param addresses: iterable of strings (like PEER_ADDRESS_LIST)
    :return: generator of pairs, as above
    """
# make a dict & remove duplicates
# will raise in case of invalid addresses
remaining_addresses = {
a: ipaddress.ip_address(a)
for a in set(addresses)
}
r = common.get_current_redis()
# scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('subnets:*', count=1000):
if not remaining_addresses:
break
k = k.decode('utf-8')
m = re.match(r'^subnets:(.*)$', k)
assert m, 'sanity failure: redis returned an invalid key name'
interface = ipaddress.ip_interface(m.group(1))
try:
matched_address = next(a for a, v in remaining_addresses.items()
if v == interface.ip)
del remaining_addresses[matched_address]
yield k, matched_address
except StopIteration:
# no match
continue
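A minimal usage sketch (hypothetical addresses, assuming the
'subnets:*' redis keyspace described above):

# map each peer address to the redis subnet key whose interface
# address matches it (addresses here are illustrative)
for key, address in _find_subnet_keys(['62.40.96.1', '62.40.97.1']):
    print(f'{address} is the interface address of {key}')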
def _get_subnet_interfaces(address, r):
    # keys are of the form 'subnets:<exploded interface address>/<length>'
    exploded = ipaddress.ip_address(address).exploded
    for k in r.scan_iter(f'subnets:{exploded}/*', count=1000):
        value = r.get(k.decode('utf-8'))
        if not value:
            # the key vanished or is empty: treat as no matching subnet,
            # so callers can always iterate over the result
            return []
        return json.loads(value.decode('utf-8'))
    return []
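For reference, this lookup assumes redis entries shaped roughly as
follows (key layout inferred from the code above; the field values
are illustrative):

# key:   subnets:<exploded interface address>/<prefix length>
# value: json-encoded list of interfaces on that subnet, e.g.
#
#   subnets:62.40.96.1/31 -> [
#       {"router": "mx1.ams.nl.geant.net", "interface name": "ae1.0"}
#   ]
#
# only 'router' and 'interface name' are read by the callers here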
def _load_address_services_proc(address_queue, results_queue, config_params):
"""
create a local redis connection with the current db index,
lookup the values of the keys that come from key_queue
and put them on value_queue
i/o contract:
None arriving on key_queue means no more keys are coming
put None in value_queue means we are finished
:param key_queue:
:param value_queue:
:param config_params: app config
:param doc_type: decoding type to do (xml or json)
:return: nothing
"""
    address = None  # so the except handlers below can reference it safely
    try:
r = tasks_common.get_current_redis(config_params)
while True:
address = address_queue.get()
# contract is that None means no more addresses
if not address:
break
for ifc_info in _get_subnet_interfaces(address, r):
ims_source_equipment = get_ims_equipment_name(
ifc_info['router'], r)
ims_interface = get_ims_interface(ifc_info['interface name'])
service_info = get_interface_services_and_loc(
ims_source_equipment, ims_interface, r)
# make a dict to de-dup the services list
services_dict = {}
for s in service_info.get('services', []):
services_dict[s['id']] = {
'name': s['name'],
'type': s['service_type'],
'status': s['status']
}
address_info = {
'address': address,
'hostname': ifc_info['router'],
'interface': ifc_info['interface name'],
'services': list(services_dict.values())
}
results_queue.put(address_info)
    except json.JSONDecodeError:
        logger.exception(f'error decoding redis entry for {address}')
    except Exception:
        logger.exception(f'error looking up service info for {address}')
    finally:
        # contract is to put None on the results queue when finished
        results_queue.put(None)
@routes.route('/bgp/peering-services', methods=['POST'])
@common.require_accepts_json
def get_peering_services():
"""
Handler for `/msr/bgp/peering-services`
Takes a json-formatted payload with the following schema:
:return:
"""
config_params = current_app.config['INVENTORY_PROVIDER_CONFIG']
addresses = request.json
jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA)
addresses = set(addresses) # remove duplicates
# validate addresses, to decrease chances of dying in a worker thread
for a in addresses:
assert ipaddress.ip_address(a)
response_queue = queue.Queue()
threads = []
for _ in range(min(len(addresses), 10)):
q = queue.Queue()
t = threading.Thread(
target=_load_address_services_proc,
args=[q, response_queue, config_params])
t.start()
threads.append({'thread': t, 'queue': q})
for a in addresses:
t = random.choice(threads)
t['queue'].put(a)
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
response = []
num_finished = 0
# read values from response_queue until we receive
# None len(threads) times
while num_finished < len(threads):
value = response_queue.get()
if not value:
num_finished += 1
logger.debug('one worker thread finished')
continue
response.append(value)
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
if not response:
return Response(
response='no interfaces found',
status=404,
mimetype="text/html")
return jsonify(response)
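A client call might look like this sketch (the host/port and the
address are hypothetical):

import requests

# the handler requires an Accept header that allows json
# (enforced by @common.require_accepts_json)
rv = requests.post(
    'http://localhost:8080/msr/bgp/peering-services',
    json=['62.40.96.1'],
    headers={'Accept': 'application/json'})
rv.raise_for_status()
for ifc in rv.json():
    print(ifc['hostname'], ifc['interface'], len(ifc['services']))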