diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index 3d56cba4e9e5a7f49d65f89a3b84bcb0b72ef0bc..1ac375df5ea1464db6c31ba3aa424d88e285c5f8 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -105,10 +105,12 @@ def update_interfaces_to_services():
     r = get_next_redis(InventoryTask.config)
     for key in r.scan_iter('opsdb:interface_services:*'):
         r.delete(key)
+    rp = r.pipeline()
     for equipment_interface, services in interface_services.items():
-        r.set(
+        rp.set(
             'opsdb:interface_services:' + equipment_interface,
             json.dumps(services))
+    rp.execute()
 
     logger.debug('<<< update_interfaces_to_services')
 
@@ -150,13 +152,15 @@ def update_circuit_hierarchy():
         r = get_next_redis(InventoryTask.config)
         for key in r.scan_iter('opsdb:services:parents:*'):
             r.delete(key)
-        for cid, parents in parent_to_children.items():
-            r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
-
         for key in r.scan_iter('opsdb:services:children:*'):
             r.delete(key)
+
+        rp = r.pipeline()
+        for cid, parents in parent_to_children.items():
+            rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
         for cid, children in child_to_parents.items():
-            r.set('opsdb:services:children:%d' % cid, json.dumps(children))
+            rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
+        rp.execute()
 
     logger.debug('<<< update_circuit_hierarchy')
 
@@ -170,9 +174,12 @@ def update_geant_lambdas():
     for key in r.scan_iter('opsdb:geant_lambdas:*'):
         r.delete(key)
     with db.connection(InventoryTask.config["ops-db"]) as cx:
+        rp = r.pipeline()
         for ld in opsdb.get_geant_lambdas(cx):
-            r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
-                  json.dumps(ld))
+            rp.set(
+                'opsdb:geant_lambdas:%s' % ld['name'].lower(),
+                json.dumps(ld))
+        rp.execute()
 
     logger.debug('<<< geant_lambdas')
 
@@ -203,10 +210,12 @@ def update_junosspace_device_list(self):
             'message': 'found %d routers, saving details' % len(routers)
         })
 
-    for k in r.keys('junosspace:*'):
+    for k in r.scan_iter('junosspace:*'):
         r.delete(k)
+    rp = r.pipeline()
     for k, v in routers.items():
-        r.set(k, v)
+        rp.set(k, v)
+    rp.execute()
 
     logger.debug('<<< update_junosspace_device_list')
 
@@ -268,7 +277,7 @@ def _refresh_peers(hostname, key_base, peers):
     logger.debug(
         'removing cached %s for %r' % (key_base, hostname))
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys(key_base + ':*'):
+    for k in r.scan_iter(key_base + ':*'):
         # potential race condition: another proc could have
         # delete this element between the time we read the
         # keys and the next statement ... check for None below
@@ -278,11 +287,13 @@ def _refresh_peers(hostname, key_base, peers):
             if value['router'] == hostname:
                 r.delete(k)
 
+    rp = r.pipeline()
     for peer in peers:
         peer['router'] = hostname
-        r.set(
+        rp.set(
             '%s:%s' % (key_base, peer['name']),
             json.dumps(peer))
+    rp.execute()
 
 
 def refresh_ix_public_peers(hostname, netconf):
@@ -312,26 +323,27 @@ def refresh_juniper_interface_list(hostname, netconf):
         'removing cached netconf-interfaces for %r' % hostname)
 
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys('netconf-interfaces:%s:*' % hostname):
+    for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
         r.delete(k)
-
     for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
         r.delete(k)
 
     all_bundles = defaultdict(list)
+
+    rp = r.pipeline()
     for ifc in juniper.list_interfaces(netconf):
         bundles = ifc.get('bundle', None)
         for bundle in bundles:
             if bundle:
                 all_bundles[bundle].append(ifc['name'])
-
-        r.set(
+        rp.set(
             'netconf-interfaces:%s:%s' % (hostname, ifc['name']),
             json.dumps(ifc))
     for k, v in all_bundles.items():
-        r.set(
+        rp.set(
             'netconf-interface-bundles:%s:%s' % (hostname, k),
             json.dumps(v))
+    rp.execute()
 
 
 @app.task(base=InventoryTask, bind=True)
@@ -427,7 +439,7 @@ def _derive_router_hostnames(config):
         junosspace_equipment.add(m.group(1))
 
     opsdb_equipment = set()
-    for k in r.keys('opsdb:interface_services:*'):
+    for k in r.scan_iter('opsdb:interface_services:*'):
         m = re.match(
             'opsdb:interface_services:([^:]+):.*$',
             k.decode('utf-8'))
@@ -558,8 +570,10 @@ def _build_subnet_db(update_callback=lambda s: None):
 
     update_callback('saving {} subnets'.format(len(subnets)))
 
+    rp = r.pipeline()
     for k, v in subnets.items():
-        r.set('subnets:' + k, json.dumps(v))
+        rp.set('subnets:' + k, json.dumps(v))
+    rp.execute()
 
 
 def check_task_status(task_id):
diff --git a/test/conftest.py b/test/conftest.py
index 6009cc6a90f39ff520818e28846e953ce444dc30..2c9869be1015d6ef6d63b70e2fe09fe69a5a40af 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -98,12 +98,12 @@ class MockedRedis(object):
 
     def scan_iter(self, glob=None):
         if not glob:
-            for k in MockedRedis.db.keys():
+            for k in list(MockedRedis.db.keys()):
                 yield k.encode('utf-8')
 
         m = re.match(r'^([^*]+)\*$', glob)
         assert m  # all expected globs are like this
-        for k in MockedRedis.db.keys():
+        for k in list(MockedRedis.db.keys()):
             if k.startswith(m.group(1)):
                 yield k.encode('utf-8')
 
@@ -114,6 +114,12 @@ class MockedRedis(object):
         # only called from testing routes (hopefully)
         pass
 
+    def execute(self):
+        pass
+
+    def pipeline(self, *args, **kwargs):
+        return self
+
 
 @pytest.fixture
 def cached_test_data():