diff --git a/inventory_provider/db/opsdb.py b/inventory_provider/db/opsdb.py index 5e6466ba768aeb9bb8b40e3edf1182ca502d2ecd..9797770e88fb30211a49f9fd7652b42f3d0d30d9 100644 --- a/inventory_provider/db/opsdb.py +++ b/inventory_provider/db/opsdb.py @@ -436,3 +436,26 @@ AND circuit_type = 'Path' crs.execute(parent_query, [circuit['absid']]) r = _convert_to_dict(crs) return _fields2rsp(r[0]) if r else None + + +def get_service_users(connection, service_ids): + + def _sublists(l, n): + for x in range(0, len(l), n): + yield l[x:x + n] + + # not sure how to use a tuple in a prepared statement, + # so this is just doing a dumb string replacement ... + query = ( + 'select c.absid, o.name' + ' from organisation o' + ' join circuit_orgs co on co.org_absid=o.absid' + ' join circuit c on c.absid=co.circ_absid' + ' where co.org_type=\'Circuit User\'' + ' and c.absid in (%s)') + + with db.cursor(connection) as crs: + for chunk in _sublists(service_ids, 20): + crs.execute(query % ','.join([str(x) for x in chunk])) + for r in crs.fetchall(): + yield {'service_id': r[0], 'user': r[1]} diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index e39f3d12f6ec6c6eaa6e22eab371925199db1972..b321d5957f4ea66e08a45c1e9f9777581f163699 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -560,7 +560,7 @@ def refresh_finalizer(self, pending_task_ids_json): _wait_for_tasks(task_ids, update_callback=_update) _build_subnet_db(update_callback=_update) - _build_interface_services(update_callback=_update) + _build_service_category_interface_list(update_callback=_update) except (jsonschema.ValidationError, json.JSONDecodeError, @@ -574,12 +574,14 @@ def refresh_finalizer(self, pending_task_ids_json): logger.debug('<<< refresh_finalizer') -def _build_interface_services(update_callback=lambda s: None): - logger.debug('>>> _build_interface_services') - - r = get_next_redis(InventoryTask.config) +def _build_service_interface_user_list(): def _interfaces(): + """ + yields interface info from netconf + :return: + """ + r = get_next_redis(InventoryTask.config) for k in r.scan_iter('netconf-interfaces:*'): k = k.decode('utf-8') (_, router_name, ifc_name) = k.split(':') @@ -594,6 +596,83 @@ def _build_interface_services(update_callback=lambda s: None): 'description': info['description'] } + def _lookup_interface_services(wanted_interfaces): + """ + yields interface info from opsdb (with service id) + ... only interfaces in wanted_interfaces + :param wanted_interfaces: + :return: + """ + r = get_next_redis(InventoryTask.config) + for k in r.scan_iter('opsdb:interface_services:*'): + k = k.decode('utf-8') + fields = k.split(':') + if len(fields) < 4: + # there are some strange records + # e.g. TS1.*, ts1.*, dp1.*, dtn*, ... + continue + router = fields[2] + ifc_name = fields[3] + + router_interface_key = f'{router}:{ifc_name}' + if router_interface_key not in wanted_interfaces: + continue + + info = r.get(k).decode('utf-8') + info = json.loads(info) + + for service in info: + yield { + 'router': router, + 'interface': ifc_name, + 'service_id': service['id'] + } + + # dict: 'router:interface' -> {'router', 'interface', 'description'} + netconf_interface_map = dict([ + (f'{i["router"]}:{i["interface"]}', i) for i in _interfaces()]) + + # dict: 'router:interface' -> [list of service_ids] + opsdb_interface_map = {} + for i in _lookup_interface_services(netconf_interface_map.keys()): + key = f'{i["router"]}:{i["interface"]}' + opsdb_interface_map.setdefault(key, []).append(i['service_id']) + + # dict service_id[int] -> [list of users] + service_user_map = {} + with db.connection(InventoryTask.config["ops-db"]) as cx: + service_ids = set() + for l in opsdb_interface_map.values(): + for id in l: + service_ids.add(id) + for user in opsdb.get_service_users(cx, list(service_ids)): + service_user_map.setdefault( + user['service_id'], []).append(user['user']) + + def _users(ifc_key): + """ + ifc = 'router:ifc_name' + :param ifc: + :return: list of users + """ + for service_id in opsdb_interface_map.get(ifc_key, []): + for user in service_user_map.get(service_id, []): + yield user + + for k, v in netconf_interface_map.items(): + users = _users(k) + if not users: + yield v + else: + for u in users: + info = {'user': u} + info.update(v) + yield info + + +def _build_service_category_interface_list(update_callback=lambda s: None): + logger.debug('>>> _build_interface_services') + def _classify(ifc): if ifc['description'].startswith('SRV_MDVPN'): return 'mdvpn' @@ -605,7 +684,7 @@ def _build_interface_services(update_callback=lambda s: None): rp = r.pipeline() update_callback('loading all known interfaces') - interfaces = list(_interfaces()) + interfaces = list(_build_service_interface_user_list()) update_callback(f'loaded {len(interfaces)} interfaces, ' 'saving by service category') diff --git a/test/conftest.py b/test/conftest.py index 392a01901fc4885ee9ff39597bc74fde652ab08b..856d4f46bb0d1073d7d839bbb2875db221c4bc16 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,4 +1,5 @@ import ast +import contextlib import json import netifaces import os @@ -184,6 +185,11 @@ def mocked_netifaces(mocker): mocker.patch('netifaces.ifaddresses', lambda n: data[n]) +@contextlib.contextmanager +def _mocked_db_connection(ignored): + yield None + + @pytest.fixture def mocked_worker_module( mocker, mocked_redis, data_config_filename, @@ -205,3 +211,13 @@ def mocked_worker_module( mocker.patch( 'inventory_provider.juniper.load_config', _mocked_load_juniper_netconf_config) + + def _mocked_get_service_users(cx, service_ids): + for id in service_ids: + yield {'service_id': id, 'user': 'AAAAA'} + yield {'service_id': id, 'user': 'BBBB'} + + mocker.patch('inventory_provider.db.db.connection', _mocked_db_connection) + mocker.patch( + 'inventory_provider.db.opsdb.get_service_users', + _mocked_get_service_users) diff --git a/test/test_general_poller_routes.py b/test/test_general_poller_routes.py index 0e88d1e660b919b4da39c4f07820b5c8dac8a3bd..6cf02ae21263d0ec6dead40061786215aca0e5f7 100644 --- a/test/test_general_poller_routes.py +++ b/test/test_general_poller_routes.py @@ -20,7 +20,8 @@ INTERFACE_LIST_SCHEMA = { 'properties': { 'description': {'type': 'string'}, 'router': {'type': 'string'}, - 'interface': {'type': 'string'} + 'interface': {'type': 'string'}, + 'user': {'type': 'string'} }, 'required': ['router', 'interface', 'description'], 'additionalProperties': False @@ -34,7 +35,7 @@ INTERFACE_LIST_SCHEMA = { @pytest.mark.parametrize('category', ['mdvpn', 'lhcone', 'MDVpn', 'LHCONE']) def test_service_category(client, mocked_worker_module, category): - worker._build_interface_services() + worker._build_service_category_interface_list() rv = client.get( f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) @@ -47,7 +48,7 @@ def test_service_category(client, mocked_worker_module, category): @pytest.mark.parametrize('category', ['mdvpn ', ' mdvpn', 'mdvpn1', 'aaa']) def test_service_category_not_found(client, mocked_worker_module, category): - worker._build_interface_services() + worker._build_service_category_interface_list() rv = client.get( f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) diff --git a/test/test_opsdb_queries.py b/test/test_opsdb_queries.py index 2fc43bddc23e5ab0b3d9fa6aa46b20b42359dd2d..a268ccb7d370ce0542381f0af8149d558d6d1ab6 100644 --- a/test/test_opsdb_queries.py +++ b/test/test_opsdb_queries.py @@ -1,7 +1,8 @@ +import json import os -import pytest import jsonschema +import pytest from inventory_provider.db import db from inventory_provider.db import opsdb @@ -164,3 +165,65 @@ def test_equipment_location(connection, equipment): def test_coriant_path(connection, equipment, card, port): circuit = opsdb.lookup_coriant_path(connection, equipment, card, port) jsonschema.validate(circuit, CORIANT_PATH_METADATA) + + +SERVICE_USER_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'service-user': { + 'type': 'object', + 'properties': { + 'service_id': {'type': 'integer'}, + 'user': {'type': 'string'} + }, + 'required': ['service_id', 'user'], + 'additionalProperties': False + } + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/service-user'} +} + + +def test_get_service_users(connection): + service_id_list = [47673, 47675] + users = opsdb.get_service_users(connection, service_id_list) + users = list(users) + jsonschema.validate(users, SERVICE_USER_LIST_SCHEMA) + assert users + + +def test_get_all_service_users(connection, cached_test_data): + + def _all_interfaces(): + for k in cached_test_data.keys(): + if not k.startswith('netconf-interfaces:'): + continue + (_, hostname, ifc_name) = k.split(':') + yield {'hostname': hostname, 'interface': ifc_name} + + def _all_service_ids(interfaces): + for ifc in interfaces: + key = ('opsdb:interface_services' + f':{ifc["hostname"]}:{ifc["interface"]}') + if key not in cached_test_data: + print(f'warning: {key} not found in cached test data') + continue + for service in json.loads(cached_test_data[key]): + info = {'service_id': service['id']} + info.update(ifc) + yield info + + ids = {s['service_id'] for s in _all_service_ids(_all_interfaces())} + assert len(ids) > 0 + + service_users = list(opsdb.get_service_users(connection, list(ids))) + jsonschema.validate(service_users, SERVICE_USER_LIST_SCHEMA) + assert service_users + + # for user in opsdb.get_service_users(connection, list(ids)): + # services.setdefault(user['service_id'], []).append(user['user']) + # + # print([f'{k}: {v}' for k, v in services.items() if len(v) > 1]) diff --git a/test/test_worker_utils.py b/test/test_worker_utils.py index f94d2dab2d488d68a0b9375bf7ca74670ec97e86..ad51b87e88cdbc1237995c3eafdd7a7c67f69fa7 100644 --- a/test/test_worker_utils.py +++ b/test/test_worker_utils.py @@ -28,19 +28,19 @@ def test_build_interface_services(mocked_worker_module): """ ifc_schema = { '$schema': 'http://json-schema.org/draft-07/schema#', - 'type': 'object', 'properties': { 'description': {'type': 'string'}, 'router': {'type': 'string'}, - 'interface': {'type': 'string'} + 'interface': {'type': 'string'}, + 'user': {'type': 'string'} }, 'required': ['router', 'interface', 'description'], 'additionalProperties': False } db = backend_db() # also forces initialization - worker._build_interface_services() + worker._build_service_category_interface_list() seen_types = set() for k, v in db.items():