Skip to content
Snippets Groups Projects
Commit 077dfbb0 authored by Erik Reid's avatar Erik Reid
Browse files

don't use async subtasks

parent 63d89bfe
No related branches found
No related tags found
No related merge requests found
import logging

from flask import Blueprint, Response, current_app

from inventory_provider.constants import TASK_LOGGER_NAME
from inventory_provider.tasks import worker
from inventory_provider.tasks.app import app

# blueprint exposing the cache-refresh job endpoints
routes = Blueprint("inventory-data-job-routes", __name__)
......@@ -11,11 +11,31 @@ routes = Blueprint("inventory-data-job-routes", __name__)
@routes.route("/update", methods=['GET', 'POST'])
def update():
    """
    Trigger a full refresh of the inventory cache.

    Calls worker.start_refresh_cache_all synchronously in this
    process (rather than dispatching an async celery task via
    app.send_task) using the config stored on the flask app.

    :return: flask Response with body "OK"
    """
    task_logger = logging.getLogger(TASK_LOGGER_NAME)
    task_logger.debug(
        'launching task: '
        'inventory_provider.tasks.worker.refresh_cache_all')

    worker.start_refresh_cache_all(
        current_app.config["INVENTORY_PROVIDER_CONFIG"])

    return Response("OK")
......
......@@ -3,7 +3,6 @@ import logging
import re
from celery import bootsteps, Task, group
from celery.result import allow_join_result
from collections import defaultdict
from lxml import etree
......@@ -25,7 +24,7 @@ environment.setup_logging()
class InventoryTask(Task):
config = None
logger = None
# logger = None
def __init__(self):
pass
......@@ -49,7 +48,7 @@ class InventoryTask(Task):
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.set(name=key, value=value)
InventoryTask.logger.debug("saved %s" % key)
# InventoryTask.logger.debug("saved %s" % key)
return "OK"
@staticmethod
......@@ -79,12 +78,19 @@ class InventoryTask(Task):
# etree.tostring(xml_doc, encoding='unicode'))
# def _wait_for_result(async_result):
# import time
# logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# while not async_result.ready():
# logger.debug("async_result not ready ... wait")
# time.sleep(5.0)
# return async_result.get()
class WorkerArgs(bootsteps.Step):
    """
    Celery worker bootstep that loads the service configuration
    file named on the worker command line and stores the parsed
    result on InventoryTask.config for use by all tasks.
    """
    def __init__(self, worker, config_filename, **options):
        # parse once at worker startup; tasks read the shared class attribute
        with open(config_filename) as f:
            InventoryTask.config = config.load(f)
# interfaces_key = "interface_services"
# equipment_locations_key = "equipment_locations"
......@@ -108,8 +114,8 @@ app.steps['worker'].add(WorkerArgs)
@app.task(bind=InventoryTask)
def snmp_refresh_interfaces(self, hostname, community):
logger = logging.getLogger(constants.TASK_LOGGER_NAME)
logger.debug('STARTING: snmp_refresh_interfaces(%r, %r)'
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> snmp_refresh_interfaces(%r, %r)'
% (hostname, community))
InventoryTask.save_value_json(
......@@ -119,20 +125,20 @@ def snmp_refresh_interfaces(self, hostname, community):
community,
InventoryTask.config)))
logger.debug('FINISHED: snmp_refresh_interfaces(%r, %r)'
task_logger.debug('<<< snmp_refresh_interfaces(%r, %r)'
% (hostname, community))
@app.task(bind=InventoryTask)
def netconf_refresh_config(self, hostname):
    """
    Load the router configuration via netconf and cache it in
    redis under 'netconf:<hostname>'.

    :param hostname: router hostname to query over ssh/netconf
    """
    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    task_logger.debug('>>> netconf_refresh_config(%r)' % hostname)

    InventoryTask.save_value_etree(
        'netconf:' + hostname,
        juniper.load_config(hostname, InventoryTask.config["ssh"]))

    task_logger.debug('<<< netconf_refresh_config(%r)' % hostname)
# @app.task(bind=InventoryTask)
......@@ -149,6 +155,9 @@ def netconf_refresh_config(self, hostname):
@app.task()
def update_interfaces_to_services():
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_interfaces_to_services')
interface_services = defaultdict(list)
with db.connection(InventoryTask.config["ops-db"]) as cx:
for service in opsdb.get_circuits(cx):
......@@ -164,9 +173,14 @@ def update_interfaces_to_services():
'opsdb:interface_services:' + equipment_interface,
json.dumps(services))
task_logger.debug('<<< update_interfaces_to_services')
@app.task()
def update_equipment_locations():
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_equipment_locations')
r = get_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:location:*'):
r.delete(key)
......@@ -174,9 +188,14 @@ def update_equipment_locations():
for ld in opsdb.get_equipment_location_data(cx):
r.set('opsdb:location:%s' % ld['equipment_name'], json.dumps(ld))
task_logger.debug('<<< update_equipment_locations')
@app.task()
def update_circuit_hierarchy():
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_circuit_hierarchy')
# TODO: integers are not JSON keys
with db.connection(InventoryTask.config["ops-db"]) as cx:
child_to_parents = defaultdict(list)
......@@ -198,9 +217,14 @@ def update_circuit_hierarchy():
for cid, children in child_to_parents.items():
r.set('opsdb:services:children:%d' % cid, json.dumps(children))
task_logger.debug('<<< update_circuit_hierarchy')
@app.task()
def update_interface_statuses():
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_interface_statuses')
with db.connection(InventoryTask.config["ops-db"]) as cx:
services = opsdb.get_circuits(cx)
with db.connection(InventoryTask.config["alarms-db"]) as cx:
......@@ -214,39 +238,42 @@ def update_interface_statuses():
service["interface_name"])
InventoryTask.save_value(key, status)
task_logger.debug('<<< update_interface_statuses')
@app.task()
def update_inventory_system_cache():
    """
    Refresh all opsdb-derived redis caches by dispatching the
    individual update tasks as an async celery group.
    """
    # use debug-level entry/exit tracing like the sibling tasks,
    # not logger.error, which polluted error logs with trace noise
    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    task_logger.debug('>>> update_inventory_system_cache')

    g = group([
        update_interfaces_to_services.s(),
        update_circuit_hierarchy.s(),
        update_equipment_locations.s(),
        update_interface_statuses.s()
    ])
    g.apply_async()

    task_logger.debug('<<< update_inventory_system_cache')
