Skip to content
Snippets Groups Projects
jobs.py 7.39 KiB
import json
import logging

from distutils.util import strtobool
from flask import Blueprint, current_app, jsonify, Response, request

from inventory_provider.tasks import monitor
from inventory_provider.tasks import worker
from inventory_provider.routes import common
from inventory_provider.tasks.common import get_current_redis, get_latch

# Blueprint that collects the job-related route handlers defined below.
routes = Blueprint("inventory-data-job-routes", __name__)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# JSON schema (draft-07) of a response carrying a single task id:
# an object with exactly one string property named "task id".
TASK_ID_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {"task id": {"type": "string"}},
    "required": ["task id"],
    "additionalProperties": False,
}

# JSON schema (draft-07) of the task log response: an object with four
# mandatory properties, each of which is an array of strings.
TASK_LOG_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        # every category has the same shape, so build them uniformly
        category: {"type": "array", "items": {"type": "string"}}
        for category in ("pending", "errors", "failed", "warnings")
    },
    "required": ["pending", "errors", "failed", "warnings"],
    "additionalProperties": False,
}

# JSON schema (draft-07) of a task status response: an array in which
# every item matches the "task" definition below.
INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "definitions": {
        # status record of a single task
        "task": {
            "type": "object",
            "properties": {
                "id": {"type": "string"},
                "status": {"type": "string"},
                "exception": {"type": "boolean"},
                "ready": {"type": "boolean"},
                "success": {"type": "boolean"},
                # "result" is the only optional property; may be null
                "result": {"type": ["object", "null"]},
                # "parent" is mandatory but may be null
                "parent": {"type": ["string", "null"]},
            },
            "required": [
                "id",
                "status",
                "exception",
                "ready",
                "success",
                "parent",
            ],
            "additionalProperties": False,
        },
    },
    "type": "array",
    "items": {"$ref": "#/definitions/task"},
}


# INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA = {
#     "$schema": "http://json-schema.org/draft-07/schema#",
#     "type": "object",
#     "properties": {
#         "id": {"type": "string"},
#         "status": {"type": "string"},
#         "exception": {"type": "boolean"},
#         "ready": {"type": "boolean"},
#         "success": {"type": "boolean"},
#         "result": {"type": "object"}
#     },
#     "required": ["id", "status", "exception", "ready", "success"],
#     "additionalProperties": False
# }


@routes.after_request
def after_request(resp):
    """
    Post-process responses produced by the handlers of this blueprint.

    Delegates to `common.after_request` and returns whatever that
    function returns.

    :param resp: the response object handed in by flask
    :return: the result of `common.after_request(resp)`
    """
    return common.after_request(resp)


@routes.route("/update", methods=['GET', 'POST'])
@common.require_accepts_json
def update():
    """
    Handler for `/jobs/update`.

    This resource launches a refresh of the complete inventory cache.
    The refresh completes asynchronously and the id of the launched
    task is returned so the caller can use `/jobs/check-task-status`
    to determine when the job is finished.  The response will be
    formatted as follows:

    .. asjson:: inventory_provider.routes.jobs.TASK_ID_RESPONSE_SCHEMA

    Unless the `force` query parameter is set to a true value, the
    request is rejected with status 503 while the latch stored in
    redis reports a pending update.

    :return: json containing the new task id, or a 503 response
    """
    # NOTE(review): strtobool is imported from distutils, which is
    # deprecated (PEP 632) and no longer shipped with Python 3.12+
    requested_force = request.args.get('force', default='false', type=str)
    try:
        forced = strtobool(requested_force)
    except ValueError:
        # anything that isn't a recognized boolean string means "don't force"
        forced = False

    provider_config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    redis_conn = get_current_redis(provider_config)

    if not forced:
        current_latch = get_latch(redis_conn)
        update_in_progress = current_latch and current_latch['pending']
        if update_in_progress:
            return Response(
                response='an update is already in progress',
                status=503,
                mimetype="text/html")

    launched_task_id = worker.launch_refresh_cache_all(provider_config)

    # remember the task id so that it can be looked up again later
    redis_conn.set(
        'classifier-cache:update-task-id',
        launched_task_id.encode('utf-8'))
    return jsonify({'task id': launched_task_id})


