# jobs.py 7.39 KiB
import json
import logging
from distutils.util import strtobool
from flask import Blueprint, current_app, jsonify, Response, request
from inventory_provider.tasks import monitor
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch
# Blueprint on which every route handler in this module is registered.
routes = Blueprint(
    name="inventory-data-job-routes",
    import_name=__name__,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# JSON schema (draft-07) of a response carrying a single task id:
# an object whose only (and required) property is the string "task id".
TASK_ID_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {"task id": {"type": "string"}},
    "required": ["task id"],
    "additionalProperties": False,
}
# JSON schema (draft-07) of the task log response: an object with four
# required properties, each of which is an array of strings.
TASK_LOG_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        # every field shares the same shape; a fresh dict is built per field
        field: {"type": "array", "items": {"type": "string"}}
        for field in ("pending", "errors", "failed", "warnings")
    },
    "required": ["pending", "errors", "failed", "warnings"],
    "additionalProperties": False,
}
# JSON schema (draft-07) of a task status response: an array whose items
# all follow the "task" definition below.  "result" is the only task
# property that is not required; "result" and "parent" may be null.
INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "definitions": {
        "task": {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "status": {"type": "string"},
                "exception": {"type": "boolean"},
                "ready": {"type": "boolean"},
                "success": {"type": "boolean"},
                "result": {"type": ["object", "null"]},
                "parent": {"type": ["string", "null"]},
            },
            "required": [
                "id",
                "status",
                "exception",
                "ready",
                "success",
                "parent",
            ],
            "additionalProperties": False,
        },
    },
    "type": "array",
    "items": {"$ref": "#/definitions/task"},
}
# INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA = {
# "$schema": "http://json-schema.org/draft-07/schema#",
# "type": "object",
# "properties": {
# "id": {"type": "string"},
# "status": {"type": "string"},
# "exception": {"type": "boolean"},
# "ready": {"type": "boolean"},
# "success": {"type": "boolean"},
# "result": {"type": "object"}
# },
# "required": ["id", "status", "exception", "ready", "success"],
# "additionalProperties": False
# }
@routes.after_request
def after_request(resp):
    """
    Response hook registered on this blueprint.

    Hands the response to `common.after_request` and returns
    whatever that call returns.

    :param resp: the response object passed in by flask
    :return: the result of `common.after_request(resp)`
    """
    processed = common.after_request(resp)
    return processed
# Spellings of the `force` query parameter that count as true.  These are
# the same (case-insensitive) values `distutils.util.strtobool` accepted as
# true; distutils is deprecated (PEP 632) and removed from Python 3.12, so
# the check is done locally instead.
_TRUTHY_FORCE_VALUES = frozenset({'y', 'yes', 't', 'true', 'on', '1'})


@routes.route("/update", methods=['GET', 'POST'])
@common.require_accepts_json
def update():
    """
    Handler for `/jobs/update`.

    This resource updates the inventory network data for juniper devices.
    The function completes asynchronously and the id of the launched
    task is returned so the caller can
    use `/jobs/check-task-status` to determine when all jobs
    are finished.  The response will be formatted as follows:

    .. asjson:: inventory_provider.routes.jobs.TASK_ID_RESPONSE_SCHEMA

    Unless the `force` query parameter holds a true value (`y`, `yes`,
    `t`, `true`, `on` or `1`, case-insensitive), the request is rejected
    with a 503 response when the latch read from redis reports a
    pending update.  Any other `force` value is treated as false.

    :return: json containing the new task id, or a 503 response
    """
    force_arg = request.args.get('force', default='false', type=str)
    # anything that is not a recognised "true" spelling means "not forced"
    force = force_arg.lower() in _TRUTHY_FORCE_VALUES

    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    r = get_current_redis(config)

    if not force:
        latch = get_latch(r)
        if latch and latch['pending']:
            return Response(
                response='an update is already in progress',
                status=503,
                mimetype="text/html")

    phase2_task_id = worker.launch_refresh_cache_all(config)

    # keep the id in redis under a fixed key; `check_update_status`
    # reads this key back to find the task
    r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8'))
    return jsonify({'task id': phase2_task_id})
