Skip to content
Snippets Groups Projects
Commit 47fab4cb authored by Erik Reid's avatar Erik Reid
Browse files

latch dbs after finalizing

parent 1aabc1df
No related branches found
No related tags found
No related merge requests found
...@@ -41,24 +41,7 @@ def check_task_status(task_id): ...@@ -41,24 +41,7 @@ def check_task_status(task_id):
@routes.route("latchdb", methods=['GET', 'POST']) @routes.route("latchdb", methods=['GET', 'POST'])
def latch_db(): def latch_db():
config = current_app.config["INVENTORY_PROVIDER_CONFIG"] config = current_app.config["INVENTORY_PROVIDER_CONFIG"]
db_ids = config['redis-databases'] worker_common.latch_db(config)
db_ids = sorted(set(db_ids))
r = worker_common.get_next_redis(config)
latch = worker_common.get_latch(r)
if not latch:
latch = {
'current': db_ids[0],
'next': db_ids[0]
}
next_idx = db_ids.index(latch['next'])
next_idx = (next_idx + 1) % len(db_ids)
worker_common.set_latch(
config, new_current=latch['next'], new_next=db_ids[next_idx])
r = worker_common.get_current_redis(config) r = worker_common.get_current_redis(config)
return jsonify(worker_common.get_latch(r)) return jsonify(worker_common.get_latch(r))
...@@ -48,6 +48,24 @@ def set_latch(config, new_current, new_next): ...@@ -48,6 +48,24 @@ def set_latch(config, new_current, new_next):
r.set('db:latch', json.dumps(latch)) r.set('db:latch', json.dumps(latch))
def latch_db(config):
    """Advance the current/next Redis database latch.

    Reads the latch state from the 'next' Redis instance and rotates it:
    the database that was 'next' becomes 'current', and the following
    database id (in sorted order, wrapping around) becomes the new 'next'.
    If no latch has been stored yet, both pointers start at the first
    database id, so the first rotation yields current=db_ids[0],
    next=db_ids[1 % len].

    :param config: app config dict; must contain 'redis-databases',
        an iterable of Redis database ids
    :raises ValueError: if 'redis-databases' is empty
    """
    db_ids = sorted(set(config['redis-databases']))
    if not db_ids:
        # fail loudly with a clear message instead of an obscure
        # IndexError/ZeroDivisionError below
        raise ValueError("config 'redis-databases' must not be empty")
    r = get_next_redis(config)
    latch = get_latch(r)
    if not latch:
        # first run: no latch stored yet, start both pointers at the
        # first database id
        latch = {
            'current': db_ids[0],
            'next': db_ids[0]
        }
    # rotate: previous 'next' becomes 'current'; advance 'next' cyclically
    next_idx = (db_ids.index(latch['next']) + 1) % len(db_ids)
    set_latch(config, new_current=latch['next'], new_next=db_ids[next_idx])
def _get_redis(config, dbid=None): def _get_redis(config, dbid=None):
if dbid is None: if dbid is None:
......
...@@ -12,7 +12,7 @@ from lxml import etree ...@@ -12,7 +12,7 @@ from lxml import etree
import jsonschema import jsonschema
from inventory_provider.tasks.app import app from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_next_redis from inventory_provider.tasks.common import get_next_redis, latch_db
from inventory_provider import config from inventory_provider import config
from inventory_provider import environment from inventory_provider import environment
from inventory_provider.db import db, opsdb from inventory_provider.db import db, opsdb
...@@ -464,13 +464,9 @@ def launch_refresh_cache_all(config): ...@@ -464,13 +464,9 @@ def launch_refresh_cache_all(config):
subtasks = [ subtasks = [
update_equipment_locations.apply_async(), update_equipment_locations.apply_async(),
] ]
# for hostname in _derive_router_hostnames(config): for hostname in _derive_router_hostnames(config):
hostnames = _derive_router_hostnames(config) logger.debug('queueing router refresh jobs for %r' % hostname)
hostnames = list(hostnames)[:2] subtasks.append(reload_router_config.apply_async(args=[hostname]))
logger.error("HOSTNAMES: %r" % hostnames)
# for hostname in hostnames:
# logger.debug('queueing router refresh jobs for %r' % hostname)
# subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks] pending_task_ids = [x.id for x in subtasks]
...@@ -479,12 +475,10 @@ def launch_refresh_cache_all(config): ...@@ -479,12 +475,10 @@ def launch_refresh_cache_all(config):
return pending_task_ids return pending_task_ids
def _wait_for_tasks(task_ids): def _wait_for_tasks(task_ids, update_callback=lambda s: None):
logger = logging.getLogger(__name__)
start_time = time.time() start_time = time.time()
while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S: while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S:
logger.debug('waiting for tasks to complete: %r', task_ids) update_callback('waiting for tasks to complete: %r' % task_ids)
time.sleep(FINALIZER_POLLING_FREQUENCY_S) time.sleep(FINALIZER_POLLING_FREQUENCY_S)
task_ids = [ task_ids = [
id for id in task_ids id for id in task_ids
...@@ -495,9 +489,8 @@ def _wait_for_tasks(task_ids): ...@@ -495,9 +489,8 @@ def _wait_for_tasks(task_ids):
raise InventoryTaskError( raise InventoryTaskError(
'timeout waiting for pending tasks to complete') 'timeout waiting for pending tasks to complete')
logger.debug( update_callback('pending tasks completed in {} seconds'.format(
'previous tasks completed in {} seconds'.format( time.time() - start_time))
time.time - start_time))
@app.task(base=InventoryTask, bind=True) @app.task(base=InventoryTask, bind=True)
...@@ -512,19 +505,32 @@ def refresh_finalizer(self, pending_task_ids_json): ...@@ -512,19 +505,32 @@ def refresh_finalizer(self, pending_task_ids_json):
"items": {"type": "string"} "items": {"type": "string"}
} }
def _update(s):
logger.debug(s)
self.update_state(
state=states.STARTED,
meta={
'task': 'refresh_finalizer',
'message': s
})
task_ids = json.loads(pending_task_ids_json) task_ids = json.loads(pending_task_ids_json)
logger.debug('task_ids: %r' % task_ids) logger.debug('task_ids: %r' % task_ids)
jsonschema.validate(task_ids, input_schema) jsonschema.validate(task_ids, input_schema)
_wait_for_tasks(task_ids) _wait_for_tasks(task_ids, update_callback=_update)
_build_subnet_db() _build_subnet_db(update_callback=_update)
_update('latching current/next dbs')
latch_db(InventoryTask.config)
logger.debug('<<< refresh_finalizer') logger.debug('<<< refresh_finalizer')
def _build_subnet_db(): def _build_subnet_db(update_callback=lambda s: None):
r = get_next_redis(InventoryTask.config) r = get_next_redis(InventoryTask.config)
update_callback('loading all network addresses')
subnets = {} subnets = {}
for k in r.scan_iter('reverse_interface_addresses:*'): for k in r.scan_iter('reverse_interface_addresses:*'):
info = r.get(k.decode('utf-8')).decode('utf-8') info = r.get(k.decode('utf-8')).decode('utf-8')
...@@ -535,6 +541,8 @@ def _build_subnet_db(): ...@@ -535,6 +541,8 @@ def _build_subnet_db():
'router': info['router'] 'router': info['router']
}) })
update_callback('saving {} subnets'.format(len(subnets)))
for k, v in subnets.items(): for k, v in subnets.items():
r.set('subnets:' + k, json.dumps(v)) r.set('subnets:' + k, json.dumps(v))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment