Commit c9fb15c1 authored by Erik Reid

Finished feature redis-pipelines-for-big-ops.

parents a9beafb5 a15d91b9
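Every hunk below applies the same pattern: per-key writes that each cost a network round trip are queued on a redis-py pipeline and flushed in a single round trip with execute(). A minimal sketch of the pattern, assuming a redis-py style client (the connection and payload here are illustrative; the real tasks obtain their client via get_next_redis()):

    import json
    import redis

    r = redis.StrictRedis()                 # illustrative; the tasks use get_next_redis()
    items = {'a': {'x': 1}, 'b': {'x': 2}}  # placeholder payload

    rp = r.pipeline()                       # commands are buffered client-side
    for k, v in items.items():
        rp.set('example:' + k, json.dumps(v))
    rp.execute()                            # sent and executed in one round trip

By default redis-py wraps a pipeline in MULTI/EXEC, so the batch also executes atomically; r.pipeline(transaction=False) would give plain batching without the transaction.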
@@ -105,10 +105,12 @@ def update_interfaces_to_services():
     r = get_next_redis(InventoryTask.config)
     for key in r.scan_iter('opsdb:interface_services:*'):
         r.delete(key)
+    rp = r.pipeline()
     for equipment_interface, services in interface_services.items():
-        r.set(
+        rp.set(
             'opsdb:interface_services:' + equipment_interface,
             json.dumps(services))
+    rp.execute()

     logger.debug('<<< update_interfaces_to_services')
@@ -150,13 +152,15 @@ def update_circuit_hierarchy():
     r = get_next_redis(InventoryTask.config)
     for key in r.scan_iter('opsdb:services:parents:*'):
         r.delete(key)
-    for cid, parents in parent_to_children.items():
-        r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
     for key in r.scan_iter('opsdb:services:children:*'):
         r.delete(key)
+    rp = r.pipeline()
+    for cid, parents in parent_to_children.items():
+        rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
     for cid, children in child_to_parents.items():
-        r.set('opsdb:services:children:%d' % cid, json.dumps(children))
+        rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
+    rp.execute()

     logger.debug('<<< update_circuit_hierarchy')
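Note that this hunk also moves the parents loop below the second delete loop, so both scan-and-delete passes complete before any writes are queued; a single pipeline then carries the writes of both loops and one execute() flushes them together.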
@@ -170,9 +174,12 @@ def update_geant_lambdas():
     for key in r.scan_iter('opsdb:geant_lambdas:*'):
         r.delete(key)
     with db.connection(InventoryTask.config["ops-db"]) as cx:
+        rp = r.pipeline()
         for ld in opsdb.get_geant_lambdas(cx):
-            r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
-                  json.dumps(ld))
+            rp.set(
+                'opsdb:geant_lambdas:%s' % ld['name'].lower(),
+                json.dumps(ld))
+        rp.execute()

     logger.debug('<<< geant_lambdas')
@@ -203,10 +210,12 @@ def update_junosspace_device_list(self):
         'message': 'found %d routers, saving details' % len(routers)
     })

-    for k in r.keys('junosspace:*'):
+    for k in r.scan_iter('junosspace:*'):
         r.delete(k)

+    rp = r.pipeline()
     for k, v in routers.items():
-        r.set(k, v)
+        rp.set(k, v)
+    rp.execute()

     logger.debug('<<< update_junosspace_device_list')
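This hunk also swaps r.keys() for r.scan_iter(), a change repeated in the hunks below. KEYS builds its full result in one blocking server-side pass over the keyspace, while SCAN (which scan_iter() wraps) walks it in small cursor-driven batches, so other clients are not stalled behind the scan. A short sketch of the difference, reusing the key pattern from this hunk:

    # KEYS: one blocking command, one large reply
    for k in r.keys('junosspace:*'):
        r.delete(k)

    # SCAN: incremental cursor iteration; scan_iter() hides the cursor
    for k in r.scan_iter('junosspace:*'):
        r.delete(k)

SCAN offers weaker guarantees (keys added or removed mid-scan may be missed or returned twice), which is consistent with the pre-existing race-condition comment in _refresh_peers below.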
@@ -268,7 +277,7 @@ def _refresh_peers(hostname, key_base, peers):
     logger.debug(
         'removing cached %s for %r' % (key_base, hostname))
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys(key_base + ':*'):
+    for k in r.scan_iter(key_base + ':*'):
         # potential race condition: another proc could have
         # deleted this element between the time we read the
         # keys and the next statement ... check for None below
@@ -278,11 +287,13 @@ def _refresh_peers(hostname, key_base, peers):
         if value['router'] == hostname:
             r.delete(k)

+    rp = r.pipeline()
     for peer in peers:
         peer['router'] = hostname
-        r.set(
+        rp.set(
             '%s:%s' % (key_base, peer['name']),
             json.dumps(peer))
+    rp.execute()


 def refresh_ix_public_peers(hostname, netconf):
@@ -312,26 +323,27 @@ def refresh_juniper_interface_list(hostname, netconf):
         'removing cached netconf-interfaces for %r' % hostname)
     r = get_next_redis(InventoryTask.config)
-    for k in r.keys('netconf-interfaces:%s:*' % hostname):
+    for k in r.scan_iter('netconf-interfaces:%s:*' % hostname):
         r.delete(k)
     for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
         r.delete(k)

     all_bundles = defaultdict(list)

+    rp = r.pipeline()
     for ifc in juniper.list_interfaces(netconf):
         bundles = ifc.get('bundle', None)
         for bundle in bundles:
             if bundle:
                 all_bundles[bundle].append(ifc['name'])
-
-        r.set(
+        rp.set(
             'netconf-interfaces:%s:%s' % (hostname, ifc['name']),
             json.dumps(ifc))
     for k, v in all_bundles.items():
-        r.set(
+        rp.set(
             'netconf-interface-bundles:%s:%s' % (hostname, k),
             json.dumps(v))
+    rp.execute()


 @app.task(base=InventoryTask, bind=True)
@@ -427,7 +439,7 @@ def _derive_router_hostnames(config):
         junosspace_equipment.add(m.group(1))

     opsdb_equipment = set()
-    for k in r.keys('opsdb:interface_services:*'):
+    for k in r.scan_iter('opsdb:interface_services:*'):
         m = re.match(
             'opsdb:interface_services:([^:]+):.*$',
             k.decode('utf-8'))
@@ -558,8 +570,10 @@ def _build_subnet_db(update_callback=lambda s: None):
     update_callback('saving {} subnets'.format(len(subnets)))

+    rp = r.pipeline()
     for k, v in subnets.items():
-        r.set('subnets:' + k, json.dumps(v))
+        rp.set('subnets:' + k, json.dumps(v))
+    rp.execute()


 def check_task_status(task_id):
@@ -98,12 +98,12 @@ class MockedRedis(object):
     def scan_iter(self, glob=None):
         if not glob:
-            for k in MockedRedis.db.keys():
+            for k in list(MockedRedis.db.keys()):
                 yield k.encode('utf-8')
         m = re.match(r'^([^*]+)\*$', glob)
         assert m  # all expected globs are like this
-        for k in MockedRedis.db.keys():
+        for k in list(MockedRedis.db.keys()):
             if k.startswith(m.group(1)):
                 yield k.encode('utf-8')
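The list() wrappers matter because scan_iter() is a generator that callers drive while deleting keys from the same dict, and in Python 3 mutating a dict during iteration over its keys raises RuntimeError. Copying the keys first also makes the mock behave like the real SCAN, which tolerates concurrent deletes. A tiny illustration:

    db = {'a': 1, 'b': 2}

    # for k in db.keys():   # RuntimeError: dictionary changed size during iteration
    #     del db[k]

    for k in list(db.keys()):   # iterates over a snapshot of the keys
        del db[k]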
@@ -114,6 +114,12 @@ class MockedRedis(object):
         # only called from testing routes (hopefully)
         pass

+    def execute(self):
+        pass
+
+    def pipeline(self, *args, **kwargs):
+        return self
+

 @pytest.fixture
 def cached_test_data():
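MockedRedis grows just enough of the pipeline interface for the new code paths: pipeline() returns the mock itself, so pipelined set() calls land in the fake store immediately, and execute() is a no-op because nothing was buffered. A usage sketch under that assumption (i.e. that the mock's set() writes into MockedRedis.db like its other methods):

    r = MockedRedis()
    rp = r.pipeline()            # returns the same MockedRedis instance
    rp.set('subnets:x', '[]')    # applied to MockedRedis.db right away
    rp.execute()                 # no-op; the writes already happened

Returning self trades fidelity (no buffering, no atomicity) for simplicity, which is fine for tests that only assert on the final contents of the store.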