diff --git a/changelog b/changelog index dd8c210b48676a8c2be9f422e4fa48634dda56cd..3cddca7822186195afbc96eb6e8ea757a0639d48 100644 --- a/changelog +++ b/changelog @@ -37,3 +37,4 @@ 0.20: included both v4 & v6 addresses in peering info 0.21: added parsing of 'logical-systems' (DBOARD3-150) 0.22: return a skeleton response for unknown interfaces (DBOARD3-169) +0.23: use redis pipelines where possible diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 3d56cba4e9e5a7f49d65f89a3b84bcb0b72ef0bc..1ac375df5ea1464db6c31ba3aa424d88e285c5f8 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -105,10 +105,12 @@ def update_interfaces_to_services(): r = get_next_redis(InventoryTask.config) for key in r.scan_iter('opsdb:interface_services:*'): r.delete(key) + rp = r.pipeline() for equipment_interface, services in interface_services.items(): - r.set( + rp.set( 'opsdb:interface_services:' + equipment_interface, json.dumps(services)) + rp.execute() logger.debug('<<< update_interfaces_to_services') @@ -150,13 +152,15 @@ def update_circuit_hierarchy(): r = get_next_redis(InventoryTask.config) for key in r.scan_iter('opsdb:services:parents:*'): r.delete(key) - for cid, parents in parent_to_children.items(): - r.set('opsdb:services:parents:%d' % cid, json.dumps(parents)) - for key in r.scan_iter('opsdb:services:children:*'): r.delete(key) + + rp = r.pipeline() + for cid, parents in parent_to_children.items(): + rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents)) for cid, children in child_to_parents.items(): - r.set('opsdb:services:children:%d' % cid, json.dumps(children)) + rp.set('opsdb:services:children:%d' % cid, json.dumps(children)) + rp.execute() logger.debug('<<< update_circuit_hierarchy') @@ -170,9 +174,12 @@ def update_geant_lambdas(): for key in r.scan_iter('opsdb:geant_lambdas:*'): r.delete(key) with db.connection(InventoryTask.config["ops-db"]) as cx: + rp = r.pipeline() for ld in opsdb.get_geant_lambdas(cx): - r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(), - json.dumps(ld)) + rp.set( + 'opsdb:geant_lambdas:%s' % ld['name'].lower(), + json.dumps(ld)) + rp.execute() logger.debug('<<< geant_lambdas') @@ -203,10 +210,12 @@ def update_junosspace_device_list(self): 'message': 'found %d routers, saving details' % len(routers) }) - for k in r.keys('junosspace:*'): + for k in r.scan_iter('junosspace:*'): r.delete(k) + rp = r.pipeline() for k, v in routers.items(): - r.set(k, v) + rp.set(k, v) + rp.execute() logger.debug('<<< update_junosspace_device_list') @@ -268,7 +277,7 @@ def _refresh_peers(hostname, key_base, peers): logger.debug( 'removing cached %s for %r' % (key_base, hostname)) r = get_next_redis(InventoryTask.config) - for k in r.keys(key_base + ':*'): + for k in r.scan_iter(key_base + ':*'): # potential race condition: another proc could have # delete this element between the time we read the # keys and the next statement ... check for None below @@ -278,11 +287,13 @@ def _refresh_peers(hostname, key_base, peers): if value['router'] == hostname: r.delete(k) + rp = r.pipeline() for peer in peers: peer['router'] = hostname - r.set( + rp.set( '%s:%s' % (key_base, peer['name']), json.dumps(peer)) + rp.execute() def refresh_ix_public_peers(hostname, netconf): @@ -312,26 +323,27 @@ def refresh_juniper_interface_list(hostname, netconf): 'removing cached netconf-interfaces for %r' % hostname) r = get_next_redis(InventoryTask.config) - for k in r.keys('netconf-interfaces:%s:*' % hostname): + for k in r.scan_iter('netconf-interfaces:%s:*' % hostname): r.delete(k) - for k in r.keys('netconf-interface-bundles:%s:*' % hostname): r.delete(k) all_bundles = defaultdict(list) + + rp = r.pipeline() for ifc in juniper.list_interfaces(netconf): bundles = ifc.get('bundle', None) for bundle in bundles: if bundle: all_bundles[bundle].append(ifc['name']) - - r.set( + rp.set( 'netconf-interfaces:%s:%s' % (hostname, ifc['name']), json.dumps(ifc)) for k, v in all_bundles.items(): - r.set( + rp.set( 'netconf-interface-bundles:%s:%s' % (hostname, k), json.dumps(v)) + rp.execute() @app.task(base=InventoryTask, bind=True) @@ -427,7 +439,7 @@ def _derive_router_hostnames(config): junosspace_equipment.add(m.group(1)) opsdb_equipment = set() - for k in r.keys('opsdb:interface_services:*'): + for k in r.scan_iter('opsdb:interface_services:*'): m = re.match( 'opsdb:interface_services:([^:]+):.*$', k.decode('utf-8')) @@ -558,8 +570,10 @@ def _build_subnet_db(update_callback=lambda s: None): update_callback('saving {} subnets'.format(len(subnets))) + rp = r.pipeline() for k, v in subnets.items(): - r.set('subnets:' + k, json.dumps(v)) + rp.set('subnets:' + k, json.dumps(v)) + rp.execute() def check_task_status(task_id): diff --git a/setup.py b/setup.py index 2ef16d74521bd373a69600c23a177cda06855b3e..3216093a9577d8285409787da633dd3d675832b0 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='inventory-provider', - version="0.22", + version="0.23", author='GEANT', author_email='swd@geant.org', description='Dashboard inventory provider', diff --git a/test/conftest.py b/test/conftest.py index 6009cc6a90f39ff520818e28846e953ce444dc30..2c9869be1015d6ef6d63b70e2fe09fe69a5a40af 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -98,12 +98,12 @@ class MockedRedis(object): def scan_iter(self, glob=None): if not glob: - for k in MockedRedis.db.keys(): + for k in list(MockedRedis.db.keys()): yield k.encode('utf-8') m = re.match(r'^([^*]+)\*$', glob) assert m # all expected globs are like this - for k in MockedRedis.db.keys(): + for k in list(MockedRedis.db.keys()): if k.startswith(m.group(1)): yield k.encode('utf-8') @@ -114,6 +114,12 @@ class MockedRedis(object): # only called from testing routes (hopefully) pass + def execute(self): + pass + + def pipeline(self, *args, **kwargs): + return self + @pytest.fixture def cached_test_data(): diff --git a/test/test_classifier_routes.py b/test/test_classifier_routes.py index 013aa6ce6ab7f7d817a3d1afb40f71ecaef1a182..b713132bce148fea693a0c1728d046e38bdf1a53 100644 --- a/test/test_classifier_routes.py +++ b/test/test_classifier_routes.py @@ -140,6 +140,26 @@ def test_juniper_link_info(client): jsonschema.validate(response_data, JUNIPER_LINK_METADATA) +def test_juniper_link_info_not_found(client): + rv = client.get( + '/classifier/juniper-link-info/' + 'mx1.ams.nl.geant.net/unknown-interface-name', + headers=DEFAULT_REQUEST_HEADERS) + assert rv.status_code == 200 + assert rv.is_json + response_data = json.loads(rv.data.decode('utf-8')) + jsonschema.validate(response_data, JUNIPER_LINK_METADATA) + assert response_data == { + 'interface': { + 'name': 'unknown-interface-name', + 'description': '', + 'ipv4': [], + 'ipv6': [], + 'bundle': [] + } + } + + VPN_RR_PEER_INFO_KEYS = {'vpn-rr-peer-info'} IX_PUBLIC_PEER_INFO_KEYS = {'ix-public-peer-info', 'interfaces'}