# @app.task()
# def update_inventory_system_cache():
# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# task_logger.debug('>>> update_inventory_system_cache')
#
# subtasks = [
# update_interfaces_to_services.s(),
# update_circuit_hierarchy.s(),
# update_equipment_locations.s(),
# # update_interface_statuses.s()
# ]
#
# group(subtasks).apply()
#
# task_logger.debug('<<< update_inventory_system_cache')
@app.task()
def update_junosspace_device_list():
    """
    Load the router list from junosspace and cache each device
    in redis under 'junosspace:<hostname>'.
    """
    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    task_logger.debug('>>> update_junosspace_device_list')

    r = get_redis(InventoryTask.config)
    for d in juniper.load_routers_from_junosspace(
            InventoryTask.config["junosspace"]):
        r.set(
            'junosspace:' + d['hostname'],
            json.dumps(d).encode('utf-8'))

    task_logger.debug('<<< update_junosspace_device_list')
def _derive_router_hostnames(config):
def _derive_router_hostnames(config):
r = get_redis(config)
junosspace_equipment = set()
for k in r.keys('junosspace:*'):
......@@ -264,39 +291,96 @@ def _derive_router_hostnames(config):
return junosspace_equipment & opsdb_equipment
@app.task()
def refresh_cache_all():
# @app.task()
# def refresh_cache_for_router(hostname):
# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# task_logger.debug('>>> refresh_cache_for_router(%r)' % hostname)
#
# # TODO: !!!! extract community string from netconf data
# task_logger.error(
# 'TODO: !!!! extract community string from netconf data')
# subtasks = [
# netconf_refresh_config.s(hostname),
# snmp_refresh_interfaces.s(hostname, '0pBiFbD')
# ]
#
# group(subtasks).apply()
#
# # TODO: clear classifier cache
#
# task_logger.debug('<<< refresh_cache_for_router(%r)' % hostname)
# @app.task()
# def _chain_separator_task():
# """
# boilerplate in order to support groups as chord elements
# cf. https://stackoverflow.com/questions/15123772/celery-chaining-groups-and-subtasks-out-of-order-execution
# cf. http://docs.celeryproject.org/en/latest/userguide/canvas.html
# ('Chaining a group together with another task will automatically upgrade it to be a chord')
# :return:
# """ # noqa E501
# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# task_logger.debug('>>>_chain_separator_task<<<')
# pass
# @app.task()
# def refresh_cache_all():
# task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# task_logger.debug('>>> refresh_cache_all')
#
# subtasks = [
# update_junosspace_device_list.s(),
# update_inventory_system_cache.s()
# ]
#
# group(subtasks).apply()
#
# subtasks = []
# for hostname in _derive_router_hostnames(InventoryTask.config):
# task_logger.debug(
# 'queueing refresh_cache_for_router for %r' % hostname)
# subtasks.append(refresh_cache_for_router.s(hostname))
#
# group(subtasks).apply()
#
# task_logger.debug('<<< refresh_cache_all')
def start_refresh_cache_all(config):
    """
    Utility function intended to be called outside of the worker
    process: refreshes all caches without using async subtasks
    from within tasks.

    :param config: config structure as defined in config.py
    :return: celery GroupResult for the final (router-detail) batch
    """
    task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)

    # first batch of subtasks: refresh cached opsdb data
    # NOTE(review): update_equipment_locations is not queued here —
    # confirm whether that omission is intentional
    subtasks = [
        update_junosspace_device_list.s(),
        update_interfaces_to_services.s(),
        update_circuit_hierarchy.s()
    ]
    results = group(subtasks).apply_async()
    # safe to join here: this runs outside the worker process
    results.join()

    # second batch of subtasks:
    #   alarms db status cache
    #   juniper netconf & snmp data
    subtasks = [
        update_interface_statuses.s()
    ]
    for hostname in _derive_router_hostnames(config):
        task_logger.debug(
            'queueing router refresh jobs for %r' % hostname)
        # TODO: !!!! extract community string from netconf data
        task_logger.error(
            'TODO: !!!! extract community string from netconf data')
        subtasks.append(netconf_refresh_config.s(hostname))
        # TODO: these should be synchronous, and then cleanup classifier cache
        subtasks.append(snmp_refresh_interfaces.s(hostname, '0pBiFbD'))

    return group(subtasks).apply_async()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment