diff --git a/changelog b/changelog index 4803204a0f6660d11e47fa46229b079c31321178..b2cf190a7981c61bc773c758e5805147e26422bd 100644 --- a/changelog +++ b/changelog @@ -51,4 +51,5 @@ 0.32: Ensured all Related Services are returned for juniper links 0.33: Added Related Services for Infinera info 0.34: POL1-135: initial support for service category api - DBOARD3-203: omite 'inactive' interfaces \ No newline at end of file + DBOARD3-203: omit 'inactive' interfaces +0.35: POL1-135: added customer(user) info to service category api response \ No newline at end of file diff --git a/inventory_provider/db/opsdb.py b/inventory_provider/db/opsdb.py index 5e6466ba768aeb9bb8b40e3edf1182ca502d2ecd..9797770e88fb30211a49f9fd7652b42f3d0d30d9 100644 --- a/inventory_provider/db/opsdb.py +++ b/inventory_provider/db/opsdb.py @@ -436,3 +436,26 @@ AND circuit_type = 'Path' crs.execute(parent_query, [circuit['absid']]) r = _convert_to_dict(crs) return _fields2rsp(r[0]) if r else None + + +def get_service_users(connection, service_ids): + + def _sublists(l, n): + for x in range(0, len(l), n): + yield l[x:x + n] + + # not sure how to use a tuple in a prepared statement, + # so this is just doing a dumb string replacement ... + query = ( + 'select c.absid, o.name' + ' from organisation o' + ' join circuit_orgs co on co.org_absid=o.absid' + ' join circuit c on c.absid=co.circ_absid' + ' where co.org_type=\'Circuit User\'' + ' and c.absid in (%s)') + + with db.cursor(connection) as crs: + for chunk in _sublists(service_ids, 20): + crs.execute(query % ','.join([str(x) for x in chunk])) + for r in crs.fetchall(): + yield {'service_id': r[0], 'user': r[1]} diff --git a/inventory_provider/routes/poller.py b/inventory_provider/routes/poller.py index ad87d6866e6f017371e2efcbca720cb52543af5b..52fcd5dc5065ae919629fe6d732542c5dd8624ce 100644 --- a/inventory_provider/routes/poller.py +++ b/inventory_provider/routes/poller.py @@ -88,12 +88,23 @@ def poller_interface_oids(hostname): @common.require_accepts_json def service_category_interfaces(category): - result = [] - - r = common.get_current_redis() - for k in r.scan_iter(f'interface-services:{category.lower()}:*'): - ifc = r.get(k.decode('utf-8')) - result.append(json.loads(ifc.decode('utf-8'))) + def _interfaces(): + r = common.get_current_redis() + for k in r.scan_iter(f'interface-services:{category.lower()}:*'): + cached_ifc = r.get(k.decode('utf-8')).decode('utf-8') + cached_ifc = json.loads(cached_ifc) + basic_ifc_info = dict() + for k in ['description', 'interface', 'router']: + basic_ifc_info[k] = cached_ifc[k] + if not cached_ifc['users']: + yield basic_ifc_info + else: + for user in cached_ifc['users']: + ifc = {'user': user} + ifc.update(basic_ifc_info) + yield ifc + + result = list(_interfaces()) if not result: return Response( diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index e39f3d12f6ec6c6eaa6e22eab371925199db1972..ef7f351e11e2d67f994dd016b4c769a374597365 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -560,7 +560,7 @@ def refresh_finalizer(self, pending_task_ids_json): _wait_for_tasks(task_ids, update_callback=_update) _build_subnet_db(update_callback=_update) - _build_interface_services(update_callback=_update) + _build_service_category_interface_list(update_callback=_update) except (jsonschema.ValidationError, json.JSONDecodeError, @@ -574,12 +574,14 @@ def refresh_finalizer(self, pending_task_ids_json): logger.debug('<<< refresh_finalizer') -def _build_interface_services(update_callback=lambda s: None): - logger.debug('>>> _build_interface_services') - - r = get_next_redis(InventoryTask.config) +def _build_service_interface_user_list(): def _interfaces(): + """ + yields interface info from netconf + :return: + """ + r = get_next_redis(InventoryTask.config) for k in r.scan_iter('netconf-interfaces:*'): k = k.decode('utf-8') (_, router_name, ifc_name) = k.split(':') @@ -594,6 +596,82 @@ def _build_interface_services(update_callback=lambda s: None): 'description': info['description'] } + def _lookup_interface_services(wanted_interfaces): + """ + yields interface info from opsdb (with service id) + ... only interfaces in wanted_interfaces + :param wanted_interfaces: + :return: + """ + r = get_next_redis(InventoryTask.config) + for k in r.scan_iter('opsdb:interface_services:*'): + k = k.decode('utf-8') + fields = k.split(':') + if len(fields) < 4: + # there are some strange records + # e.g. TS1.*, ts1.*, dp1.*, dtn*, ... + continue + router = fields[2] + ifc_name = fields[3] + + router_interface_key = f'{router}:{ifc_name}' + if router_interface_key not in wanted_interfaces: + continue + + info = r.get(k).decode('utf-8') + info = json.loads(info) + + yield { + 'router': router, + 'interface': ifc_name, + 'service_ids': set([service['id'] for service in info]) + } + + # dict: 'router:interface' -> {'router', 'interface', 'description'} + netconf_interface_map = dict([ + (f'{i["router"]}:{i["interface"]}', i) for i in _interfaces()]) + + # dict: 'router:interface' -> {'router', 'interface', set([service_ids])} + opsdb_interface_map = dict([ + (f'{i["router"]}:{i["interface"]}', i) + for i in _lookup_interface_services(netconf_interface_map.keys())]) + + all_service_ids = set() + for r in opsdb_interface_map.values(): + all_service_ids |= r['service_ids'] + all_service_ids = list(all_service_ids) + + # dict: service_id[int] -> [list of users] + service_user_map = dict() + with db.connection(InventoryTask.config["ops-db"]) as cx: + # for user in opsdb.get_service_users(cx, list(all_service_ids)): + service_users = list(opsdb.get_service_users(cx, all_service_ids)) + for user in service_users: + service_user_map.setdefault( + user['service_id'], []).append(user['user']) + + def _users(ifc_key): + """ + ifc = 'router:ifc_name' + :param ifc: + :return: list of users + """ + users = set() + if ifc_key not in opsdb_interface_map: + return [] + service_id_list = opsdb_interface_map[ifc_key].get('service_ids', []) + for service_id in service_id_list: + users |= set(service_user_map.get(service_id, [])) + return list(users) + + for k, v in netconf_interface_map.items(): + v['users'] = _users(k) + yield v + + +def _build_service_category_interface_list(update_callback=lambda s: None): + logger.debug('>>> _build_interface_services') + def _classify(ifc): if ifc['description'].startswith('SRV_MDVPN'): return 'mdvpn' @@ -601,14 +679,14 @@ def _build_interface_services(update_callback=lambda s: None): return 'lhcone' return None - r = get_next_redis(InventoryTask.config) - rp = r.pipeline() - update_callback('loading all known interfaces') - interfaces = list(_interfaces()) + interfaces = list(_build_service_interface_user_list()) update_callback(f'loaded {len(interfaces)} interfaces, ' 'saving by service category') + r = get_next_redis(InventoryTask.config) + rp = r.pipeline() + for ifc in interfaces: service_type = _classify(ifc) if not service_type: diff --git a/setup.py b/setup.py index 9cac2d79e6f4dd0b3e418041668329fd12d84b1e..e4e9a295b34dd80b20a2f19e2f2856ee4d5a31b6 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='inventory-provider', - version="0.34", + version="0.35", author='GEANT', author_email='swd@geant.org', description='Dashboard inventory provider', diff --git a/test/conftest.py b/test/conftest.py index 392a01901fc4885ee9ff39597bc74fde652ab08b..856d4f46bb0d1073d7d839bbb2875db221c4bc16 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,4 +1,5 @@ import ast +import contextlib import json import netifaces import os @@ -184,6 +185,11 @@ def mocked_netifaces(mocker): mocker.patch('netifaces.ifaddresses', lambda n: data[n]) +@contextlib.contextmanager +def _mocked_db_connection(ignored): + yield None + + @pytest.fixture def mocked_worker_module( mocker, mocked_redis, data_config_filename, @@ -205,3 +211,13 @@ def mocked_worker_module( mocker.patch( 'inventory_provider.juniper.load_config', _mocked_load_juniper_netconf_config) + + def _mocked_get_service_users(cx, service_ids): + for id in service_ids: + yield {'service_id': id, 'user': 'AAAAA'} + yield {'service_id': id, 'user': 'BBBB'} + + mocker.patch('inventory_provider.db.db.connection', _mocked_db_connection) + mocker.patch( + 'inventory_provider.db.opsdb.get_service_users', + _mocked_get_service_users) diff --git a/test/test_general_poller_routes.py b/test/test_general_poller_routes.py index 0e88d1e660b919b4da39c4f07820b5c8dac8a3bd..6cf02ae21263d0ec6dead40061786215aca0e5f7 100644 --- a/test/test_general_poller_routes.py +++ b/test/test_general_poller_routes.py @@ -20,7 +20,8 @@ INTERFACE_LIST_SCHEMA = { 'properties': { 'description': {'type': 'string'}, 'router': {'type': 'string'}, - 'interface': {'type': 'string'} + 'interface': {'type': 'string'}, + 'user': {'type': 'string'} }, 'required': ['router', 'interface', 'description'], 'additionalProperties': False @@ -34,7 +35,7 @@ INTERFACE_LIST_SCHEMA = { @pytest.mark.parametrize('category', ['mdvpn', 'lhcone', 'MDVpn', 'LHCONE']) def test_service_category(client, mocked_worker_module, category): - worker._build_interface_services() + worker._build_service_category_interface_list() rv = client.get( f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) @@ -47,7 +48,7 @@ def test_service_category(client, mocked_worker_module, category): @pytest.mark.parametrize('category', ['mdvpn ', ' mdvpn', 'mdvpn1', 'aaa']) def test_service_category_not_found(client, mocked_worker_module, category): - worker._build_interface_services() + worker._build_service_category_interface_list() rv = client.get( f'/poller/services/{category}', headers=DEFAULT_REQUEST_HEADERS) diff --git a/test/test_opsdb_queries.py b/test/test_opsdb_queries.py index 2fc43bddc23e5ab0b3d9fa6aa46b20b42359dd2d..a268ccb7d370ce0542381f0af8149d558d6d1ab6 100644 --- a/test/test_opsdb_queries.py +++ b/test/test_opsdb_queries.py @@ -1,7 +1,8 @@ +import json import os -import pytest import jsonschema +import pytest from inventory_provider.db import db from inventory_provider.db import opsdb @@ -164,3 +165,65 @@ def test_equipment_location(connection, equipment): def test_coriant_path(connection, equipment, card, port): circuit = opsdb.lookup_coriant_path(connection, equipment, card, port) jsonschema.validate(circuit, CORIANT_PATH_METADATA) + + +SERVICE_USER_LIST_SCHEMA = { + '$schema': 'http://json-schema.org/draft-07/schema#', + + 'definitions': { + 'service-user': { + 'type': 'object', + 'properties': { + 'service_id': {'type': 'integer'}, + 'user': {'type': 'string'} + }, + 'required': ['service_id', 'user'], + 'additionalProperties': False + } + }, + + 'type': 'array', + 'items': {'$ref': '#/definitions/service-user'} +} + + +def test_get_service_users(connection): + service_id_list = [47673, 47675] + users = opsdb.get_service_users(connection, service_id_list) + users = list(users) + jsonschema.validate(users, SERVICE_USER_LIST_SCHEMA) + assert users + + +def test_get_all_service_users(connection, cached_test_data): + + def _all_interfaces(): + for k in cached_test_data.keys(): + if not k.startswith('netconf-interfaces:'): + continue + (_, hostname, ifc_name) = k.split(':') + yield {'hostname': hostname, 'interface': ifc_name} + + def _all_service_ids(interfaces): + for ifc in interfaces: + key = ('opsdb:interface_services' + f':{ifc["hostname"]}:{ifc["interface"]}') + if key not in cached_test_data: + print(f'warning: {key} not found in cached test data') + continue + for service in json.loads(cached_test_data[key]): + info = {'service_id': service['id']} + info.update(ifc) + yield info + + ids = {s['service_id'] for s in _all_service_ids(_all_interfaces())} + assert len(ids) > 0 + + service_users = list(opsdb.get_service_users(connection, list(ids))) + jsonschema.validate(service_users, SERVICE_USER_LIST_SCHEMA) + assert service_users + + # for user in opsdb.get_service_users(connection, list(ids)): + # services.setdefault(user['service_id'], []).append(user['user']) + # + # print([f'{k}: {v}' for k, v in services.items() if len(v) > 1]) diff --git a/test/test_worker_utils.py b/test/test_worker_utils.py index f94d2dab2d488d68a0b9375bf7ca74670ec97e86..4637dedb7a4560cd415a97bac7053e7950d18192 100644 --- a/test/test_worker_utils.py +++ b/test/test_worker_utils.py @@ -28,25 +28,29 @@ def test_build_interface_services(mocked_worker_module): """ ifc_schema = { '$schema': 'http://json-schema.org/draft-07/schema#', - 'type': 'object', 'properties': { 'description': {'type': 'string'}, 'router': {'type': 'string'}, - 'interface': {'type': 'string'} + 'interface': {'type': 'string'}, + 'users': { + 'type': 'array', + 'items': {'type': 'string'} + } }, 'required': ['router', 'interface', 'description'], 'additionalProperties': False } db = backend_db() # also forces initialization - worker._build_interface_services() + worker._build_service_category_interface_list() seen_types = set() for k, v in db.items(): if not k.startswith('interface-services:'): continue + print(v) (_, type, router, ifc_name) = k.split(':') ifc_info = json.loads(v)