@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
    """
    Handler for /jobs/reload-router-config/*`equipment-name`*

    Queues `worker.reload_router_config` for the named equipment
    and returns the id of the queued task.

    :param equipment_name: passed unchanged to the worker task
    :return: json of the form `{'task id': <id>}`
    """
    queued_task = worker.reload_router_config.delay(equipment_name)
    payload = {'task id': queued_task.id}
    return jsonify(payload)


@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
@common.require_accepts_json
def check_task_status(task_id):
    """
    Handler for /jobs/check-task-status/*`task-id`*

    Reports the current status of an asynchronous task
    that was started by `/jobs/update` or
    `jobs/reload-router-config`.  The response body
    is formatted as follows:

    .. asjson::
       inventory_provider.routes.jobs.INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA

    :param task_id: id of the task to look up
    :return: json list of the items yielded by `worker.check_task_status`
    """
    # materialize the statuses so they can be serialized as a json list
    statuses = list(worker.check_task_status(task_id))
    return jsonify(statuses)


@routes.route("check-update-status", methods=['GET', 'POST'])
@common.require_accepts_json
def check_update_status():
    """
    Handler for `/jobs/check-update-status`.

    Looks up the task id stored in redis under
    `classifier-cache:update-task-id` and returns the status
    of that task, in the same form as `/jobs/check-task-status`.
    If no task id is stored, a 404 response is returned.

    :return: json list of task statuses, or a 404 response
    """
    # NOTE(review): the other handlers in this module obtain redis via
    # get_current_redis(config); this one uses the helper in
    # routes.common - presumably equivalent, TODO confirm
    redis_conn = common.get_current_redis()
    stored_task_id = redis_conn.get('classifier-cache:update-task-id')

    if stored_task_id:
        # redis returned bytes, the worker is given a str
        statuses = worker.check_task_status(stored_task_id.decode('utf-8'))
        return jsonify(list(statuses))

    return Response(
        response='no pending update task found',
        status=404,
        mimetype="text/html")


@routes.route("log", methods=['GET', 'POST'])
@common.require_accepts_json
def load_task_log():
    """
    Handler for `/jobs/log`.

    This resource returns the state of the previous (or current)
    tasks associated with a call to `/jobs/update`.  The response
    contains error or warning messages, if any were generated, and
    will be formatted according to the following schema:

    .. asjson:: inventory_provider.routes.jobs.TASK_LOG_RESPONSE_SCHEMA

    The computed response is stored in redis under
    `joblog:cached-response` as soon as at least one task was found
    and none of the tasks is still pending.  While that key holds a
    value, it is returned directly without re-reading the task log.

    :return: json with `pending`, `warnings`, `errors` and `failed` lists
    """
    # presence of any of these events means the task is no longer running
    FINALIZATION_EVENTS = {'task-succeeded', 'task-failed', 'task-revoked'}

    config = current_app.config['INVENTORY_PROVIDER_CONFIG']
    r = get_current_redis(config)

    cache_key = 'joblog:cached-response'
    cached = r.get(cache_key)
    if cached:
        return jsonify(json.loads(cached.decode('utf-8')))

    result = {
        'pending': [],
        'warnings': [],
        'errors': [],
        'failed': []
    }

    found_tasks = False
    for task in monitor.load_task_log(config).values():

        found_tasks = True

        result['warnings'].extend(
            event['message'] for event in task.get('task-warning', []))
        result['errors'].extend(
            event['message'] for event in task.get('task-error', []))

        # build the description if task-received is available
        description = None
        if 'task-received' in task:
            event = task['task-received'][0]
            description = f'{event["name"]}{event["args"]}'

        if 'task-failed' in task:
            if not description:
                logger.error(
                    'found task-failed event without task-received: %s',
                    task)
                # fall back to the task uuid as description
                description = task['task-failed'][0]['uuid']
            result['failed'].append(description)

        # a started task without any finalization event is still pending
        if 'task-started' in task and FINALIZATION_EVENTS.isdisjoint(task):
            if not description:
                logger.error(
                    'found task-started event without task-received: %s',
                    task)
                # fall back to the task uuid as description
                description = task['task-started'][0]['uuid']
            result['pending'].append(description)

    # only cache final results: something was found and nothing is running
    if found_tasks and not result['pending']:
        r.set(cache_key, json.dumps(result))

    return jsonify(result)