From baf5dadce7d63aeac720ba21469f4d75901388ca Mon Sep 17 00:00:00 2001 From: Erik Reid <erik.reid@geant.org> Date: Fri, 17 Apr 2020 14:57:40 +0200 Subject: [PATCH] refactored worker module, added more redis pipelining --- inventory_provider/tasks/data.py | 124 ++++++++++++++++++++ inventory_provider/tasks/worker.py | 180 ++++++++--------------------- test/test_junosspace_io.py | 4 +- 3 files changed, 172 insertions(+), 136 deletions(-) diff --git a/inventory_provider/tasks/data.py b/inventory_provider/tasks/data.py index e69de29b..278f2721 100644 --- a/inventory_provider/tasks/data.py +++ b/inventory_provider/tasks/data.py @@ -0,0 +1,124 @@ +import json +import logging +import re + +from inventory_provider.db import opsdb, db +from inventory_provider.tasks.common import get_next_redis + +logger = logging.getLogger(__name__) + + +def build_service_interface_user_list(config): + + def _interfaces(): + """ + yields interface info from netconf + :return: + """ + r = get_next_redis(config) + for k in r.scan_iter('netconf-interfaces:*'): + k = k.decode('utf-8') + (_, router_name, ifc_name) = k.split(':') + + info = r.get(k).decode('utf-8') + info = json.loads(info) + + assert ifc_name == info['name'] + yield { + 'router': router_name, + 'interface': info['name'], + 'description': info['description'] + } + + def _lookup_interface_services(wanted_interfaces): + """ + yields interface info from opsdb (with service id) + ... only interfaces in wanted_interfaces + :param wanted_interfaces: + :return: + """ + r = get_next_redis(config) + for k in r.scan_iter('opsdb:interface_services:*'): + k = k.decode('utf-8') + fields = k.split(':') + if len(fields) < 4: + # there are some strange records + # e.g. TS1.*, ts1.*, dp1.*, dtn*, ... + continue + router = fields[2] + ifc_name = fields[3] + + router_interface_key = f'{router}:{ifc_name}' + if router_interface_key not in wanted_interfaces: + continue + + info = r.get(k).decode('utf-8') + info = json.loads(info) + + yield { + 'router': router, + 'interface': ifc_name, + 'service_ids': set([service['id'] for service in info]) + } + + # dict: 'router:interface' -> {'router', 'interface', 'description'} + netconf_interface_map = dict([ + (f'{i["router"]}:{i["interface"]}', i) for i in _interfaces()]) + + # dict: 'router:interface' -> {'router', 'interface', set([service_ids])} + opsdb_interface_map = dict([ + (f'{i["router"]}:{i["interface"]}', i) + for i in _lookup_interface_services(netconf_interface_map.keys())]) + + all_service_ids = set() + for r in opsdb_interface_map.values(): + all_service_ids |= r['service_ids'] + all_service_ids = list(all_service_ids) + + # dict: service_id[int] -> [list of users] + service_user_map = dict() + with db.connection(config["ops-db"]) as cx: + # for user in opsdb.get_service_users(cx, list(all_service_ids)): + service_users = list(opsdb.get_service_users(cx, all_service_ids)) + for user in service_users: + service_user_map.setdefault( + user['service_id'], []).append(user['user']) + + def _users(ifc_key): + """ + ifc = 'router:ifc_name' + :param ifc: + :return: list of users + """ + users = set() + if ifc_key not in opsdb_interface_map: + return [] + service_id_list = opsdb_interface_map[ifc_key].get('service_ids', []) + for service_id in service_id_list: + users |= set(service_user_map.get(service_id, [])) + return list(users) + + for k, v in netconf_interface_map.items(): + v['users'] = _users(k) + yield v + + +def derive_router_hostnames(config): + r = get_next_redis(config) + junosspace_equipment = set() + for k in r.keys('junosspace:*'): + m = re.match('^junosspace:(.*)$', k.decode('utf-8')) + assert m + junosspace_equipment.add(m.group(1)) + + opsdb_equipment = set() + for k in r.scan_iter('opsdb:interface_services:*'): + m = re.match( + 'opsdb:interface_services:([^:]+):.*$', + k.decode('utf-8')) + if m: + opsdb_equipment.add(m.group(1)) + else: + logger.info("Unable to derive router name from %s" % + k.decode('utf-8')) + return junosspace_equipment & opsdb_equipment diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 2afb2e47..75e19a51 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -14,6 +14,7 @@ import jsonschema from inventory_provider.tasks.app import app from inventory_provider.tasks.common \ import get_next_redis, latch_db, get_latch, set_latch, update_latch_status +from inventory_provider.tasks import data from inventory_provider import config from inventory_provider import environment from inventory_provider.db import db, opsdb @@ -105,8 +106,11 @@ def update_interfaces_to_services(self): interface_services[equipment_interface].append(service) r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for key in r.scan_iter('opsdb:interface_services:*'): - r.delete(key) + rp.delete(key) + rp.execute() + rp = r.pipeline() for equipment_interface, services in interface_services.items(): rp.set( @@ -127,8 +131,8 @@ def import_unmanaged_interfaces(self): return { 'name': d['address'], 'interface address': d['network'], - 'interface name': d['interface'], - 'router': d['router'] + 'interface name': d['interface'].lower(), + 'router': d['router'].lower() } interfaces = [ @@ -167,8 +171,11 @@ def update_access_services(self): access_services[service['name']] = service r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for key in r.scan_iter('opsdb:access_services:*'): - r.delete(key) + rp.delete(key) + rp.execute() + rp = r.pipeline() for name, service in access_services.items(): rp.set( @@ -184,12 +191,16 @@ def update_lg_routers(self): logger.debug('>>> update_lg_routers') r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for k in r.scan_iter('opsdb:lg:*'): - r.delete(k) + rp.delete(k) + rp.execute() with db.connection(InventoryTask.config["ops-db"]) as cx: + rp = r.pipeline() for router in opsdb.lookup_lg_routers(cx): - r.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) + rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router)) + rp.execute() logger.debug('<<< update_lg_routers') @@ -199,15 +210,19 @@ def update_equipment_locations(self): logger.debug('>>> update_equipment_locations') r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for k in r.scan_iter('opsdb:location:*'): - r.delete(k) + rp.delete(k) + rp.execute() with db.connection(InventoryTask.config["ops-db"]) as cx: - for h in _derive_router_hostnames(InventoryTask.config): + rp = r.pipeline() + for h in data.derive_router_hostnames(InventoryTask.config): # lookup_pop_info returns a list of locations # (there can sometimes be more than one match) locations = list(opsdb.lookup_pop_info(cx, h)) - r.set('opsdb:location:%s' % h, json.dumps(locations)) + rp.set('opsdb:location:%s' % h, json.dumps(locations)) + rp.execute() logger.debug('<<< update_equipment_locations') @@ -227,10 +242,12 @@ def update_circuit_hierarchy(self): child_to_parents[child_id].append(relation) r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for key in r.scan_iter('opsdb:services:parents:*'): - r.delete(key) + rp.delete(key) for key in r.scan_iter('opsdb:services:children:*'): - r.delete(key) + rp.delete(key) + rp.execute() rp = r.pipeline() for cid, parents in parent_to_children.items(): @@ -247,8 +264,11 @@ def update_geant_lambdas(self): logger.debug('>>> update_geant_lambdas') r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for key in r.scan_iter('opsdb:geant_lambdas:*'): - r.delete(key) + rp.delete(key) + rp.execute() + with db.connection(InventoryTask.config["ops-db"]) as cx: rp = r.pipeline() for ld in opsdb.get_geant_lambdas(cx): @@ -285,8 +305,11 @@ def update_junosspace_device_list(self): 'message': 'found %d routers, saving details' % len(routers) }) + rp = r.pipeline() for k in r.scan_iter('junosspace:*'): - r.delete(k) + rp.delete(k) + rp.execute() + rp = r.pipeline() for k, v in routers.items(): rp.set(k, v) @@ -342,8 +365,10 @@ def clear_cached_classifier_responses(hostname=None): return r.keys('classifier-cache:*') keys_to_delete = _hostname_keys() if hostname else _all_keys() + rp = r.pipeline() for k in keys_to_delete: - r.delete(k) + rp.delete(k) + rp.execute() def _refresh_peers(hostname, key_base, peers): @@ -398,10 +423,12 @@ def refresh_juniper_interface_list(hostname, netconf): 'removing cached netconf-interfaces for %r' % hostname) r = get_next_redis(InventoryTask.config) + rp = r.pipeline() for k in r.scan_iter('netconf-interfaces:%s:*' % hostname): - r.delete(k) + rp.delete(k) for k in r.keys('netconf-interface-bundles:%s:*' % hostname): - r.delete(k) + rp.delete(k) + rp.execute() all_bundles = defaultdict(list) @@ -503,27 +530,6 @@ def reload_router_config(self, hostname): } -def _derive_router_hostnames(config): - r = get_next_redis(config) - junosspace_equipment = set() - for k in r.keys('junosspace:*'): - m = re.match('^junosspace:(.*)$', k.decode('utf-8')) - assert m - junosspace_equipment.add(m.group(1)) - - opsdb_equipment = set() - for k in r.scan_iter('opsdb:interface_services:*'): - m = re.match( - 'opsdb:interface_services:([^:]+):.*$', - k.decode('utf-8')) - if m: - opsdb_equipment.add(m.group(1)) - else: - logger.info("Unable to derive router name from %s" % - k.decode('utf-8')) - return junosspace_equipment & opsdb_equipment - - def _erase_next_db(config): """ flush next db, but first save latch and then restore afterwards @@ -570,7 +576,7 @@ def launch_refresh_cache_all(config): update_access_services.apply_async(), import_unmanaged_interfaces.apply_async() ] - for hostname in _derive_router_hostnames(config): + for hostname in data.derive_router_hostnames(config): logger.debug('queueing router refresh jobs for %r' % hostname) subtasks.append(reload_router_config.apply_async(args=[hostname])) @@ -654,101 +660,6 @@ def refresh_finalizer(self, pending_task_ids_json): logger.debug('<<< refresh_finalizer') -def _build_service_interface_user_list(): - - def _interfaces(): - """ - yields interface info from netconf - :return: - """ - r = get_next_redis(InventoryTask.config) - for k in r.scan_iter('netconf-interfaces:*'): - k = k.decode('utf-8') - (_, router_name, ifc_name) = k.split(':') - - info = r.get(k).decode('utf-8') - info = json.loads(info) - - assert ifc_name == info['name'] - yield { - 'router': router_name, - 'interface': info['name'], - 'description': info['description'] - } - - def _lookup_interface_services(wanted_interfaces): - """ - yields interface info from opsdb (with service id) - ... only interfaces in wanted_interfaces - :param wanted_interfaces: - :return: - """ - r = get_next_redis(InventoryTask.config) - for k in r.scan_iter('opsdb:interface_services:*'): - k = k.decode('utf-8') - fields = k.split(':') - if len(fields) < 4: - # there are some strange records - # e.g. TS1.*, ts1.*, dp1.*, dtn*, ... - continue - router = fields[2] - ifc_name = fields[3] - - router_interface_key = f'{router}:{ifc_name}' - if router_interface_key not in wanted_interfaces: - continue - - info = r.get(k).decode('utf-8') - info = json.loads(info) - - yield { - 'router': router, - 'interface': ifc_name, - 'service_ids': set([service['id'] for service in info]) - } - - # dict: 'router:interface' -> {'router', 'interface', 'description'} - netconf_interface_map = dict([ - (f'{i["router"]}:{i["interface"]}', i) for i in _interfaces()]) - - # dict: 'router:interface' -> {'router', 'interface', set([service_ids])} - opsdb_interface_map = dict([ - (f'{i["router"]}:{i["interface"]}', i) - for i in _lookup_interface_services(netconf_interface_map.keys())]) - - all_service_ids = set() - for r in opsdb_interface_map.values(): - all_service_ids |= r['service_ids'] - all_service_ids = list(all_service_ids) - - # dict: service_id[int] -> [list of users] - service_user_map = dict() - with db.connection(InventoryTask.config["ops-db"]) as cx: - # for user in opsdb.get_service_users(cx, list(all_service_ids)): - service_users = list(opsdb.get_service_users(cx, all_service_ids)) - for user in service_users: - service_user_map.setdefault( - user['service_id'], []).append(user['user']) - - def _users(ifc_key): - """ - ifc = 'router:ifc_name' - :param ifc: - :return: list of users - """ - users = set() - if ifc_key not in opsdb_interface_map: - return [] - service_id_list = opsdb_interface_map[ifc_key].get('service_ids', []) - for service_id in service_id_list: - users |= set(service_user_map.get(service_id, [])) - return list(users) - - for k, v in netconf_interface_map.items(): - v['users'] = _users(k) - yield v - - def _build_service_category_interface_list(update_callback=lambda s: None): logger.debug('>>> _build_interface_services') @@ -760,7 +671,8 @@ def _build_service_category_interface_list(update_callback=lambda s: None): return None update_callback('loading all known interfaces') - interfaces = list(_build_service_interface_user_list()) + interfaces = data.build_service_interface_user_list(InventoryTask.config) + interfaces = list(interfaces) update_callback(f'loaded {len(interfaces)} interfaces, ' 'saving by service category') diff --git a/test/test_junosspace_io.py b/test/test_junosspace_io.py index 1346b1c8..bc0ea303 100644 --- a/test/test_junosspace_io.py +++ b/test/test_junosspace_io.py @@ -5,7 +5,7 @@ import responses import inventory_provider from inventory_provider import juniper -from inventory_provider.tasks import worker +from inventory_provider.tasks import data TEST_DATA_FILENAME = os.path.realpath(os.path.join( inventory_provider.__path__[0], @@ -50,7 +50,7 @@ def test_router_hostname_derivation(mocked_redis): }, 'redis-databases': [0, 11] } - hostnames = list(worker._derive_router_hostnames(config)) + hostnames = list(data.derive_router_hostnames(config)) assert hostnames # test data is non-empty for h in hostnames: assert re.match(r'^(mx[12]|qfx|srx[12])\..+\.geant\.net$', h) -- GitLab