"""Flask routes for launching and monitoring inventory cache update jobs."""
import json
import logging
import queue
import random
import threading

try:
    from distutils.util import strtobool
except ImportError:
    # distutils was removed from the standard library in Python 3.12
    # (PEP 632); provide an equivalent so the module still imports.
    def strtobool(val):
        """Convert a string truth value to 1 or 0.

        :param val: string such as 'yes', 'true', '1', 'no', 'off', '0'
        :return: 1 for true values, 0 for false values
        :raises ValueError: if val is not a recognised truth value
        """
        val = val.lower()
        if val in ('y', 'yes', 't', 'true', 'on', '1'):
            return 1
        if val in ('n', 'no', 'f', 'false', 'off', '0'):
            return 0
        raise ValueError("invalid truth value %r" % (val,))

import jsonschema
from flask import Blueprint, current_app, jsonify, Response, request

from inventory_provider.tasks import common as tasks_common
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch

routes = Blueprint("inventory-data-job-routes", __name__)
logger = logging.getLogger(__name__)

# number of worker threads used by load_task_log to read log entries
_NUM_LOG_WORKERS = 10


@routes.after_request
def after_request(resp):
    """Pass every response of this blueprint through common.after_request.

    :param resp: the outgoing flask response
    :return: whatever common.after_request returns for resp
    """
    return common.after_request(resp)


@routes.route("/update", methods=['GET', 'POST'])
@common.require_accepts_json
def update():
    """Launch a full cache refresh and return its task id.

    Unless the `force` query parameter parses as true, the request is
    refused with a 503 while the latch read from redis is marked pending.

    :return: json object with a single 'task id' element, or a 503
        text response if an update is already in progress
    """
    force = request.args.get('force', default='false', type=str)
    try:
        force = strtobool(force)
    except ValueError:
        # unparseable values are treated as "not forced"
        force = False

    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    r = get_current_redis(config)

    if not force:
        latch = get_latch(r)
        if latch and latch['pending']:
            return Response(
                response='an update is already in progress',
                status=503,
                mimetype="text/html")

    phase2_task_id = worker.launch_refresh_cache_all(config)

    # remember the task id so check_update_status can find it later
    r.set('classifier-cache:update-task-id', phase2_task_id.encode('utf-8'))
    return jsonify({'task id': phase2_task_id})


# NOTE(review): this rule and the ones below have no leading slash, unlike
# "/update" - presumably the blueprint is always registered with a
# url_prefix; confirm against the app factory.
@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
    """Queue a router config reload task for one equipment name.

    :param equipment_name: taken from the url, passed to the worker task
    :return: json object with a single 'task id' element
    """
    result = worker.reload_router_config.delay(equipment_name)
    return jsonify({'task id': result.id})


@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
@common.require_accepts_json
def check_task_status(task_id):
    """Return the status of a task as a json list.

    :param task_id: task id taken from the url
    :return: json list of the items produced by worker.check_task_status
    """
    return jsonify(list(worker.check_task_status(task_id)))


@routes.route("check-update-status", methods=['GET', 'POST'])
@common.require_accepts_json
def check_update_status():
    """Return the status of the update task id most recently saved in redis.

    :return: json list of the items produced by worker.check_task_status,
        or a 404 text response if no task id is stored
    """
    r = common.get_current_redis()
    task_id = r.get('classifier-cache:update-task-id')
    if not task_id:
        return Response(
            response='no pending update task found',
            status=404,
            mimetype="text/html")

    task_id = task_id.decode('utf-8')
    return jsonify(list(worker.check_task_status(task_id)))


# minimal shape every 'joblog:*' entry must have to be reported
LOG_ENTRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "uuid": {"type": "string"},
        "type": {"type": "string"},
        "clock": {"type": "integer"}
    },
    "required": ["uuid", "type"],
    "additionalProperties": True
}


def _redis_client_proc(key_queue, value_queue, params):
    """Worker thread body: load and validate log entries from redis.

    Keys are read from key_queue until a falsy value (None) arrives.
    Each value is fetched, json-decoded, validated against
    LOG_ENTRY_SCHEMA and put on value_queue.  Entries that are missing
    or invalid are logged and skipped, so a single bad entry does not
    discard the rest of this worker's keys.  A final None is always put
    on value_queue to signal that this worker has finished.

    :param key_queue: queue.Queue of redis key names, None-terminated
    :param value_queue: queue.Queue receiving decoded entries, then None
    :param params: config passed to tasks_common.get_current_redis
    """
    try:
        r = tasks_common.get_current_redis(params)
        while True:
            key = key_queue.get()

            # contract is that None means no more requests
            if not key:
                break

            raw = r.get(key)
            if raw is None:
                # the key disappeared between the scan and this read
                logger.warning(f'no value found for {key}')
                continue

            try:
                value = json.loads(raw.decode('utf-8'))
                jsonschema.validate(value, LOG_ENTRY_SCHEMA)
            except (UnicodeDecodeError,
                    json.JSONDecodeError,
                    jsonschema.ValidationError):
                logger.exception(f'error decoding entry for {key}')
                continue

            value_queue.put(value)

    finally:
        # contract is to return None when finished
        value_queue.put(None)


@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
    """Return all 'joblog:*' entries from redis, grouped by task uuid.

    Keys are spread at random over a pool of worker threads.  In the
    result each uuid maps to a dict keyed by entry type: 'task-info',
    'task-warning' and 'task-error' entries are collected in lists, any
    other type keeps only the last entry received.

    :return: json object mapping uuid to that task's log entries
    """
    response_queue = queue.Queue()
    config = current_app.config['INVENTORY_PROVIDER_CONFIG']

    threads = []
    for _ in range(_NUM_LOG_WORKERS):
        q = queue.Queue()
        t = threading.Thread(
            target=_redis_client_proc,
            args=[q, response_queue, config])
        t.start()
        threads.append({'thread': t, 'queue': q})

    try:
        r = common.get_current_redis()
        for k in r.scan_iter('joblog:*'):
            t = random.choice(threads)
            t['queue'].put(k.decode('utf-8'))
    finally:
        # tell all threads there are no more keys coming; done in a
        # finally so the workers are released even if the scan fails
        for t in threads:
            t['queue'].put(None)

    num_finished = 0
    tasks = {}
    # each worker sends exactly one None when it is done
    while num_finished < len(threads):
        value = response_queue.get()
        if value is None:
            num_finished += 1
            logger.debug('one worker thread finished')
            continue

        info = tasks.setdefault(value['uuid'], {})
        if value['type'] in ['task-info', 'task-warning', 'task-error']:
            info.setdefault(value['type'], []).append(value)
        else:
            info[value['type']] = value

    for t in threads:
        t['thread'].join(timeout=0.5)  # timeout, for sanity

    return jsonify(tasks)