Skip to content
Snippets Groups Projects
Commit 681f39b9 authored by Erik Reid
Browse files

catch KombuError and set update status

parent 9faf8327
No related branches found
No related tags found
No related merge requests found
...@@ -111,7 +111,7 @@ WHERE ...@@ -111,7 +111,7 @@ WHERE
def get_circuits(connection): def get_circuits(connection):
_sql = """ _sql = """
SELECT * SELECT equipment, other_end_equipment
FROM (SELECT FROM (SELECT
c.absid AS id, c.absid AS id,
c.name, c.name,
...@@ -197,6 +197,7 @@ WHERE ...@@ -197,6 +197,7 @@ WHERE
AND equipment != '' AND equipment != ''
AND circuit_type IN ( AND circuit_type IN (
'path', 'service', 'l2circuit', 'link-aggr-group') 'path', 'service', 'l2circuit', 'link-aggr-group')
and (equipment like '%tal.ee%' or other_end_equipment like '%tal.ee%')
ORDER BY ORDER BY
name, name,
FIELD(status, FIELD(status,
......
...@@ -5,6 +5,7 @@ import os ...@@ -5,6 +5,7 @@ import os
import time import time
from redis.exceptions import RedisError from redis.exceptions import RedisError
from kombu.exceptions import KombuError
from celery import Task, states from celery import Task, states
from celery.result import AsyncResult from celery.result import AsyncResult
...@@ -535,20 +536,29 @@ def internal_refresh_phase_2(self): ...@@ -535,20 +536,29 @@ def internal_refresh_phase_2(self):
# second batch of subtasks: # second batch of subtasks:
# alarms db status cache # alarms db status cache
# juniper netconf & snmp data # juniper netconf & snmp data
subtasks = [ try:
update_equipment_locations.apply_async(),
update_lg_routers.apply_async(), subtasks = [
update_access_services.apply_async(), update_equipment_locations.apply_async(),
import_unmanaged_interfaces.apply_async() update_lg_routers.apply_async(),
] update_access_services.apply_async(),
for hostname in data.derive_router_hostnames(InventoryTask.config): import_unmanaged_interfaces.apply_async()
logger.debug('queueing router refresh jobs for %r' % hostname) ]
subtasks.append(reload_router_config.apply_async(args=[hostname])) for hostname in data.derive_router_hostnames(InventoryTask.config):
logger.debug('queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks]
pending_task_ids = [x.id for x in subtasks] refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)]) except KombuError:
pending_task_ids.append(t.id) # TODO: possible race condition here
# e.g. if one of these tasks takes a long time and another
# update is started, we could end up with strange data
update_latch_status(config, pending=False, failure=True)
logger.exception('error launching refresh phase 2 subtasks')
raise
@log_task_entry_and_exit @log_task_entry_and_exit
...@@ -579,7 +589,7 @@ def launch_refresh_cache_all(config): ...@@ -579,7 +589,7 @@ def launch_refresh_cache_all(config):
t = internal_refresh_phase_2.apply_async() t = internal_refresh_phase_2.apply_async()
return t.id return t.id
except RedisError: except (RedisError, KombuError):
update_latch_status(config, pending=False, failure=True) update_latch_status(config, pending=False, failure=True)
logger.exception('error launching refresh subtasks') logger.exception('error launching refresh subtasks')
raise raise
...@@ -647,7 +657,8 @@ def refresh_finalizer(self, pending_task_ids_json): ...@@ -647,7 +657,8 @@ def refresh_finalizer(self, pending_task_ids_json):
except (jsonschema.ValidationError, except (jsonschema.ValidationError,
json.JSONDecodeError, json.JSONDecodeError,
InventoryTaskError, InventoryTaskError,
RedisError) as e: RedisError,
KombuError) as e:
update_latch_status(InventoryTask.config, failure=True) update_latch_status(InventoryTask.config, failure=True)
raise e raise e
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment