From 73b1925a3feb6d9ea2e5b5c749788a02b61ac333 Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 4 Sep 2020 16:45:05 +0200 Subject: [PATCH] support /poller/interfaces without hostname for all --- inventory_provider/routes/common.py | 46 +++++-- inventory_provider/routes/poller.py | 205 +++++++++++++++++++--------- test/test_general_poller_routes.py | 7 + 3 files changed, 183 insertions(+), 75 deletions(-) diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py index e1821391..ecabd233 100644 --- a/inventory_provider/routes/common.py +++ b/inventory_provider/routes/common.py @@ -6,11 +6,14 @@ import queue import random import threading +from lxml import etree import requests from flask import request, Response, current_app, g from inventory_provider.tasks import common as tasks_common logger = logging.getLogger(__name__) +_DECODE_TYPE_XML = 'xml' +_DECODE_TYPE_JSON = 'json' def ims_hostname_decorator(field): @@ -109,11 +112,11 @@ def after_request(response): return response -def _redis_client_proc(key_queue, value_queue, config_params): +def _redis_client_proc(key_queue, value_queue, config_params, doc_type): """ create a local redis connection with the current db index, lookup the values of the keys that come from key_queue - and put them o=n value_queue + and put them on value_queue i/o contract: None arriving on key_queue means no more keys are coming @@ -122,8 +125,18 @@ def _redis_client_proc(key_queue, value_queue, config_params): :param key_queue: :param value_queue: :param config_params: app config - :return: yields dicts like {'key': str, 'value': dict} + :param doc_type: decoding type to do (xml or json) + :return: nothing """ + assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML) + + def _decode(bv): + value = bv.decode('utf-8') + if doc_type == _DECODE_TYPE_JSON: + return json.loads(value) + elif doc_type == _DECODE_TYPE_XML: + return etree.XML(value) + try: r = tasks_common.get_current_redis(config_params) while True: @@ -133,10 +146,9 @@ def _redis_client_proc(key_queue, value_queue, config_params): if not key: break - value = r.get(key).decode('utf-8') value_queue.put({ 'key': key, - 'value': json.loads(value) + 'value': _decode(r.get(key)) }) except json.JSONDecodeError: @@ -147,9 +159,10 @@ def _redis_client_proc(key_queue, value_queue, config_params): value_queue.put(None) -def load_json_docs(config_params, key_pattern, num_threads=10): +def _load_redis_docs( + config_params, key_pattern, num_threads=10, doc_type=_DECODE_TYPE_JSON): """ - load all json docs from redis + load all docs from redis and decode as `doc_type` the loading is done with multiple connections in parallel, since this method is called from an api handler and when the client is far from @@ -158,8 +171,10 @@ def load_json_docs(config_params, key_pattern, num_threads=10): :param config_params: app config :param pattern: key pattern to load :param num_threads: number of client threads to create - :return: yields dicts like {'key': str, 'value': dict} + :param doc_type: decoding type to do (xml or json) + :return: yields dicts like {'key': str, 'value': dict or xml doc} """ + assert doc_type in (_DECODE_TYPE_XML, _DECODE_TYPE_JSON) response_queue = queue.Queue() threads = [] @@ -167,16 +182,15 @@ def load_json_docs(config_params, key_pattern, num_threads=10): q = queue.Queue() t = threading.Thread( target=_redis_client_proc, - args=[q, response_queue, config_params]) + args=[q, response_queue, config_params, doc_type]) t.start() threads.append({'thread': t, 'queue': q}) r = tasks_common.get_current_redis(config_params) # scan with bigger batches, to mitigate network latency effects for k in r.scan_iter(key_pattern, count=1000): - k = k.decode('utf-8') t = random.choice(threads) - t['queue'].put(k) + t['queue'].put(k.decode('utf-8')) # tell all threads there are no more keys coming for t in threads: @@ -196,3 +210,13 @@ def load_json_docs(config_params, key_pattern, num_threads=10): # cleanup like we're supposed to, even though it's python for t in threads: t['thread'].join(timeout=0.5) # timeout, for sanity + + +def load_json_docs(config_params, key_pattern, num_threads=10): + yield from _load_redis_docs( + config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_JSON) + + +def load_xml_docs(config_params, key_pattern, num_threads=10): + yield from _load_redis_docs( + config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_XML) diff --git a/inventory_provider/routes/poller.py b/inventory_provider/routes/poller.py index 2d9c122b..51bfa7d9 100644 --- a/inventory_provider/routes/poller.py +++ b/inventory_provider/routes/poller.py @@ -1,10 +1,12 @@ import json +import logging +import re from flask import Blueprint, Response, jsonify, current_app -from lxml import etree from inventory_provider import juniper from inventory_provider.routes import common +logger = logging.getLogger(__name__) routes = Blueprint('poller-support-routes', __name__) @@ -13,75 +15,150 @@ def after_request(resp): return common.after_request(resp) -@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) -@common.require_accepts_json -def poller_interface_oids(hostname): - r = common.get_current_redis() +def _load_snmp_indexes(hostname=None): + result = dict() + key_pattern = f'snmp-interfaces:{hostname}' \ + if hostname else 'snmp-interfaces:*' + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern): + + router = doc['key'][len('snmp-interfaces:'):] + interfaces = dict( + [(e['name'], e['index']) for e in doc['value']]) + result[router] = interfaces + + return result + + +def _load_interface_bundles(hostname=None): + result = dict() + key_pattern = f'netconf-interface-bundles:{hostname}:*' \ + if hostname else 'netconf-interface-bundles:*' + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=20): + + m = re.match(r'^netconf-interface-bundles:([^:]+):(.+)', doc['key']) + assert m + + router = m.group(1) + interface = m.group(2) + result.setdefault(router, dict()) + result[router][interface] = doc['value'] + + return result + - netconf_string = r.get('netconf:' + hostname) - if not netconf_string: - return Response( - response='no netconf available info for %r' % hostname, - status=404, - mimetype='text/html') - - snmp_data_string = r.get('snmp-interfaces:' + hostname) - if not snmp_data_string: - return Response( - response='no snmp available info for %r' % hostname, - status=404, - mimetype='text/html') - - snmp_indexes = {} - for ifc in json.loads(snmp_data_string.decode('utf-8')): - snmp_indexes[ifc['name']] = ifc['index'] - - interfaces = list(juniper.list_interfaces( - etree.XML(netconf_string.decode('utf-8')))) - - if not interfaces: - return Response( - response='no interfaces found for %r' % hostname, - status=404, - mimetype='text/html') - - result = [] - for ifc in interfaces: - if not ifc['description']: +def _load_services(hostname=None): + result = dict() + key_pattern = f'opsdb:interface_services:{hostname}:*' \ + if hostname else 'opsdb:interface_services:*' + + def _service_params(full_service_info): + return { + 'id': full_service_info['id'], + 'name': full_service_info['name'], + 'type': full_service_info['service_type'], + 'status': full_service_info['status'] + } + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=20): + + m = re.match(r'^opsdb:interface_services:([^:]+):(.+)', doc['key']) + if not m: + logger.warning(f'can''t parse redis service key {doc["key"]}') + # there are some weird records (dtn*, dp1*) continue - snmp_index = snmp_indexes.get(ifc['name'], None) - if not snmp_index: + router = m.group(1) + interface = m.group(2) + result.setdefault(router, dict()) + result[router][interface] = [_service_params(s) for s in doc['value']] + + return result + +def _load_interfaces(hostname): + key_pattern = f'netconf:{hostname}' if hostname else 'netconf:*' + for doc in common.load_xml_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=10): + + router = doc['key'][len('netconf:'):] + + for ifc in juniper.list_interfaces(doc['value']): + if not ifc['description']: + continue + yield { + 'router': router, + 'name': ifc['name'], + 'bundle': ifc['bundle'], + 'bundle-parents': [], + 'snmp-index': -1, + 'description': ifc['description'], + 'circuits': [] + } + + +def _load_poller_interfaces(hostname=None): + + snmp_indexes = _load_snmp_indexes(hostname) + bundles = _load_interface_bundles(hostname) + services= _load_services(hostname) + + for ifc in _load_interfaces(hostname): + + router_snmp = snmp_indexes.get(ifc['router'], None) + if not router_snmp or ifc['name'] not in router_snmp: + # there's no way to poll this interface continue + ifc['snmp-index'] = router_snmp[ifc['name']] - bundle_parents = r.get('netconf-interface-bundles:%s:%s' % ( - hostname, ifc['name'].split('.')[0])) - - ifc_data = { - 'name': ifc['name'], - 'bundle': ifc['bundle'], - 'bundle-parents': - json.loads(bundle_parents) if bundle_parents else [], - 'snmp-index': snmp_index, - 'description': ifc['description'], - 'circuits': [] - } + router_bundle = bundles.get(ifc['router'], None) + if router_bundle: + base_ifc = ifc['name'].split('.')[0] + ifc['bundle-parents'] = router_bundle.get(base_ifc, []) + + router_services = services.get(ifc['router'], None) + if router_services: + ifc['circuits'] = router_services.get(ifc['name'], []) + + yield ifc + + +@routes.route("/interfaces", methods=['GET', 'POST']) +@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) +@common.require_accepts_json +def poller_interface_oids(hostname=None): + + cache_key = f'classifier-cache:poller-interfaces:{hostname}' \ + if hostname else 'classifier-cache:poller-interfaces:all' + + r = common.get_current_redis() + + result = r.get(cache_key) + if result: + result = result.decode('utf-8') + else: + result = list(_load_poller_interfaces(hostname)) + if not result: + return Response( + response='no interfaces found', + status=404, + mimetype='text/html') + + result = json.dumps(result) + # cache this data for the next call + r.set(cache_key, result.encode('utf-8')) - circuits = r.get( - 'opsdb:interface_services:%s:%s' % (hostname, ifc['name'])) - if circuits: - ifc_data['circuits'] = [ - { - 'id': c['id'], - 'name': c['name'], - 'type': c['service_type'], - 'status': c['status'] - } for c in json.loads(circuits.decode('utf-8')) - ] - - result.append(ifc_data) - - return jsonify(result) + return Response(result, mimetype="application/json") @routes.route('/services/<category>', methods=['GET', 'POST']) diff --git a/test/test_general_poller_routes.py b/test/test_general_poller_routes.py index 6cf02ae2..166696f0 100644 --- a/test/test_general_poller_routes.py +++ b/test/test_general_poller_routes.py @@ -53,3 +53,10 @@ def test_service_category_not_found(client, mocked_worker_module, category): f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 + + +def test_get_all_interfaces(client): + rv = client.get( + f'/poller/interfaces', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 200 -- GitLab