Skip to content
Snippets Groups Projects
Commit 2efc6584 authored by Erik Reid's avatar Erik Reid
Browse files

use pipeline for all multiple sets

parent a9beafb5
No related branches found
No related tags found
No related merge requests found
......@@ -105,10 +105,12 @@ def update_interfaces_to_services():
r = get_next_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:interface_services:*'):
r.delete(key)
rp = r.pipeline()
for equipment_interface, services in interface_services.items():
r.set(
rp.set(
'opsdb:interface_services:' + equipment_interface,
json.dumps(services))
rp.execute()
logger.debug('<<< update_interfaces_to_services')
......@@ -150,13 +152,15 @@ def update_circuit_hierarchy():
r = get_next_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:services:parents:*'):
r.delete(key)
for cid, parents in parent_to_children.items():
r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
for key in r.scan_iter('opsdb:services:children:*'):
r.delete(key)
rp = r.pipeline()
for cid, parents in parent_to_children.items():
rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
for cid, children in child_to_parents.items():
r.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.execute()
logger.debug('<<< update_circuit_hierarchy')
......@@ -170,9 +174,11 @@ def update_geant_lambdas():
for key in r.scan_iter('opsdb:geant_lambdas:*'):
r.delete(key)
with db.connection(InventoryTask.config["ops-db"]) as cx:
rp = r.pipeline()
for ld in opsdb.get_geant_lambdas(cx):
r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
rp.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
json.dumps(ld))
rp.execute()
logger.debug('<<< geant_lambdas')
......@@ -203,10 +209,12 @@ def update_junosspace_device_list(self):
'message': 'found %d routers, saving details' % len(routers)
})
for k in r.keys('junosspace:*'):
for k in r.scan_iter('junosspace:*'):
r.delete(k)
rp = r.pipeline()
for k, v in routers.items():
r.set(k, v)
rp.set(k, v)
rp.execute()
logger.debug('<<< update_junosspace_device_list')
......@@ -268,7 +276,7 @@ def _refresh_peers(hostname, key_base, peers):
logger.debug(
'removing cached %s for %r' % (key_base, hostname))
r = get_next_redis(InventoryTask.config)
for k in r.keys(key_base + ':*'):
for k in r.scan_iter(key_base + ':*'):
# potential race condition: another proc could have
# delete this element between the time we read the
# keys and the next statement ... check for None below
......@@ -278,11 +286,13 @@ def _refresh_peers(hostname, key_base, peers):
if value['router'] == hostname:
r.delete(k)
rp = r.pipeline()
for peer in peers:
peer['router'] = hostname
r.set(
rp.set(
'%s:%s' % (key_base, peer['name']),
json.dumps(peer))
rp.execute()
def refresh_ix_public_peers(hostname, netconf):
......@@ -312,26 +322,27 @@ def refresh_juniper_interface_list(hostname, netconf):
'removing cached netconf-interfaces for %r' % hostname)
r = get_next_redis(InventoryTask.config)
for k in r.keys('netconf-interfaces:%s:*' % hostname):
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
r.delete(k)
for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
r.delete(k)
all_bundles = defaultdict(list)
rp = r.pipeline()
for ifc in juniper.list_interfaces(netconf):
bundles = ifc.get('bundle', None)
for bundle in bundles:
if bundle:
all_bundles[bundle].append(ifc['name'])
r.set(
rp.set(
'netconf-interfaces:%s:%s' % (hostname, ifc['name']),
json.dumps(ifc))
for k, v in all_bundles.items():
r.set(
rp.set(
'netconf-interface-bundles:%s:%s' % (hostname, k),
json.dumps(v))
rp.execute()
@app.task(base=InventoryTask, bind=True)
......@@ -427,7 +438,7 @@ def _derive_router_hostnames(config):
junosspace_equipment.add(m.group(1))
opsdb_equipment = set()
for k in r.keys('opsdb:interface_services:*'):
for k in r.scan_iter('opsdb:interface_services:*'):
m = re.match(
'opsdb:interface_services:([^:]+):.*$',
k.decode('utf-8'))
......@@ -558,8 +569,10 @@ def _build_subnet_db(update_callback=lambda s: None):
update_callback('saving {} subnets'.format(len(subnets)))
rp = r.pipeline()
for k, v in subnets.items():
r.set('subnets:' + k, json.dumps(v))
rp.set('subnets:' + k, json.dumps(v))
rp.execute()
def check_task_status(task_id):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment