From 2efc6584f6f6235365b12a6d533cf236c7acc49d Mon Sep 17 00:00:00 2001
From: Erik Reid <erik.reid@geant.org>
Date: Thu, 3 Oct 2019 14:16:17 +0200
Subject: [PATCH] use pipeline for all multiple sets

---
 inventory_provider/tasks/worker.py | 47 +++++++++++++++++++-----------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 3d56cba4..084c8003 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -105,10 +105,12 @@ def update_interfaces_to_services():
     r = get_next_redis(InventoryTask.config)
     for key in r.scan_iter('opsdb:interface_services:*'):
         r.delete(key)
+    rp = r.pipeline()
     for equipment_interface, services in interface_services.items():
-        r.set(
+        rp.set(
             'opsdb:interface_services:' + equipment_interface,
             json.dumps(services))
+    rp.execute()
 
     logger.debug('<<< update_interfaces_to_services')
 
@@ -150,13 +152,15 @@ def update_circuit_hierarchy():
         r = get_next_redis(InventoryTask.config)
         for key in r.scan_iter('opsdb:services:parents:*'):
             r.delete(key)
-        for cid, parents in parent_to_children.items():
-            r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
-
         for key in r.scan_iter('opsdb:services:children:*'):
             r.delete(key)
+
+        rp = r.pipeline()
+        for cid, parents in parent_to_children.items():
+            rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
         for cid, children in child_to_parents.items():
-            r.set('opsdb:services:children:%d' % cid, json.dumps(children))
+            rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
+        rp.execute()
 
     logger.debug('<<< update_circuit_hierarchy')
 
@@ -170,9 +174,11 @@ def update_geant_lambdas():
     for key in r.scan_iter('opsdb:geant_lambdas:*'):
         r.delete(key)
     with db.connection(InventoryTask.config["ops-db"]) as cx:
+        rp = r.pipeline()
         for ld in opsdb.get_geant_lambdas(cx):
-            r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
+            rp.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
                   json.dumps(ld))
+        rp.execute()
 
     logger.debug('<<< geant_lambdas')
 
@@ -203,10 +209,12 @@ def update_junosspace_device_list(self):
             'message': 'found %d routers, saving details' % len(routers)
         })
 
-    for k in r.keys('junosspace:*'):
+    for k in r.scan_iter('junosspace:*'):
         r.delete(k)
+    rp = r.pipeline()
     for k, v in routers.items():
-        r.set(k, v)
+        rp.set(k, v)
+    rp.execute()
 
     logger.debug('<<< update_junosspace_device_list')
 
@@ -268,7 +276,7 @@ def _refresh_peers(hostname, key_base, peers):
     logger.debug(
         'removing cached %s for %r' % (key_base, hostname))
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys(key_base + ':*'):
+    for k in r.scan_iter(key_base + ':*'):
         # potential race condition: another proc could have
         # delete this element between the time we read the
         # keys and the next statement ... check for None below
@@ -278,11 +286,13 @@ def _refresh_peers(hostname, key_base, peers):
             if value['router'] == hostname:
                 r.delete(k)
 
+    rp = r.pipeline()
     for peer in peers:
         peer['router'] = hostname
-        r.set(
+        rp.set(
             '%s:%s' % (key_base, peer['name']),
             json.dumps(peer))
+    rp.execute()
 
 
 def refresh_ix_public_peers(hostname, netconf):
@@ -312,26 +322,27 @@ def refresh_juniper_interface_list(hostname, netconf):
         'removing cached netconf-interfaces for %r' % hostname)
 
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys('netconf-interfaces:%s:*' % hostname):
+    for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
         r.delete(k)
-
     for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
         r.delete(k)
 
     all_bundles = defaultdict(list)
+
+    rp = r.pipeline()
     for ifc in juniper.list_interfaces(netconf):
         bundles = ifc.get('bundle', None)
         for bundle in bundles:
             if bundle:
                 all_bundles[bundle].append(ifc['name'])
-
-        r.set(
+        rp.set(
             'netconf-interfaces:%s:%s' % (hostname, ifc['name']),
             json.dumps(ifc))
     for k, v in all_bundles.items():
-        r.set(
+        rp.set(
             'netconf-interface-bundles:%s:%s' % (hostname, k),
             json.dumps(v))
+    rp.execute()
 
 
 @app.task(base=InventoryTask, bind=True)
@@ -427,7 +438,7 @@ def _derive_router_hostnames(config):
         junosspace_equipment.add(m.group(1))
 
     opsdb_equipment = set()
-    for k in r.keys('opsdb:interface_services:*'):
+    for k in r.scan_iter('opsdb:interface_services:*'):
         m = re.match(
             'opsdb:interface_services:([^:]+):.*$',
             k.decode('utf-8'))
@@ -558,8 +569,10 @@ def _build_subnet_db(update_callback=lambda s: None):
 
     update_callback('saving {} subnets'.format(len(subnets)))
 
+    rp = r.pipeline()
     for k, v in subnets.items():
-        r.set('subnets:' + k, json.dumps(v))
+        rp.set('subnets:' + k, json.dumps(v))
+    rp.execute()
 
 
 def check_task_status(task_id):
-- 
GitLab