diff --git a/inventory_provider/db/opsdb.py b/inventory_provider/db/opsdb.py
index e41748fd1c7472fdc2ac59914ad3c73d3108a55f..081e7a11f8765b969ae02d78335d243821989d12 100644
--- a/inventory_provider/db/opsdb.py
+++ b/inventory_provider/db/opsdb.py
@@ -111,7 +111,7 @@ WHERE
 def get_circuits(connection):
     _sql = """
-SELECT *
+SELECT equipment, other_end_equipment
 FROM
 (SELECT
     c.absid AS id,
     c.name,
@@ -197,6 +197,7 @@ WHERE
   AND equipment != ''
   AND circuit_type IN (
      'path', 'service', 'l2circuit', 'link-aggr-group')
+  and (equipment like '%tal.ee%' or other_end_equipment like '%tal.ee%')
 ORDER BY
   name,
   FIELD(status,
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index b9e48a8c46a1975667d4321f4f0004005d64b352..9189a9cd0b8552d9604f03030256d44bca9f4e5f 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -5,6 +5,7 @@ import os
 import time
 
 from redis.exceptions import RedisError
+from kombu.exceptions import KombuError
 from celery import Task, states
 from celery.result import AsyncResult
 
@@ -535,20 +536,29 @@ def internal_refresh_phase_2(self):
     # second batch of subtasks:
     #   alarms db status cache
     #   juniper netconf & snmp data
-    subtasks = [
-        update_equipment_locations.apply_async(),
-        update_lg_routers.apply_async(),
-        update_access_services.apply_async(),
-        import_unmanaged_interfaces.apply_async()
-    ]
-    for hostname in data.derive_router_hostnames(InventoryTask.config):
-        logger.debug('queueing router refresh jobs for %r' % hostname)
-        subtasks.append(reload_router_config.apply_async(args=[hostname]))
+    try:
+
+        subtasks = [
+            update_equipment_locations.apply_async(),
+            update_lg_routers.apply_async(),
+            update_access_services.apply_async(),
+            import_unmanaged_interfaces.apply_async()
+        ]
+        for hostname in data.derive_router_hostnames(InventoryTask.config):
+            logger.debug('queueing router refresh jobs for %r' % hostname)
+            subtasks.append(reload_router_config.apply_async(args=[hostname]))
+
+        pending_task_ids = [x.id for x in subtasks]
 
-    pending_task_ids = [x.id for x in subtasks]
+        refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
 
-    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
-    pending_task_ids.append(t.id)
+    except KombuError:
+        # TODO: possible race condition here
+        # e.g. if one of these tasks takes a long time and another
+        #      update is started, we could end up with strange data
+        update_latch_status(config, pending=False, failure=True)
+        logger.exception('error launching refresh phase 2 subtasks')
+        raise
 
 
 @log_task_entry_and_exit
@@ -579,7 +589,7 @@ def launch_refresh_cache_all(config):
         t = internal_refresh_phase_2.apply_async()
         return t.id
 
-    except RedisError:
+    except (RedisError, KombuError):
         update_latch_status(config, pending=False, failure=True)
         logger.exception('error launching refresh subtasks')
         raise
@@ -647,7 +657,8 @@ def refresh_finalizer(self, pending_task_ids_json):
     except (jsonschema.ValidationError,
             json.JSONDecodeError,
             InventoryTaskError,
-            RedisError) as e:
+            RedisError,
+            KombuError) as e:
         update_latch_status(InventoryTask.config, failure=True)
         raise e