# jobs.py 3.35 KiB
import json
import logging
from distutils.util import strtobool
import jsonschema
from flask import Blueprint, current_app, jsonify, Response, request
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch
# Blueprint to which the job-control endpoints in this module are attached.
routes = Blueprint("inventory-data-job-routes", __name__)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@routes.after_request
def after_request(resp):
    """Post-process every response produced by this blueprint.

    The response is handed to ``common.after_request`` and whatever that
    returns is sent back unchanged.
    """
    processed = common.after_request(resp)
    return processed
@routes.route("/update", methods=['GET', 'POST'])
@common.require_accepts_json
def update():
    """Launch a full cache refresh and report the id of the launched task.

    The optional ``force`` query parameter (default ``'false'``) is parsed
    with ``strtobool``; a value it rejects is treated as false.  Unless
    forced, the request is refused when the latch read from redis reports
    a pending update.

    Returns:
        A JSON object ``{'task id': <id>}`` once the refresh is launched,
        or a 503 ``text/html`` response if an update is already pending.
    """
    # NOTE(review): strtobool comes from distutils, which was removed in
    # Python 3.12 (PEP 632) - plan a replacement before upgrading.
    raw_force = request.args.get('force', default='false', type=str)
    try:
        forced = strtobool(raw_force)
    except ValueError:
        forced = False

    app_config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    redis_conn = get_current_redis(app_config)

    if not forced:
        latch = get_latch(redis_conn)
        update_pending = latch['pending'] if latch else False
        if update_pending:
            return Response(
                response='an update is already in progress',
                status=503,
                mimetype="text/html")

    task_id = worker.launch_refresh_cache_all(app_config)
    # Remember the id under the key that check_update_status reads back.
    redis_conn.set(
        'classifier-cache:update-task-id', task_id.encode('utf-8'))
    return jsonify({'task id': task_id})
# NOTE(review): this rule lacks the leading "/" that "/update" has, so it
# presumably relies on the blueprint being registered with a url_prefix -
# confirm.
@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
    """Queue a router-config reload for one piece of equipment.

    Args:
        equipment_name: taken from the URL and passed straight to the
            ``worker.reload_router_config`` task.

    Returns:
        A JSON object ``{'task id': <id>}`` holding the id of the queued
        task.
    """
    queued = worker.reload_router_config.delay(equipment_name)
    payload = {'task id': queued.id}
    return jsonify(payload)
# NOTE(review): this rule lacks the leading "/" that "/update" has, so it
# presumably relies on the blueprint being registered with a url_prefix -
# confirm.
@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
@common.require_accepts_json
def check_task_status(task_id):
    """Report the status of the task identified by ``task_id``.

    Args:
        task_id: task identifier taken from the URL.

    Returns:
        A JSON array built from everything ``worker.check_task_status``
        yields for that id.
    """
    statuses = list(worker.check_task_status(task_id))
    return jsonify(statuses)
# NOTE(review): this rule lacks the leading "/" that "/update" has, so it
# presumably relies on the blueprint being registered with a url_prefix -
# confirm.
@routes.route("check-update-status", methods=['GET', 'POST'])
@common.require_accepts_json
def check_update_status():
    """Report the status of the update task whose id is stored in redis.

    The id is read from the ``classifier-cache:update-task-id`` key, the
    same key ``update`` writes after launching a refresh.

    Returns:
        A JSON array built from ``worker.check_task_status`` for the
        stored id, or a 404 ``text/html`` response when no id is stored.
    """
    redis_conn = common.get_current_redis()
    stored_id = redis_conn.get('classifier-cache:update-task-id')
    if stored_id:
        # redis hands the id back as bytes; the worker is given text
        statuses = worker.check_task_status(stored_id.decode('utf-8'))
        return jsonify(list(statuses))

    return Response(
        response='no pending update task found',
        status=404,
        mimetype="text/html")
# JSON schema (draft-07) that each job log entry is validated against:
# an object that must carry string "uuid" and "type" fields, may carry an
# integer "clock", and may carry any other properties.
LOG_ENTRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "uuid": {"type": "string"},
        "type": {"type": "string"},
        "clock": {"type": "integer"},
    },
    # "clock" is deliberately absent here, so it is optional
    "required": ["uuid", "type"],
    "additionalProperties": True,
}
@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
    """Collect the ``joblog:*`` entries from redis, grouped by task uuid.

    Every entry must be JSON that validates against ``LOG_ENTRY_SCHEMA``.
    Entries with no data, undecodable JSON or a schema violation are
    logged and skipped rather than failing the request.

    Returns:
        A JSON object mapping each task uuid to a dict keyed by entry
        type.  ``task-info``, ``task-warning`` and ``task-error`` entries
        accumulate in lists; for any other type a single entry is kept,
        and a later one replaces an earlier one of the same type.
    """
    list_valued_types = ('task-info', 'task-warning', 'task-error')
    tasks = {}
    # NOTE(review): entries are read from the "next" redis (a call to
    # common.get_current_redis() was previously commented out here) -
    # confirm this is the intended source.
    r = common.get_next_redis()
    for k in r.scan_iter('joblog:*'):
        # decode once; the text form is used for the lookup and for logging
        key = k.decode('utf-8')
        value = r.get(key)
        if not value:
            logger.error('no data for log entry: %s', key)
            continue
        try:
            value = json.loads(value.decode('utf-8'))
            jsonschema.validate(value, LOG_ENTRY_SCHEMA)
        except (json.JSONDecodeError, jsonschema.ValidationError):
            # The original message was missing its f-prefix, so the
            # placeholder text was logged instead of the key.
            logger.exception('error decoding entry for %s', key)
            continue

        info = tasks.setdefault(value['uuid'], {})
        entry_type = value['type']
        if entry_type in list_valued_types:
            info.setdefault(entry_type, []).append(value)
        else:
            info[entry_type] = value

    return jsonify(tasks)