From 447d84eae8afaa95afac43ca033e2b346288de37 Mon Sep 17 00:00:00 2001 From: Robert Latta <robert.latta@geant.org> Date: Wed, 27 Jan 2021 09:27:58 +0000 Subject: [PATCH] Added complete inventory update task --- inventory_provider/tasks/ims_worker.py | 86 +++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 9 deletions(-) diff --git a/inventory_provider/tasks/ims_worker.py b/inventory_provider/tasks/ims_worker.py index d975a57d..80957658 100644 --- a/inventory_provider/tasks/ims_worker.py +++ b/inventory_provider/tasks/ims_worker.py @@ -9,26 +9,97 @@ from enum import IntFlag from pathlib import Path from uuid import uuid4 +from kombu.exceptions import KombuError +from redis import RedisError + from inventory_provider.db import ims_data from inventory_provider.db.ims import IMS -from inventory_provider import environment +from inventory_provider import environment, config +from inventory_provider.tasks import monitor, data from inventory_provider.tasks.app import app -from inventory_provider.tasks.common import get_current_redis, get_next_redis +from inventory_provider.tasks.common import get_current_redis, \ + get_next_redis, update_latch_status, get_latch, set_latch from inventory_provider.tasks.worker import InventoryTask, \ - log_task_entry_and_exit + log_task_entry_and_exit, import_unmanaged_interfaces, \ + reload_router_config, refresh_finalizer, update_neteng_managed_device_list, \ + _erase_next_db environment.setup_logging() logger = logging.getLogger(__name__) +@log_task_entry_and_exit +def launch_refresh_cache_all(config): + """ + utility function intended to be called outside of the worker process + :param config: config structure as defined in config.py + :return: + """ + + try: + _erase_next_db(config) + update_latch_status(config, pending=True) + + monitor.clear_joblog(get_current_redis(config)) + + # first batch of subtasks: refresh cached IMS data + subtasks = [ + update_neteng_managed_device_list.apply_async(), + update_circuit_hierarchy_ims.apply_async(), + update_fibre_spans_ims.apply_async(), + update_interfaces_to_port_ids_ims.apply_async(), + update_port_ids_to_services_ims.apply_async(), + ] + [x.get() for x in subtasks] + + # now launch the task whose only purpose is to + # act as a convenient parent for all of the remaining tasks + t = internal_refresh_phase_2_ims.apply_async() + return t.id + + except (RedisError, KombuError): + update_latch_status(config, pending=False, failure=True) + logger.exception('error launching refresh subtasks') + raise + + +@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2_ims') +@log_task_entry_and_exit +def internal_refresh_phase_2_ims(self): + # second batch of subtasks: + # alarms db status cache + # juniper netconf & snmp data + try: + + subtasks = [ + update_equipment_locations_ims.apply_async(), + update_lg_routers_ims.apply_async(), + import_unmanaged_interfaces.apply_async() + ] + for hostname in data.derive_router_hostnames(InventoryTask.config): + logger.debug(f'queueing router refresh jobs for {hostname}') + subtasks.append(reload_router_config.apply_async(args=[hostname])) + + pending_task_ids = [x.id for x in subtasks] + + refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) + + except KombuError: + # TODO: possible race condition here + # e.g. if one of these tasks takes a long time and another + # update is started, we could end up with strange data + update_latch_status(config, pending=False, failure=True) + logger.exception('error launching refresh phase 2 subtasks') + raise + + @app.task(base=InventoryTask, bind=True, name='update_fibre_spans_ims') @log_task_entry_and_exit -def update_fibre_spans(self, use_current=False): +def update_fibre_spans_ims(self, use_current=False): if use_current: r = get_current_redis(InventoryTask.config) - # scan with bigger batches, to mitigate network latency effects else: r = get_next_redis(InventoryTask.config) rp = r.pipeline() @@ -96,7 +167,6 @@ def update_port_ids_to_services_ims(self, use_current=False): if use_current: r = get_current_redis(InventoryTask.config) - # scan with bigger batches, to mitigate network latency effects else: r = get_next_redis(InventoryTask.config) @@ -120,11 +190,10 @@ def update_circuit_hierarchy_ims(self, use_current=False): if use_current: r = get_current_redis(InventoryTask.config) - # scan with bigger batches, to mitigate network latency effects else: r = get_next_redis(InventoryTask.config) rp = r.pipeline() - for k in r.scan_iter('ims:circuit_hierarchy:*', count=1000): + for k in r.scan_iter('ims:circuit_hierarchy:*', count=2000): rp.delete(k) rp.execute() @@ -143,7 +212,6 @@ def update_equipment_locations_ims(self, use_current=False): if use_current: r = get_current_redis(InventoryTask.config) - # scan with bigger batches, to mitigate network latency effects else: r = get_next_redis(InventoryTask.config) rp = r.pipeline() -- GitLab