diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index 81e6501f7a4ef335476c3c3f18177ed9257cd07e..c26ab6a588041272a855065d38d13e1d028b4474 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -13,11 +13,11 @@ def update(): @routes.route("update-interface-statuses") def update_interface_statuses(): - worker.update_interface_statuses().async_start() + worker.update_interface_statuses.delay() return Response("OK") @routes.route("reload-router-config/<equipment_name>") def reload_router_config(equipment_name): - worker.reload_router_config().async_start(equipment_name) + worker.reload_router_config.delay(equipment_name) return Response("OK") diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 73b4468a46679d34697985efd34578f455cc6a88..cdfc648fff7e5d261919f4fdda0a99b4420eb4ae 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -238,38 +238,40 @@ def clear_cached_classifier_responses(hostname): r.delete(k) -def refresh_ix_public_peers(hostname, netconf): +def _refresh_peers(hostname, key_base, peers): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug( - 'removing cached ix public peers for %r' % hostname) + 'removing cached %s for %r' % (key_base, hostname)) r = get_redis(InventoryTask.config) - for k in r.keys('ix_public_peer:*'): - value = json.loads(r.get(k.decode('utf-8')).decode('utf-8')) - if value['router'] == hostname: - r.delete(k) - - for peer in juniper.ix_public_peers(netconf): + for k in r.keys(key_base + ':*'): + # potential race condition: another proc could have + # delete this element between the time we read the + # keys and the next statement ... check for None below + value = r.get(k.decode('utf-8')) + if value: + value = json.loads(value.decode('utf-8')) + if value['router'] == hostname: + r.delete(k) + + for peer in peers: peer['router'] = hostname r.set( - 'ix_public_peer:' + peer['name'], + '%s:%s' % (key_base, peer['name']), json.dumps(peer)) -def refresh_vpn_rr_peers(hostname, netconf): - task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) - task_logger.debug( - 'removing cached vpn rr for %r' % hostname) - r = get_redis(InventoryTask.config) - for k in r.keys('vpn_rr_peer:*'): - value = json.loads(r.get(k.decode('utf-8')).decode('utf-8')) - if value['router'] == hostname: - r.delete(k) +def refresh_ix_public_peers(hostname, netconf): + _refresh_peers( + hostname, + 'ix_public_peer', + juniper.ix_public_peers(netconf)) - for peer in juniper.vpn_rr_peers(netconf): - peer['router'] = hostname - r.set( - 'vpn_rr_peer:' + peer['name'], - json.dumps(peer)) + +def refresh_vpn_rr_peers(hostname, netconf): + _refresh_peers( + hostname, + 'vpn_rr_peer', + juniper.vpn_rr_peers(netconf)) @app.task @@ -277,7 +279,7 @@ def reload_router_config(hostname): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug('>>> update_router_config') - netconf_refresh_config.apply(hostname) + netconf_refresh_config.apply(args=[hostname]) netconf_doc = load_netconf_data(hostname) if netconf_doc is None: @@ -292,7 +294,7 @@ def reload_router_config(hostname): task_logger.error( 'error extracting community string for %r' % hostname) else: - snmp_refresh_interfaces.apply(args=(hostname, community)) + snmp_refresh_interfaces.apply(args=[hostname, community]) # TODO: move this out of else? (i.e. clear even if netconf fails?) clear_cached_classifier_responses(hostname)