Skip to content
Snippets Groups Projects
Commit 447d84ea authored by Robert Latta's avatar Robert Latta
Browse files

Added complete inventory update task

parent 2c7b3de0
No related branches found
No related tags found
No related merge requests found
......@@ -9,26 +9,97 @@ from enum import IntFlag
from pathlib import Path
from uuid import uuid4
from kombu.exceptions import KombuError
from redis import RedisError
from inventory_provider.db import ims_data
from inventory_provider.db.ims import IMS
from inventory_provider import environment
from inventory_provider import environment, config
from inventory_provider.tasks import monitor, data
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_current_redis, get_next_redis
from inventory_provider.tasks.common import get_current_redis, \
get_next_redis, update_latch_status, get_latch, set_latch
from inventory_provider.tasks.worker import InventoryTask, \
log_task_entry_and_exit
log_task_entry_and_exit, import_unmanaged_interfaces, \
reload_router_config, refresh_finalizer, update_neteng_managed_device_list, \
_erase_next_db
environment.setup_logging()
logger = logging.getLogger(__name__)
@log_task_entry_and_exit
def launch_refresh_cache_all(config):
"""
utility function intended to be called outside of the worker process
:param config: config structure as defined in config.py
:return:
"""
try:
_erase_next_db(config)
update_latch_status(config, pending=True)
monitor.clear_joblog(get_current_redis(config))
# first batch of subtasks: refresh cached IMS data
subtasks = [
update_neteng_managed_device_list.apply_async(),
update_circuit_hierarchy_ims.apply_async(),
update_fibre_spans_ims.apply_async(),
update_interfaces_to_port_ids_ims.apply_async(),
update_port_ids_to_services_ims.apply_async(),
]
[x.get() for x in subtasks]
# now launch the task whose only purpose is to
# act as a convenient parent for all of the remaining tasks
t = internal_refresh_phase_2_ims.apply_async()
return t.id
except (RedisError, KombuError):
update_latch_status(config, pending=False, failure=True)
logger.exception('error launching refresh subtasks')
raise
@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2_ims')
@log_task_entry_and_exit
def internal_refresh_phase_2_ims(self):
# second batch of subtasks:
# alarms db status cache
# juniper netconf & snmp data
try:
subtasks = [
update_equipment_locations_ims.apply_async(),
update_lg_routers_ims.apply_async(),
import_unmanaged_interfaces.apply_async()
]
for hostname in data.derive_router_hostnames(InventoryTask.config):
logger.debug(f'queueing router refresh jobs for {hostname}')
subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks]
refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
except KombuError:
# TODO: possible race condition here
# e.g. if one of these tasks takes a long time and another
# update is started, we could end up with strange data
update_latch_status(config, pending=False, failure=True)
logger.exception('error launching refresh phase 2 subtasks')
raise
@app.task(base=InventoryTask, bind=True, name='update_fibre_spans_ims')
@log_task_entry_and_exit
def update_fibre_spans(self, use_current=False):
def update_fibre_spans_ims(self, use_current=False):
if use_current:
r = get_current_redis(InventoryTask.config)
# scan with bigger batches, to mitigate network latency effects
else:
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
......@@ -96,7 +167,6 @@ def update_port_ids_to_services_ims(self, use_current=False):
if use_current:
r = get_current_redis(InventoryTask.config)
# scan with bigger batches, to mitigate network latency effects
else:
r = get_next_redis(InventoryTask.config)
......@@ -120,11 +190,10 @@ def update_circuit_hierarchy_ims(self, use_current=False):
if use_current:
r = get_current_redis(InventoryTask.config)
# scan with bigger batches, to mitigate network latency effects
else:
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
for k in r.scan_iter('ims:circuit_hierarchy:*', count=1000):
for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000):
rp.delete(k)
rp.execute()
......@@ -143,7 +212,6 @@ def update_equipment_locations_ims(self, use_current=False):
if use_current:
r = get_current_redis(InventoryTask.config)
# scan with bigger batches, to mitigate network latency effects
else:
r = get_next_redis(InventoryTask.config)
rp = r.pipeline()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment