Skip to content
Snippets Groups Projects
Commit ef3728f0 authored by Erik Reid
Browse files

use a count parameter with scan_iter

scan_iter is very slow when the result set is empty,
so it is better to use a reasonably large block count
parent 3d5153c1
No related branches found
No related tags found
No related merge requests found
...@@ -121,7 +121,8 @@ def clear_joblog(r, keys_read_event=None): ...@@ -121,7 +121,8 @@ def clear_joblog(r, keys_read_event=None):
:return: :return:
""" """
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('joblog:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('joblog:*', count=1000):
rp.delete(key) rp.delete(key)
if keys_read_event: if keys_read_event:
assert isinstance(keys_read_event, threading.Event) # sanity assert isinstance(keys_read_event, threading.Event) # sanity
...@@ -193,7 +194,8 @@ def load_task_log(config_params, ignored_keys=[]): ...@@ -193,7 +194,8 @@ def load_task_log(config_params, ignored_keys=[]):
threads.append({'thread': t, 'queue': q}) threads.append({'thread': t, 'queue': q})
r = get_current_redis(config_params) r = get_current_redis(config_params)
for k in r.scan_iter('joblog:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('joblog:*', count=1000):
k = k.decode('utf-8') k = k.decode('utf-8')
if k in ignored_keys: if k in ignored_keys:
logger.debug('ignoring key: {k}') logger.debug('ignoring key: {k}')
......
...@@ -146,7 +146,8 @@ def update_interfaces_to_services(self): ...@@ -146,7 +146,8 @@ def update_interfaces_to_services(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:interface_services:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:interface_services:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -207,7 +208,8 @@ def update_access_services(self): ...@@ -207,7 +208,8 @@ def update_access_services(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:access_services:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:access_services:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -225,7 +227,8 @@ def update_lg_routers(self): ...@@ -225,7 +227,8 @@ def update_lg_routers(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('opsdb:lg:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('opsdb:lg:*', count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -241,7 +244,8 @@ def update_lg_routers(self): ...@@ -241,7 +244,8 @@ def update_lg_routers(self):
def update_equipment_locations(self): def update_equipment_locations(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('opsdb:location:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('opsdb:location:*', count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -271,9 +275,10 @@ def update_circuit_hierarchy(self): ...@@ -271,9 +275,10 @@ def update_circuit_hierarchy(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:services:parents:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:services:parents:*', count=1000):
rp.delete(key) rp.delete(key)
for key in r.scan_iter('opsdb:services:children:*'): for key in r.scan_iter('opsdb:services:children:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -291,7 +296,8 @@ def update_geant_lambdas(self): ...@@ -291,7 +296,8 @@ def update_geant_lambdas(self):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for key in r.scan_iter('opsdb:geant_lambdas:*'): # scan with bigger batches, to mitigate network latency effects
for key in r.scan_iter('opsdb:geant_lambdas:*', count=1000):
rp.delete(key) rp.delete(key)
rp.execute() rp.execute()
...@@ -425,9 +431,11 @@ def refresh_juniper_interface_list(hostname, netconf): ...@@ -425,9 +431,11 @@ def refresh_juniper_interface_list(hostname, netconf):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
rp = r.pipeline() rp = r.pipeline()
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('netconf-interfaces:%s:*' % hostname, count=1000):
rp.delete(k) rp.delete(k)
for k in r.keys('netconf-interface-bundles:%s:*' % hostname): for k in r.scan_iter(
'netconf-interface-bundles:%s:*' % hostname, count=1000):
rp.delete(k) rp.delete(k)
rp.execute() rp.execute()
...@@ -695,7 +703,8 @@ def _build_subnet_db(update_callback=lambda s: None): ...@@ -695,7 +703,8 @@ def _build_subnet_db(update_callback=lambda s: None):
update_callback('loading all network addresses') update_callback('loading all network addresses')
subnets = {} subnets = {}
for k in r.scan_iter('reverse_interface_addresses:*'): # scan with bigger batches, to mitigate network latency effects
for k in r.scan_iter('reverse_interface_addresses:*', count=1000):
info = r.get(k.decode('utf-8')).decode('utf-8') info = r.get(k.decode('utf-8')).decode('utf-8')
info = json.loads(info) info = json.loads(info)
entry = subnets.setdefault(info['interface address'], []) entry = subnets.setdefault(info['interface address'], [])
......
...@@ -107,7 +107,7 @@ class MockedRedis(object): ...@@ -107,7 +107,7 @@ class MockedRedis(object):
key = key.decode('utf-8') key = key.decode('utf-8')
del MockedRedis.db[key] del MockedRedis.db[key]
def scan_iter(self, glob=None): def scan_iter(self, glob=None, count='unused'):
if not glob: if not glob:
for k in list(MockedRedis.db.keys()): for k in list(MockedRedis.db.keys()):
yield k.encode('utf-8') yield k.encode('utf-8')
......
Loading...
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment