From ad5ba97208c75704bf95fa62831f674e224302a5 Mon Sep 17 00:00:00 2001 From: Robert Latta <robert.latta@geant.org> Date: Tue, 17 Aug 2021 07:52:35 +0000 Subject: [PATCH] set latch pending to false only after completion when errors occur --- inventory_provider/tasks/worker.py | 115 +++++++++++++++++------------ 1 file changed, 66 insertions(+), 49 deletions(-) diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index c08bad8c..9590db91 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -1169,13 +1169,11 @@ def update_entry_point(self): ( ims_task.s().on_error(task_error_handler.s()), chord( - (reload_router_config_chorded.s(r).on_error( - task_error_handler.s()) for r in routers), + (reload_router_config_chorded.s(r) for r in routers), empty_task.si('router tasks complete') ), chord( - (reload_lab_router_config_chorded.s(r).on_error( - task_error_handler.s()) for r in lab_routers), + (reload_lab_router_config_chorded.s(r) for r in lab_routers), empty_task.si('lab router tasks complete') ) ), @@ -1197,7 +1195,7 @@ def empty_task(self, message): logger.warning(f'message from empty task: {message}') -# updated with tramsaction +# updated with transaction def _erase_next_db_chorded(config): """ flush next db, but first save latch and then restore afterwards @@ -1236,60 +1234,69 @@ def _erase_next_db_chorded(config): @app.task(base=InventoryTask, bind=True, name='reload_lab_router_config') @log_task_entry_and_exit def reload_lab_router_config_chorded(self, hostname): - self.log_info(f'loading netconf data for lab {hostname} RL') + try: + self.log_info(f'loading netconf data for lab {hostname} RL') - # load new netconf data, in this thread - netconf_str = retrieve_and_persist_netconf_config(hostname, lab=True) - netconf_doc = etree.fromstring(netconf_str) + # load new netconf data, in this thread + netconf_str = retrieve_and_persist_netconf_config( + hostname, lab=True, update_callback=self.log_warning) + netconf_doc = etree.fromstring(netconf_str) - refresh_juniper_interface_list(hostname, netconf_doc, lab=True) + refresh_juniper_interface_list(hostname, netconf_doc, lab=True) - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) + # load snmp indexes + community = juniper.snmp_community_string(netconf_doc) + if not community: + raise InventoryTaskError( + f'error extracting community string for {hostname}') + else: + self.log_info(f'refreshing snmp interface indexes for {hostname}') + logical_systems = juniper.logical_systems(netconf_doc) - # load snmp data, in this thread - snmp_refresh_interfaces(hostname, community, logical_systems) + # load snmp data, in this thread + snmp_refresh_interfaces(hostname, community, logical_systems) - self.log_info(f'updated configuration for lab {hostname}') + self.log_info(f'updated configuration for lab {hostname}') + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) # updated @app.task(base=InventoryTask, bind=True, name='reload_router_config') @log_task_entry_and_exit def reload_router_config_chorded(self, hostname): - self.log_info(f'loading netconf data for {hostname} RL') - netconf_str = retrieve_and_persist_netconf_config( - hostname, self.log_warning) + try: + self.log_info(f'loading netconf data for {hostname} RL') + netconf_str = retrieve_and_persist_netconf_config( + hostname, update_callback=self.log_warning) - netconf_doc = etree.fromstring(netconf_str) + netconf_doc = etree.fromstring(netconf_str) - # clear cached classifier responses for this router, and - # refresh peering data - logger.info(f'refreshing peers & clearing cache for {hostname}') - refresh_juniper_bgp_peers(hostname, netconf_doc) - refresh_juniper_interface_list(hostname, netconf_doc) + # clear cached classifier responses for this router, and + # refresh peering data + logger.info(f'refreshing peers & clearing cache for {hostname}') + refresh_juniper_bgp_peers(hostname, netconf_doc) + refresh_juniper_interface_list(hostname, netconf_doc) - # load snmp indexes - community = juniper.snmp_community_string(netconf_doc) - if not community: - raise InventoryTaskError( - f'error extracting community string for {hostname}') - else: - self.log_info(f'refreshing snmp interface indexes for {hostname}') - logical_systems = juniper.logical_systems(netconf_doc) + # load snmp indexes + community = juniper.snmp_community_string(netconf_doc) + if not community: + raise InventoryTaskError( + f'error extracting community string for {hostname}') + else: + self.log_info(f'refreshing snmp interface indexes for {hostname}') + logical_systems = juniper.logical_systems(netconf_doc) - # load snmp data, in this thread - snmp_refresh_interfaces_chorded( - hostname, community, logical_systems, self.log_info) - snmp_refresh_peerings_chorded(hostname, community, logical_systems) + # load snmp data, in this thread + snmp_refresh_interfaces_chorded( + hostname, community, logical_systems, self.log_info) + snmp_refresh_peerings_chorded(hostname, community, logical_systems) - logger.info(f'updated configuration for {hostname}') + logger.info(f'updated configuration for {hostname}') + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) # new @@ -1303,7 +1310,7 @@ def retrieve_and_persist_netconf_config( netconf_doc = juniper.load_config( hostname, InventoryTask.config["ssh"]) netconf_str = etree.tostring(netconf_doc, encoding='unicode') - except (ConnectionError, juniper.NetconfHandlingError): + except (ConnectionError, juniper.NetconfHandlingError, InventoryTaskError): msg = f'error loading netconf data from {hostname}' logger.exception(msg) update_callback(msg) @@ -1311,6 +1318,7 @@ def retrieve_and_persist_netconf_config( netconf_str = r.get(redis_key) if not netconf_str: + update_callback(f'no cached netconf for {redis_key}') raise InventoryTaskError( f'netconf error with {hostname}' f' and no cached netconf data found') @@ -1394,11 +1402,15 @@ def snmp_refresh_peerings_chorded( @log_task_entry_and_exit def ims_task(self, use_current=False): - extracted_data = extract_ims_data() - transformed_data = transform_ims_data(extracted_data) - transformed_data['locations'] = extracted_data['locations'] - transformed_data['lg_routers'] = extracted_data['lg_routers'] - persist_ims_data(transformed_data, use_current) + try: + extracted_data = extract_ims_data() + transformed_data = transform_ims_data(extracted_data) + transformed_data['locations'] = extracted_data['locations'] + transformed_data['lg_routers'] = extracted_data['lg_routers'] + persist_ims_data(transformed_data, use_current) + except Exception as e: + logger.error(e) + update_latch_status(InventoryTask.config, pending=True, failure=True) # new @@ -1767,6 +1779,11 @@ def persist_ims_data(data, use_current=False): @log_task_entry_and_exit def final_task(self): + r = get_current_redis(InventoryTask.config) + latch = get_latch(r) + if latch['failure']: + raise InventoryTaskError('Sub-task failed - check logs for details') + _build_subnet_db(update_callback=self.log_info) _build_snmp_peering_db(update_callback=self.log_info) _build_juniper_peering_db(update_callback=self.log_info) -- GitLab