@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
    """
    Handler for `/jobs/reload-router-config/*equipment-name*`.

    Queues `worker.reload_router_config` for the named equipment via
    `.delay()` and returns the id of the queued task, without waiting
    for a result here.

    :param equipment_name: value of the url path segment, passed
        unchanged to the worker task
    :return: json of the form `{'task id': <id>}`
    """
    queued = worker.reload_router_config.delay(equipment_name)
    payload = {'task id': queued.id}
    return jsonify(payload)
@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
@common.require_accepts_json
def check_task_status(task_id):
    """
    Handler for /jobs/check-task-status/*`task-id`*

    This resource returns the current status of
    an asynchronous task started by `/jobs/update`
    or `jobs/reload-router-config`.  The return value
    will be formatted as follows:

    .. asjson::
        inventory_provider.routes.jobs.INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA

    :param task_id: id of the task to look up, taken from the url
    :return: json list of everything yielded by
        `worker.check_task_status(task_id)`
    """
    statuses = [*worker.check_task_status(task_id)]
    return jsonify(statuses)
@routes.route("check-update-status", methods=['GET', 'POST'])
@common.require_accepts_json
def check_update_status():
    """
    Handler for `/jobs/check-update-status`.

    Reads the task id stored in redis under
    `classifier-cache:update-task-id` (the key `/jobs/update` writes)
    and returns the status of that task as reported by
    `worker.check_task_status`.  When nothing is stored under that
    key a 404 response is returned instead.

    :return: json list of task statuses, or a 404 response
    """
    redis_conn = common.get_current_redis()
    stored_id = redis_conn.get('classifier-cache:update-task-id')

    if stored_id:
        # redis hands back bytes; the worker is given the decoded string
        statuses = worker.check_task_status(stored_id.decode('utf-8'))
        return jsonify(list(statuses))

    return Response(
        response='no pending update task found',
        status=404,
        mimetype="text/html")
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
    """
    Handler for `/jobs/log`.

    This resource returns the state of the previous (or current)
    tasks associated with a call to `/jobs/update`.  The response
    contains error or warning messages, if any were generated, and
    will be formatted according to the following schema:

    .. asjson:: inventory_provider.routes.jobs.TASK_LOG_RESPONSE_SCHEMA

    If a response is cached in redis under `joblog:cached-response` it
    is returned unchanged.  Otherwise the response is rebuilt from
    `monitor.load_task_log` and is cached only when at least one task
    was found and none of them is still pending.

    :return: json with `pending`, `warnings`, `errors` and `failed` lists
    """
    # a started task carrying none of these events is reported as pending
    FINALIZATION_EVENTS = {'task-succeeded', 'task-failed', 'task-revoked'}

    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    r = get_current_redis(config)

    cache_key = 'joblog:cached-response'
    cached = r.get(cache_key)
    if cached:
        return jsonify(json.loads(cached.decode('utf-8')))

    result = {
        'pending': [],
        'warnings': [],
        'errors': [],
        'failed': []
    }

    found_tasks = False
    for task in monitor.load_task_log(config).values():
        found_tasks = True

        result['warnings'].extend(
            event['message'] for event in task.get('task-warning', []))
        result['errors'].extend(
            event['message'] for event in task.get('task-error', []))

        # build the description if task-received is available
        description = None
        if 'task-received' in task:
            event = task['task-received'][0]
            description = f'{event["name"]}{event["args"]}'

        if 'task-failed' in task:
            if not description:
                logger.error(
                    'found task-failed event without task-received: %s',
                    task)
                # fall back to the uuid carried by the event itself
                description = task['task-failed'][0]['uuid']
            result['failed'].append(description)

        if 'task-started' in task:
            finished = set(task.keys()) & FINALIZATION_EVENTS
            if not finished:
                if not description:
                    logger.error(
                        'found task-started event without task-received: %s',
                        task)
                    description = task['task-started'][0]['uuid']
                result['pending'].append(description)

    # while tasks are still pending the log is re-read on the next call
    # rather than served from the cache
    if found_tasks and not result['pending']:
        r.set(cache_key, json.dumps(result))

    return jsonify(result)