Commit a40f911a authored by Release Webservice

Finished release 0.68.

parents ce87dab6 d4f39d34
@@ -175,12 +175,15 @@ def get_interface_services_and_loc(ims_source_equipment, ims_interface, redis):
keys_to_remove = set(_s.keys()) - keys
for k in keys_to_remove:
_s.pop(k)
result = {}
raw_services = redis.get(
f'ims:interface_services:{ims_source_equipment}:{ims_interface}')
result = {}
if raw_services:
result['services'] = []
# result['services'] = []
related_services = {}
services = {}
contacts = set()
for s in json.loads(raw_services.decode('utf-8')):
related_services.update(
@@ -188,9 +191,16 @@ def get_interface_services_and_loc(ims_source_equipment, ims_interface, redis):
contacts.update(set(s.pop('contacts', set())))
if s['circuit_type'] == 'service':
_format_service(s)
result['services'].append(s)
services[s['id']] = s
def _sorted_by_name(things_with_name):
return sorted(
list(things_with_name),
key=lambda x: x['name'])
result['contacts'] = sorted(list(contacts))
result['related-services'] = list(related_services.values())
result['services'] = _sorted_by_name(services.values())
result['related-services'] = _sorted_by_name(related_services.values())
if not result['services']:
result.pop('services', None)
@@ -275,6 +275,67 @@ def load_snmp_indexes(hostname=None):
return result
def distribute_jobs_across_workers(
worker_proc, jobs, input_ctx, num_threads=10):
"""
Launch `num_threads` threads with worker_proc and distribute
jobs across them. Then return the results from all workers.
(generic version of _load_redis_docs)
worker_proc should be a function that takes args:
- input queue (items from jobs are written here)
- output queue (results from each input item are to be written here)
- input_ctx (some worker-specific data)
worker contract is:
- None is written to input queue iff there are no more items coming
- the worker writes None to the output queue when it exits
:param worker_proc: worker proc, as above
:param jobs: an iterable of things to put in the input queue
:param input_ctx: some data to pass when starting worker proc
:param num_threads: number of worker threads to start
:return: yields all values computed by worker procs
"""
assert isinstance(num_threads, int) and num_threads > 0 # sanity
response_queue = queue.Queue()
threads = []
for _ in range(num_threads):
q = queue.Queue()
t = threading.Thread(
target=worker_proc,
args=[q, response_queue, input_ctx])
t.start()
threads.append({'thread': t, 'queue': q})
for job_data in jobs:
t = random.choice(threads)
t['queue'].put(job_data)
# tell all threads there are no more keys coming
for t in threads:
t['queue'].put(None)
num_finished = 0
# read values from response_queue until we receive
# None len(threads) times
while num_finished < len(threads):
job_result = response_queue.get()
if job_result is None:
# contract is that thread returns None when done
num_finished += 1
logger.debug('one worker thread finished')
continue
yield job_result
# cleanup like we're supposed to, even though it's python
for t in threads:
t['thread'].join(timeout=0.5) # timeout, for sanity
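# --- editor's illustration, not part of this commit ---
# a minimal worker that satisfies the contract described above:
# read jobs until the None sentinel arrives, emit one result per job,
# and write None to the output queue on exit.
def _example_square_worker(input_queue, output_queue, input_ctx):
    while True:
        job = input_queue.get()
        if job is None:  # sentinel: no more items coming
            break
        output_queue.put(job * job)
    output_queue.put(None)  # contract: tell the consumer we're done

# e.g. sorted(distribute_jobs_across_workers(
#     _example_square_worker, range(10), input_ctx=None, num_threads=3))
# yields [0, 1, 4, 9, ...] (results arrive in arbitrary order)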
def ims_equipment_to_hostname(equipment):
"""
changes names like MX1.AMS.NL to mx1.ams.nl.geant.net
@@ -25,6 +25,12 @@ These endpoints are intended for use by MSR.
.. autofunction:: inventory_provider.routes.msr.logical_system_peerings
/msr/bgp/peering-services
-------------------------------------
.. autofunction:: inventory_provider.routes.msr.get_peering_services
/msr/bgp/groups
-------------------------------------
@@ -56,17 +62,29 @@ helpers
.. autofunction:: inventory_provider.routes.msr._handle_peering_group_request
""" # noqa E501
import binascii
import functools
import hashlib
import itertools
import json
import ipaddress
import logging
import re
import threading
from flask import Blueprint, Response, request
from flask import Blueprint, Response, request, current_app
import jsonschema
from inventory_provider.routes import common
from inventory_provider.routes.classifier import \
get_ims_equipment_name, get_ims_interface, get_interface_services_and_loc
from inventory_provider.routes.common import _ignore_cache_or_retrieve
from inventory_provider.routes.poller import get_services
from inventory_provider.tasks import common as tasks_common
routes = Blueprint('msr-query-routes', __name__)
logger = logging.getLogger(__name__)
_subnet_lookup_semaphore = threading.Semaphore()
PEERING_GROUP_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
@@ -106,6 +124,55 @@ PEERING_LIST_SCHEMA = {
}
IP_ADDRESS_LIST_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
"ip-address": {
"type": "string",
"oneOf": [
{"pattern": r'^(\d+\.){3}\d+$'},
{"pattern": r'^[a-f0-9:]+$'}
]
}
},
'type': 'array',
'items': {'$ref': '#/definitions/ip-address'},
'minItems': 1
}
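# editor's note (not part of this commit): a payload that validates
# against IP_ADDRESS_LIST_SCHEMA is a non-empty list of loosely
# pattern-matched v4/v6 address strings, e.g.:
#
#     jsonschema.validate(
#         ['62.40.98.201', '2001:798:99:1::26'], IP_ADDRESS_LIST_SCHEMA)
#
# an empty list fails validation ('minItems' is 1); full syntactic
# validation happens later via ipaddress.ip_address.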
PEERING_ADDRESS_SERVICES_LIST = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'service': {
'properties': {
'id': {'type': 'integer'},
'name': {'type': 'string'},
'type': {'type': 'string'},
'status': {'type': 'string'}
},
'required': ['name', 'type', 'status'],
'additionalProperties': False
},
'address-service-info': {
'properties': {
'address': {'type': 'string'},
'hostname': {'type': 'string'},
'interface': {'type': 'string'},
'services': {
'type': 'array',
'items': {'$ref': '#/definitions/service'}
}
},
'required': ['address', 'hostname', 'interface', 'services'],
'additionalProperties': False
}
},
'type': 'array',
'items': {'$ref': '#/definitions/address-service-info'}
}
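# editor's illustration (hypothetical values, not part of this commit):
# one element of a response that validates against
# PEERING_ADDRESS_SERVICES_LIST:
#
#     [{
#         'address': '62.40.124.38',
#         'hostname': 'mx1.ams.nl.geant.net',
#         'interface': 'ae15.1103',
#         'services': [{
#             'id': 12345,  # hypothetical id
#             'name': 'SURF AP1',
#             'type': 'GEANT IP',  # assumed service type string
#             'status': 'operational'
#         }]
#     }]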
@routes.after_request
def after_request(resp):
return common.after_request(resp)
@@ -327,3 +394,264 @@ def get_access_services():
cf. :meth:`inventory_provider.routes.poller.get_services`
"""
return get_services(service_type='geant_ip')
def _find_subnet_keys(addresses):
"""
yields pairs like:
(redis key [str], address [str])
we search to the end of the list in case of network config
errors (same address in multiple subnets)
:param addresses: iterable of address strings (cf. IP_ADDRESS_LIST_SCHEMA)
:return: as above
"""
# make a dict & remove duplicates
# will raise in case of invalid addresses
remaining_addresses = {
a: ipaddress.ip_address(a)
for a in set(addresses)
}
r = common.get_current_redis()
# scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('subnets:*', count=1000):
if not remaining_addresses:
break
k = k.decode('utf-8')
m = re.match(r'^subnets:(.*)$', k)
assert m, 'sanity failure: redis returned an invalid key name'
interface = ipaddress.ip_interface(m.group(1))
try:
matched_address = next(
a for a, v
in remaining_addresses.items()
if v == interface.ip)
del remaining_addresses[matched_address]
yield k, matched_address
except StopIteration:
# no match
continue
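# editor's usage sketch (not part of this commit): the keys yielded by
# _find_subnet_keys are the redis keys whose interface address exactly
# matches one of the given addresses, e.g.
#
#     for key, address in _find_subnet_keys(['62.40.98.201']):
#         ...  # key is like 'subnets:62.40.98.201/31'
#              # (the prefix length here is hypothetical)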
@functools.lru_cache(100)
def _get_subnets(r):
result = {}
for k in r.scan_iter('subnets:*', count=1000):
k = k.decode('utf-8')
m = re.match(r'^subnets:(.+)$', k)
assert m
result[k] = ipaddress.ip_interface(m.group(1)).network
return result
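# editor's note (not part of this commit): functools.lru_cache hashes
# its arguments, so the subnet map above is cached per redis client
# object; threads that share a client share one cached scan. The
# semaphore in _get_subnet_interfaces below serializes calls so that
# the first, expensive scan_iter doesn't run concurrently in several
# threads.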
def _get_subnet_interfaces(address, r):
# synchronize calls to _get_subnets, so we don't
# call it many times together when running in
# multi-thread mode
_subnet_lookup_semaphore.acquire()
try:
all_subnets = _get_subnets(r)
except Exception:
logger.exception('error looking up subnets')
all_subnets = {}
finally:
_subnet_lookup_semaphore.release()
address = ipaddress.ip_address(address)
for key, network in all_subnets.items():
if address not in network:
continue
value = r.get(key)
if not value:
logger.error(f'no value for redis key "{key}"')
continue
yield from json.loads(value.decode('utf-8'))
def _get_services_for_address(address: str, r):
"""
match this address against all interfaces, then look up
any known services for that port
address is assumed to be in a valid v4/v6 format (it's used to
construct a ipaddress.ip_address object without try/except)
:param address: ip address string
:param r: a Redis instance
:return: yields PEERING_ADDRESS_SERVICES_LIST elements
"""
def _formatted_service(s):
return {
'id': s['id'],
'name': s['name'],
'type': s['service_type'],
'status': s['status']
}
for ifc_info in _get_subnet_interfaces(address, r):
ims_source_equipment = get_ims_equipment_name(
ifc_info['router'], r)
ims_interface = get_ims_interface(ifc_info['interface name'])
service_info = get_interface_services_and_loc(
ims_source_equipment, ims_interface, r)
services = service_info.get('services', [])
services = map(_formatted_service, services)
services = sorted(services, key=lambda x: x['name'])
yield {
'address': address,
'hostname': ifc_info['router'],
'interface': ifc_info['interface name'],
'services': list(services)
}
def _load_address_services_proc(address_queue, results_queue, config_params):
"""
create a local redis connection with the current db index,
look up the services for each address that arrives on address_queue
and put them on results_queue
i/o contract:
None arriving on address_queue means no more addresses are coming
putting None in results_queue means this worker is finished
:param address_queue: queue of ip address strings to look up
:param results_queue: queue on which result dicts are written
:param config_params: app config
:return: nothing
"""
try:
r = tasks_common.get_current_redis(config_params)
while True:
address = address_queue.get()
# contract is that None means no more addresses
if address is None:
break
for service_info in _get_services_for_address(address, r):
results_queue.put(service_info)
except json.JSONDecodeError:
logger.exception(f'error decoding redis entry for {address}')
except Exception:
# just log info about this error (for debugging only)
# ... and quit (i.e. let finally cleanup)
logger.exception(f'error looking up service info for {address}')
finally:
# contract is to return None when finished
results_queue.put(None)
def _get_peering_services_multi_thread(addresses):
"""
normal handler for `/msr/bgp/peering-services`
this one does the lookups in multiple threads, each with its own
redis connection
(cf. _get_peering_services_single_thread)
:param addresses: iterable of address strings
:return: yields dicts returned from _get_services_for_address
"""
yield from common.distribute_jobs_across_workers(
worker_proc=_load_address_services_proc,
jobs=addresses,
input_ctx=current_app.config['INVENTORY_PROVIDER_CONFIG'],
num_threads=min(len(addresses), 10))
def _get_peering_services_single_thread(addresses):
"""
used by `/msr/bgp/peering-services`
this one does the lookups serially, in the current thread and a single
redis connection
(cf. _get_peering_services_multi_thread)
:param addresses: iterable of address strings
:return: yields dicts returned from _get_services_for_address
"""
r = common.get_current_redis()
for a in addresses:
yield from _get_services_for_address(a, r)
def _obj_key(o):
m = hashlib.sha256()
m.update(json.dumps(json.dumps(o)).encode('utf-8'))
digest = binascii.b2a_hex(m.digest()).decode('utf-8')
return digest.upper()[-4:]
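# editor's note (not part of this commit): _obj_key reduces the request
# payload to the last 4 hex digits of a sha256 digest, e.g.
#
#     _obj_key(['192.0.2.1'])  # -> something like '3F6A' (illustrative)
#
# that is a 16-bit key space, so distinct payloads can in principle
# collide on the same classifier-cache key.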
@routes.route('/bgp/peering-services', methods=['POST'])
@common.require_accepts_json
def get_peering_services():
"""
Handler for `/msr/bgp/peering-services`
This method must be called with POST method, and the payload
should be a json-formatted list of addresses (strings), which will
be validated against the following schema:
.. asjson::
inventory_provider.routes.msr.IP_ADDRESS_LIST_SCHEMA
The response will be formatted as follows:
.. asjson::
inventory_provider.routes.msr.PEERING_ADDRESS_SERVICES_LIST
A `no-threads` request parameter can also be given. If its
truthiness value evaluates to True, the lookups are done in a
single thread.
(This functionality is mainly for testing/debugging - it's not
expected to be used in production.)
:return:
"""
addresses = request.json
jsonschema.validate(addresses, IP_ADDRESS_LIST_SCHEMA)
addresses = set(addresses) # remove duplicates
input_data_key = _obj_key(sorted(list(addresses)))
cache_key = f'classifier-cache:msr:peering-services:{input_data_key}'
r = common.get_current_redis()
response = _ignore_cache_or_retrieve(request, cache_key, r)
if not response:
# validate addresses, to decrease chances of dying in a worker thread
for a in addresses:
assert ipaddress.ip_address(a)
no_threads = common.get_bool_request_arg('no-threads', False)
if no_threads:
response = _get_peering_services_single_thread(addresses)
else:
response = _get_peering_services_multi_thread(addresses)
response = list(response)
if response:
response = json.dumps(response)
r.set(cache_key, response.encode('utf-8'))
if not response:
return Response(
response='no interfaces found',
status=404,
mimetype="text/html")
return Response(response, mimetype="application/json")
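# editor's usage sketch (assumed host/port, not part of this commit):
#
#     import requests
#     rsp = requests.post(
#         'http://localhost:8080/msr/bgp/peering-services',
#         json=['62.40.124.38'],
#         headers={'Accept': 'application/json'})
#     rsp.json()  # validates against PEERING_ADDRESS_SERVICES_LIST
#
# add '?no-threads=1' to force the single-threaded code path.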
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='inventory-provider',
version="0.67",
version="0.68",
author='GEANT',
author_email='swd@geant.org',
description='Dashboard inventory provider',
@@ -5,6 +5,7 @@ import netifaces
import os
import re
import tempfile
import threading
from lxml import etree
import pytest
@@ -19,6 +20,8 @@ TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
"test",
"data"))
_bootstrap_semaphore = threading.Semaphore()
@pytest.fixture
def data_config_filename():
@@ -85,20 +88,17 @@ def data_config(data_config_filename):
return config.load(f)
TEST_DATA_DIRNAME = os.path.realpath(os.path.join(
inventory_provider.__path__[0],
"..",
"test",
"data"))
class MockedRedis(object):
db = None
def __init__(self, *args, **kwargs):
if MockedRedis.db is None:
MockedRedis.prep()
_bootstrap_semaphore.acquire()
try:
if MockedRedis.db is None:
MockedRedis.prep()
finally:
_bootstrap_semaphore.release()
# allows us to create other mocks using a different data source file
@staticmethod
@@ -4,11 +4,13 @@ import jsonschema
import pytest
from inventory_provider.routes.msr import PEERING_LIST_SCHEMA, \
PEERING_GROUP_LIST_SCHEMA
PEERING_GROUP_LIST_SCHEMA, PEERING_ADDRESS_SERVICES_LIST, \
_get_services_for_address
from inventory_provider.routes.poller import SERVICES_LIST_SCHEMA
from inventory_provider.tasks.common import _get_redis
DEFAULT_REQUEST_HEADERS = {
"Content-type": "application/json",
"Accept": ["application/json"]
}
@@ -135,3 +137,168 @@ def test_peerings_group_list(client, uri):
jsonschema.validate(response_data, PEERING_GROUP_LIST_SCHEMA)
assert response_data # test data is non-empty
@pytest.mark.parametrize('address', [
# hand-chosen, should have services
'62.40.127.141', # from sample peering outage response
'62.40.127.139', # from sample peering outage response
'62.40.98.201', # AMS-AMS IP TRUNK (mx1.ams/ae0.1)
'62.40.127.134', # BELNET AP3 (mx1.ams/ae13.2)
'62.40.124.38', # SURF AP1 (mx1.ams/ae15.1103)
'62.40.125.57', # JISC AP2 (mx1.lon2/ae12.0)
'2001:0798:0099:0001:0000:0000:0000:0026', # v6 peering with Internet2
'2001:0798:0099:0001:0000:0000:0000:0056', # v6 peering with Canarie
'2001:0798:0018:10aa:0000:0000:0000:0016', # v6 peering with HEANET
])
def test_lookup_services_for_address(address, mocked_redis):
_redis_instance = _get_redis({
'redis': {
'hostname': None,
'port': None
},
'redis-databases': [9, 7, 5]
})
info = list(_get_services_for_address(address, r=_redis_instance))
jsonschema.validate(info, PEERING_ADDRESS_SERVICES_LIST)
# sanity check to be sure we have interesting test data
assert info
assert any(x['services'] for x in info)
_OUTAGE_PEER_ADDRESSES = [
# taken from a sample splunk query result
'83.97.93.247',
'146.48.78.13',
'185.6.36.40',
'2a00:1620:c0:4e:146:48:78:13',
'62.40.125.102',
'62.40.126.11',
'62.40.127.141',
'62.40.98.11',
'62.40.102.19',
'202.179.249.33',
'202.179.249.209',
'138.44.226.6',
'138.44.226.8',
'203.30.38.92',
'195.66.226.140',
'195.66.224.122',
'62.40.124.226',
'80.81.194.138',
'62.40.127.139',
'80.81.195.40',
'80.81.194.152',
'62.40.125.154',
'62.40.124.147',
'62.40.100.39',
'62.40.126.222',
'83.97.88.197',
'83.97.88.166',
'91.210.16.114',
'62.40.126.146',
'62.40.100.45',
'62.40.125.198',
'2400:4500::1:0:32',
'62.40.126.230',
'117.103.111.142',
'91.210.16.5',
'195.66.227.152',
'91.210.16.63',
'62.40.109.146',
'64.57.30.209',
'198.124.80.29',
'62.40.125.78',
'192.84.8.13',
'62.40.124.242',
'185.1.47.55',
'83.97.89.222',
'185.1.47.54',
'83.97.88.228',
'83.97.88.229',
'83.97.90.2',
'195.66.224.207',
'83.97.89.242',
'62.40.124.249',
'62.40.126.37',
'195.66.226.230',
'62.40.124.222',
'185.6.36.75',
'80.249.210.1',
'62.40.124.230',
'198.124.80.9',
'83.97.88.162',
'62.40.125.130',
'195.66.225.63',
'80.81.195.189',
'195.66.225.121',
'62.40.125.167',
'195.66.225.142',
'62.40.125.18',
'91.210.16.113',
'80.249.210.95',
'80.249.209.34',
'62.40.125.238',
'83.97.88.102',
'80.81.194.165',
'62.40.125.254',
'83.97.88.106',
'91.210.16.202',
'80.249.208.164',
'185.1.192.59',
'195.66.226.180',
'62.40.125.134',
'83.97.89.17',
'62.40.126.3',
'80.249.209.232',
'83.97.88.34',
'185.1.47.48',
'83.97.89.250',
'83.97.88.185',
'80.249.209.53',
'62.40.125.174',
'205.189.32.76',
'185.6.36.55',
'185.6.36.28',
'80.81.195.168',
'62.40.125.42',
'80.81.192.43',
'83.97.88.206',
'62.40.100.59',
'62.40.125.118',
'62.40.124.150',
'83.97.88.46'
]
def test_peering_services(client):
headers = {'Content-Type': 'application/json'}
headers.update(DEFAULT_REQUEST_HEADERS)
rv = client.post(
'/msr/bgp/peering-services',
headers=headers,
data=json.dumps(_OUTAGE_PEER_ADDRESSES))
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
assert response_data # test data is non-empty
jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)
def test_peering_services_single_threaded(client):
# this functionality is mainly for testing/debugging
headers = {'Content-Type': 'application/json'}
headers.update(DEFAULT_REQUEST_HEADERS)
rv = client.post(
'/msr/bgp/peering-services?no-threads=1',
headers=headers,
data=json.dumps(_OUTAGE_PEER_ADDRESSES))
assert rv.status_code == 200
assert rv.is_json
response_data = json.loads(rv.data.decode('utf-8'))
assert response_data # test data is non-empty
jsonschema.validate(response_data, PEERING_ADDRESS_SERVICES_LIST)