diff --git a/inventory_provider/routes/common.py b/inventory_provider/routes/common.py index e1821391fdd386e9e3314777716d44130ea30beb..275b976530539c528eff4439ad551300a1278b6e 100644 --- a/inventory_provider/routes/common.py +++ b/inventory_provider/routes/common.py @@ -6,11 +6,14 @@ import queue import random import threading +from lxml import etree import requests from flask import request, Response, current_app, g from inventory_provider.tasks import common as tasks_common logger = logging.getLogger(__name__) +_DECODE_TYPE_XML = 'xml' +_DECODE_TYPE_JSON = 'json' def ims_hostname_decorator(field): @@ -109,11 +112,11 @@ def after_request(response): return response -def _redis_client_proc(key_queue, value_queue, config_params): +def _redis_client_proc(key_queue, value_queue, config_params, doc_type): """ create a local redis connection with the current db index, lookup the values of the keys that come from key_queue - and put them o=n value_queue + and put them on value_queue i/o contract: None arriving on key_queue means no more keys are coming @@ -122,8 +125,18 @@ def _redis_client_proc(key_queue, value_queue, config_params): :param key_queue: :param value_queue: :param config_params: app config - :return: yields dicts like {'key': str, 'value': dict} + :param doc_type: decoding type to do (xml or json) + :return: nothing """ + assert doc_type in (_DECODE_TYPE_JSON, _DECODE_TYPE_XML) + + def _decode(bv): + value = bv.decode('utf-8') + if doc_type == _DECODE_TYPE_JSON: + return json.loads(value) + elif doc_type == _DECODE_TYPE_XML: + return etree.XML(value) + try: r = tasks_common.get_current_redis(config_params) while True: @@ -133,10 +146,9 @@ def _redis_client_proc(key_queue, value_queue, config_params): if not key: break - value = r.get(key).decode('utf-8') value_queue.put({ 'key': key, - 'value': json.loads(value) + 'value': _decode(r.get(key)) }) except json.JSONDecodeError: @@ -147,9 +159,13 @@ def _redis_client_proc(key_queue, value_queue, config_params): value_queue.put(None) -def load_json_docs(config_params, key_pattern, num_threads=10): +def _load_redis_docs( + config_params, + key_pattern, + num_threads=10, + doc_type=_DECODE_TYPE_JSON): """ - load all json docs from redis + load all docs from redis and decode as `doc_type` the loading is done with multiple connections in parallel, since this method is called from an api handler and when the client is far from @@ -158,8 +174,10 @@ def load_json_docs(config_params, key_pattern, num_threads=10): :param config_params: app config :param pattern: key pattern to load :param num_threads: number of client threads to create - :return: yields dicts like {'key': str, 'value': dict} + :param doc_type: decoding type to do (xml or json) + :return: yields dicts like {'key': str, 'value': dict or xml doc} """ + assert doc_type in (_DECODE_TYPE_XML, _DECODE_TYPE_JSON) response_queue = queue.Queue() threads = [] @@ -167,16 +185,15 @@ def load_json_docs(config_params, key_pattern, num_threads=10): q = queue.Queue() t = threading.Thread( target=_redis_client_proc, - args=[q, response_queue, config_params]) + args=[q, response_queue, config_params, doc_type]) t.start() threads.append({'thread': t, 'queue': q}) r = tasks_common.get_current_redis(config_params) # scan with bigger batches, to mitigate network latency effects for k in r.scan_iter(key_pattern, count=1000): - k = k.decode('utf-8') t = random.choice(threads) - t['queue'].put(k) + t['queue'].put(k.decode('utf-8')) # tell all threads there are no more keys coming for t in threads: @@ -196,3 +213,13 @@ def load_json_docs(config_params, key_pattern, num_threads=10): # cleanup like we're supposed to, even though it's python for t in threads: t['thread'].join(timeout=0.5) # timeout, for sanity + + +def load_json_docs(config_params, key_pattern, num_threads=10): + yield from _load_redis_docs( + config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_JSON) + + +def load_xml_docs(config_params, key_pattern, num_threads=10): + yield from _load_redis_docs( + config_params, key_pattern, num_threads, doc_type=_DECODE_TYPE_XML) diff --git a/inventory_provider/routes/poller.py b/inventory_provider/routes/poller.py index 2d9c122b06ce99a316fb9ed108b4f9e6e5b87636..b6f1f66fc407b006d0f47ce3d33dfe8602ce74ad 100644 --- a/inventory_provider/routes/poller.py +++ b/inventory_provider/routes/poller.py @@ -1,10 +1,12 @@ import json +import logging +import re -from flask import Blueprint, Response, jsonify, current_app -from lxml import etree +from flask import Blueprint, Response, current_app from inventory_provider import juniper from inventory_provider.routes import common +logger = logging.getLogger(__name__) routes = Blueprint('poller-support-routes', __name__) @@ -13,75 +15,151 @@ def after_request(resp): return common.after_request(resp) -@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) -@common.require_accepts_json -def poller_interface_oids(hostname): - r = common.get_current_redis() +def _load_snmp_indexes(hostname=None): + result = dict() + key_pattern = f'snmp-interfaces:{hostname}*' \ + if hostname else 'snmp-interfaces:*' + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern): + + router = doc['key'][len('snmp-interfaces:'):] + interfaces = dict( + [(e['name'], e['index']) for e in doc['value']]) + result[router] = interfaces + + return result + + +def _load_interface_bundles(hostname=None): + result = dict() + key_pattern = f'netconf-interface-bundles:{hostname}:*' \ + if hostname else 'netconf-interface-bundles:*' + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=20): + + m = re.match(r'^netconf-interface-bundles:([^:]+):(.+)', doc['key']) + assert m + + router = m.group(1) + interface = m.group(2) + result.setdefault(router, dict()) + result[router][interface] = doc['value'] + + return result + - netconf_string = r.get('netconf:' + hostname) - if not netconf_string: - return Response( - response='no netconf available info for %r' % hostname, - status=404, - mimetype='text/html') - - snmp_data_string = r.get('snmp-interfaces:' + hostname) - if not snmp_data_string: - return Response( - response='no snmp available info for %r' % hostname, - status=404, - mimetype='text/html') - - snmp_indexes = {} - for ifc in json.loads(snmp_data_string.decode('utf-8')): - snmp_indexes[ifc['name']] = ifc['index'] - - interfaces = list(juniper.list_interfaces( - etree.XML(netconf_string.decode('utf-8')))) - - if not interfaces: - return Response( - response='no interfaces found for %r' % hostname, - status=404, - mimetype='text/html') - - result = [] - for ifc in interfaces: - if not ifc['description']: +def _load_services(hostname=None): + result = dict() + key_pattern = f'opsdb:interface_services:{hostname}:*' \ + if hostname else 'opsdb:interface_services:*' + + def _service_params(full_service_info): + return { + 'id': full_service_info['id'], + 'name': full_service_info['name'], + 'type': full_service_info['service_type'], + 'status': full_service_info['status'] + } + + for doc in common.load_json_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=20): + + m = re.match(r'^opsdb:interface_services:([^:]+):(.+)', doc['key']) + if not m: + logger.warning(f'can\'t parse redis service key {doc["key"]}') + # there are some weird records (dtn*, dp1*) continue - snmp_index = snmp_indexes.get(ifc['name'], None) - if not snmp_index: + router = m.group(1) + interface = m.group(2) + result.setdefault(router, dict()) + result[router][interface] = [_service_params(s) for s in doc['value']] + + return result + + +def _load_interfaces(hostname): + key_pattern = f'netconf:{hostname}*' if hostname else 'netconf:*' + for doc in common.load_xml_docs( + config_params=current_app.config['INVENTORY_PROVIDER_CONFIG'], + key_pattern=key_pattern, + num_threads=10): + + router = doc['key'][len('netconf:'):] + + for ifc in juniper.list_interfaces(doc['value']): + if not ifc['description']: + continue + yield { + 'router': router, + 'name': ifc['name'], + 'bundle': ifc['bundle'], + 'bundle-parents': [], + 'snmp-index': -1, + 'description': ifc['description'], + 'circuits': [] + } + + +def _load_poller_interfaces(hostname=None): + + snmp_indexes = _load_snmp_indexes(hostname) + bundles = _load_interface_bundles(hostname) + services = _load_services(hostname) + + for ifc in _load_interfaces(hostname): + + router_snmp = snmp_indexes.get(ifc['router'], None) + if not router_snmp or ifc['name'] not in router_snmp: + # there's no way to poll this interface continue + ifc['snmp-index'] = router_snmp[ifc['name']] - bundle_parents = r.get('netconf-interface-bundles:%s:%s' % ( - hostname, ifc['name'].split('.')[0])) - - ifc_data = { - 'name': ifc['name'], - 'bundle': ifc['bundle'], - 'bundle-parents': - json.loads(bundle_parents) if bundle_parents else [], - 'snmp-index': snmp_index, - 'description': ifc['description'], - 'circuits': [] - } + router_bundle = bundles.get(ifc['router'], None) + if router_bundle: + base_ifc = ifc['name'].split('.')[0] + ifc['bundle-parents'] = router_bundle.get(base_ifc, []) + + router_services = services.get(ifc['router'], None) + if router_services: + ifc['circuits'] = router_services.get(ifc['name'], []) - circuits = r.get( - 'opsdb:interface_services:%s:%s' % (hostname, ifc['name'])) - if circuits: - ifc_data['circuits'] = [ - { - 'id': c['id'], - 'name': c['name'], - 'type': c['service_type'], - 'status': c['status'] - } for c in json.loads(circuits.decode('utf-8')) - ] - - result.append(ifc_data) - - return jsonify(result) + yield ifc + + +@routes.route("/interfaces", methods=['GET', 'POST']) +@routes.route('/interfaces/<hostname>', methods=['GET', 'POST']) +@common.require_accepts_json +def poller_interface_oids(hostname=None): + + cache_key = f'classifier-cache:poller-interfaces:{hostname}' \ + if hostname else 'classifier-cache:poller-interfaces:all' + + r = common.get_current_redis() + + result = r.get(cache_key) + if result: + result = result.decode('utf-8') + else: + result = list(_load_poller_interfaces(hostname)) + if not result: + return Response( + response='no interfaces found', + status=404, + mimetype='text/html') + + result = json.dumps(result) + # cache this data for the next call + r.set(cache_key, result.encode('utf-8')) + + return Response(result, mimetype="application/json") @routes.route('/services/<category>', methods=['GET', 'POST']) diff --git a/test/per_router/test_poller_routes.py b/test/per_router/test_poller_routes.py index fe0a91cb585737d04762f886c1d17cd2066b9218..6879805e435949127ca305c9c16b5d777007e21b 100644 --- a/test/per_router/test_poller_routes.py +++ b/test/per_router/test_poller_routes.py @@ -6,61 +6,63 @@ DEFAULT_REQUEST_HEADERS = { "Accept": ["application/json"] } +INTERFACE_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', -def test_router_interfaces(router, client): - interfaces_list_schema = { - "$schema": "http://json-schema.org/draft-07/schema#", - - "definitions": { - "circuit": { - "type": "object", - "properties": { - "name": {"type": "string"}, - "status": {"type": "string"}, - "type": {"type": "string"}, - "id": {"type": "integer"} - }, - "required": ["name", "status", "type", "id"], - "additionalProperties": False - } + 'definitions': { + 'service': { + 'type': 'object', + 'properties': { + 'id': {'type': 'integer'}, + 'name': {'type': 'string'}, + 'type': {'type': 'string'}, + 'status': {'type': 'string'}, + }, + 'required': ['id', 'name', 'type', 'status'], + 'additionalProperties': False }, - - "type": "array", - "items": { - "type": "object", - "properties": { - "circuits": { - "type": "array", - "items": {"$ref": "#/definitions/circuit"} + 'interface': { + 'type': 'object', + 'properties': { + 'router': {'type': 'string'}, + 'name': {'type': 'string'}, + 'description': {'type': 'string'}, + 'snmp-index': { + 'type': 'integer', + 'minimum': 1 }, - "bundle": { - "type": "array", - "items": {"type": "string"} + 'bundle': { + 'type': 'array', + 'items': {'type': 'string'} }, - "bundle-parents": { - "type": "array", - "items": {"type": "string"} + 'bundle-parents': { + 'type': 'array', + 'items': {'type': 'string'} }, - "description": {"type": "string"}, - "name": {"type": "string"}, - "snmp-index": {"type": "integer"} + 'circuits': { + 'type': 'array', + 'items': {'$ref': '#/definitions/service'} + } }, - "required": [ - "circuits", - "bundle", - "bundle-parents", - "description", - "name", - "snmp-index"], - "additionalProperties": False - } - } + 'required': [ + 'router', 'name', 'description', + 'snmp-index', 'bundle', 'bundle-parents', + 'circuits'], + 'additionalProperties': False + }, + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/interface'} +} + +def test_router_interfaces(router, client): rv = client.post( - "/poller/interfaces/" + router, + f'/poller/interfaces/{router}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 response = json.loads(rv.data.decode("utf-8")) - jsonschema.validate(response, interfaces_list_schema) + jsonschema.validate(response, INTERFACE_LIST_SCHEMA) assert response # at least shouldn't be empty diff --git a/test/test_general_poller_routes.py b/test/test_general_poller_routes.py index 6cf02ae21263d0ec6dead40061786215aca0e5f7..ada06035b3780e3ffa5a15382f6491c88a1ca939 100644 --- a/test/test_general_poller_routes.py +++ b/test/test_general_poller_routes.py @@ -11,7 +11,7 @@ DEFAULT_REQUEST_HEADERS = { } -INTERFACE_LIST_SCHEMA = { +SCHEMA_INTERFACE_LIST_SCHEMA = { '$schema': 'http://json-schema.org/draft-07/schema#', 'definitions': { @@ -33,6 +33,57 @@ INTERFACE_LIST_SCHEMA = { } +INTERFACE_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'service': { + 'type': 'object', + 'properties': { + 'id': {'type': 'integer'}, + 'name': {'type': 'string'}, + 'type': {'type': 'string'}, + 'status': {'type': 'string'}, + }, + 'required': ['id', 'name', 'type', 'status'], + 'additionalProperties': False + }, + 'interface': { + 'type': 'object', + 'properties': { + 'router': {'type': 'string'}, + 'name': {'type': 'string'}, + 'description': {'type': 'string'}, + 'snmp-index': { + 'type': 'integer', + 'minimum': 1 + }, + 'bundle': { + 'type': 'array', + 'items': {'type': 'string'} + }, + 'bundle-parents': { + 'type': 'array', + 'items': {'type': 'string'} + }, + 'circuits': { + 'type': 'array', + 'items': {'$ref': '#/definitions/service'} + } + }, + 'required': [ + 'router', 'name', 'description', + 'snmp-index', 'bundle', 'bundle-parents', + 'circuits'], + 'additionalProperties': False + }, + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/interface'} +} + + @pytest.mark.parametrize('category', ['mdvpn', 'lhcone', 'MDVpn', 'LHCONE']) def test_service_category(client, mocked_worker_module, category): worker._build_service_category_interface_list() @@ -42,7 +93,7 @@ def test_service_category(client, mocked_worker_module, category): assert rv.status_code == 200 assert rv.is_json response_data = json.loads(rv.data.decode('utf-8')) - jsonschema.validate(response_data, INTERFACE_LIST_SCHEMA) + jsonschema.validate(response_data, SCHEMA_INTERFACE_LIST_SCHEMA) assert response_data, 'expected a non-empty list' @@ -53,3 +104,14 @@ def test_service_category_not_found(client, mocked_worker_module, category): f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 + + +def test_get_all_interfaces(client): + rv = client.get( + '/poller/interfaces', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 200 + assert rv.is_json + response_data = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(response_data, INTERFACE_LIST_SCHEMA) + assert response_data, 'expected a non-empty list'