Commit baf5c16c authored by Erik Reid

Finished feature performance-improvements.

parents 0127216b 4f79edfb
@@ -41,24 +41,7 @@ def check_task_status(task_id):
@routes.route("latchdb", methods=['GET', 'POST'])
def latch_db():
    config = current_app.config["INVENTORY_PROVIDER_CONFIG"]

    db_ids = config['redis-databases']
    db_ids = sorted(set(db_ids))

    r = worker_common.get_next_redis(config)
    latch = worker_common.get_latch(r)
    if not latch:
        latch = {
            'current': db_ids[0],
            'next': db_ids[0]
        }

    next_idx = db_ids.index(latch['next'])
    next_idx = (next_idx + 1) % len(db_ids)

    worker_common.set_latch(
        config, new_current=latch['next'], new_next=db_ids[next_idx])
    worker_common.latch_db(config)
    r = worker_common.get_current_redis(config)
    return jsonify(worker_common.get_latch(r))
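
Note (not part of the commit): with this change the view delegates the rotation to worker_common.latch_db and simply returns the resulting latch. A minimal sketch of calling the endpoint, assuming the service is reachable at http://localhost:8080 and the blueprint is mounted at the application root:

# illustrative client call only; base URL and path prefix are assumptions
import requests

resp = requests.post('http://localhost:8080/latchdb')
resp.raise_for_status()
latch = resp.json()
print(latch['current'], latch['next'])  # active and standby redis database ids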
@@ -48,6 +48,24 @@ def set_latch(config, new_current, new_next):
    r.set('db:latch', json.dumps(latch))


def latch_db(config):
    db_ids = config['redis-databases']
    db_ids = sorted(set(db_ids))

    r = get_next_redis(config)
    latch = get_latch(r)
    if not latch:
        latch = {
            'current': db_ids[0],
            'next': db_ids[0]
        }

    next_idx = db_ids.index(latch['next'])
    next_idx = (next_idx + 1) % len(db_ids)

    set_latch(config, new_current=latch['next'], new_next=db_ids[next_idx])


def _get_redis(config, dbid=None):
    if dbid is None:
......
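
Note (not part of the commit): the new latch_db helper rotates 'current' and 'next' round-robin through the sorted set of configured redis database ids. A standalone sketch of that rotation, using example ids [0, 1, 2] instead of real config:

# illustrative only: the round-robin advance performed by latch_db
db_ids = sorted(set([0, 1, 2]))
latch = {'current': db_ids[0], 'next': db_ids[0]}

for _ in range(4):
    next_idx = (db_ids.index(latch['next']) + 1) % len(db_ids)
    latch = {'current': latch['next'], 'next': db_ids[next_idx]}
    print(latch)
# {'current': 0, 'next': 1}
# {'current': 1, 'next': 2}
# {'current': 2, 'next': 0}
# {'current': 0, 'next': 1}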
@@ -2,21 +2,26 @@ import json
import logging
import os
import re
import time
from celery import Task, states
from celery.result import AsyncResult
from collections import defaultdict
from lxml import etree
import jsonschema
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_next_redis
from inventory_provider.tasks.common import get_next_redis, latch_db
from inventory_provider import config
from inventory_provider import environment
from inventory_provider.db import db, opsdb
from inventory_provider import snmp
from inventory_provider import juniper
FINALIZER_POLLING_FREQUENCY_S = 2.5
FINALIZER_TIMEOUT_S = 300
# TODO: error callback (cf. http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks) # noqa: E501
environment.setup_logging()
@@ -441,6 +446,9 @@ def launch_refresh_cache_all(config):
    """
    logger = logging.getLogger(__name__)

    r = get_next_redis(config)
    r.flushdb()

    # first batch of subtasks: refresh cached opsdb data
    subtasks = [
        update_junosspace_device_list.apply_async(),
@@ -457,11 +465,86 @@ def launch_refresh_cache_all(config):
        update_equipment_locations.apply_async(),
    ]

    for hostname in _derive_router_hostnames(config):
        logger.debug(
            'queueing router refresh jobs for %r' % hostname)
        logger.debug('queueing router refresh jobs for %r' % hostname)
        subtasks.append(reload_router_config.apply_async(args=[hostname]))

    return [x.id for x in subtasks]
    pending_task_ids = [x.id for x in subtasks]
    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
    pending_task_ids.append(t.id)
    return pending_task_ids
def _wait_for_tasks(task_ids, update_callback=lambda s: None):

    start_time = time.time()
    while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S:
        update_callback('waiting for tasks to complete: %r' % task_ids)
        time.sleep(FINALIZER_POLLING_FREQUENCY_S)
        task_ids = [
            id for id in task_ids
            if not check_task_status(id)['ready']
        ]

    if task_ids:
        raise InventoryTaskError(
            'timeout waiting for pending tasks to complete')

    update_callback('pending tasks completed in {} seconds'.format(
        time.time() - start_time))
@app.task(base=InventoryTask, bind=True)
def refresh_finalizer(self, pending_task_ids_json):
    logger = logging.getLogger(__name__)
    logger.debug('>>> refresh_finalizer')
    logger.debug('task_ids: %r' % pending_task_ids_json)

    input_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "array",
        "items": {"type": "string"}
    }

    def _update(s):
        logger.debug(s)
        self.update_state(
            state=states.STARTED,
            meta={
                'task': 'refresh_finalizer',
                'message': s
            })

    task_ids = json.loads(pending_task_ids_json)
    logger.debug('task_ids: %r' % task_ids)
    jsonschema.validate(task_ids, input_schema)

    _wait_for_tasks(task_ids, update_callback=_update)
    _build_subnet_db(update_callback=_update)

    _update('latching current/next dbs')
    latch_db(InventoryTask.config)

    logger.debug('<<< refresh_finalizer')
def _build_subnet_db(update_callback=lambda s: None):

    r = get_next_redis(InventoryTask.config)

    update_callback('loading all network addresses')
    subnets = {}
    for k in r.scan_iter('reverse_interface_addresses:*'):
        info = r.get(k.decode('utf-8')).decode('utf-8')
        info = json.loads(info)
        entry = subnets.setdefault('subnet', [])
        entry.append({
            'interface name': info['interface name'],
            'router': info['router']
        })

    update_callback('saving {} subnets'.format(len(subnets)))
    for k, v in subnets.items():
        r.set('subnets:' + k, json.dumps(v))
def check_task_status(task_id):
......
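
Note (not part of the commit): the worker changes chain the refresh into one pipeline: launch_refresh_cache_all flushes the next redis database, queues the refresh subtasks, and appends a refresh_finalizer task whose _wait_for_tasks helper polls check_task_status until every subtask reports ready (or FINALIZER_TIMEOUT_S elapses), then builds the subnet db and latches current/next. A self-contained sketch of that poll-until-ready pattern, with a stubbed status lookup and hypothetical task ids in place of the real celery results:

# illustrative sketch of the _wait_for_tasks polling contract
import time

POLLING_FREQUENCY_S = 0.1   # stands in for FINALIZER_POLLING_FREQUENCY_S
TIMEOUT_S = 2.0             # stands in for FINALIZER_TIMEOUT_S

_started = time.time()
_ready_after = {'task-a': 0.3, 'task-b': 0.6}   # hypothetical task ids

def fake_check_task_status(task_id):
    # stand-in for check_task_status(); only the 'ready' field matters here
    return {'ready': time.time() - _started >= _ready_after[task_id]}

def wait_for_tasks(task_ids):
    start_time = time.time()
    while task_ids and time.time() - start_time < TIMEOUT_S:
        time.sleep(POLLING_FREQUENCY_S)
        task_ids = [i for i in task_ids if not fake_check_task_status(i)['ready']]
    if task_ids:
        raise RuntimeError('timeout waiting for pending tasks to complete')

wait_for_tasks(list(_ready_after))
print('all pending tasks completed')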
@@ -14,6 +14,7 @@ commands =
    coverage run --source inventory_provider -m py.test {posargs}
    coverage xml
    coverage html
    coverage report --fail-under 80
    coverage report --fail-under 75
    # coverage report --fail-under 80
    flake8