Skip to content
Snippets Groups Projects
test_job_routes.py 10.56 KiB
import json
import jsonschema

from inventory_provider.tasks.common import _get_redis, DB_LATCH_SCHEMA
from inventory_provider.routes.jobs \
    import INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA, \
    TASK_LOG_RESPONSE_SCHEMA, TASK_ID_RESPONSE_SCHEMA

# Headers passed with every request issued by the tests in this module.
DEFAULT_REQUEST_HEADERS = {
    'Content-type': 'application/json',
    'Accept': ['application/json'],
}


def backend_db():
    """Return the ``db`` attribute of the object built by ``_get_redis``.

    The configuration handed to ``_get_redis`` carries no redis hostname
    or port and lists databases 0 and 7.
    """
    # NOTE(review): a None hostname/port presumably only works because the
    # redis client is replaced by a test double elsewhere - confirm.
    redis_params = {'hostname': None, 'port': None}
    config = {
        'redis': redis_params,
        'redis-databases': [0, 7],
    }
    connection = _get_redis(config)
    return connection.db


def test_job_update_all(client, mocker):
    """POST ``orig-jobs/update`` and check the reported and stored task id.

    ``launch_refresh_cache_all`` is patched to return a fixed id.  The
    response must be a 200 that validates against
    ``TASK_ID_RESPONSE_SCHEMA`` and carries that id, and the same id
    (utf-8 encoded) must be found in the backend db under
    ``classifier-cache:update-task-id``.
    """
    task_id = 'xyz@123#456'
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        return_value=task_id)

    response = client.post(
        'orig-jobs/update', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == task_id

    # the db hands values back as bytes, hence the encode
    stored_task_id = backend_db()['classifier-cache:update-task-id']
    assert stored_task_id == task_id.encode('utf-8')


def test_job_update_force_pending(client, mocker):
    """POST ``orig-jobs/update?force=true`` while the latch says pending.

    ``get_latch`` is patched to return ``{'pending': True}`` and
    ``launch_refresh_cache_all`` to return a fixed id.  The request must
    still answer 200 with a schema-valid body carrying that id.
    """
    task_id = 'asf#asdf%111'
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        return_value=task_id)
    mocker.patch(
        'inventory_provider.routes.jobs.get_latch',
        return_value={'pending': True})

    response = client.post(
        'orig-jobs/update?force=true', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == task_id


def test_job_update_pending_force_false(client, mocker):
    """POST ``orig-jobs/update?force=no`` while pending must answer 503.

    ``get_latch`` is patched to return ``{'pending': True}``, and
    ``launch_refresh_cache_all`` is replaced by a guard that fails the
    test if the route calls it.
    """
    def _fail_if_called(*args, **kwargs):
        # Raise explicitly instead of ``assert False``: a bare assert is
        # removed when asserts are disabled (``python -O``) and gives no
        # hint about what went wrong.
        raise AssertionError(
            'launch_refresh_cache_all must not be called '
            'while an update is pending')
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        _fail_if_called)

    mocked_get_latch = mocker.patch(
        'inventory_provider.routes.jobs.get_latch')
    mocked_get_latch.return_value = {'pending': True}

    rv = client.post(
        'orig-jobs/update?force=no',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 503


def test_job_update_pending(client, mocker):
    """POST ``orig-jobs/update`` while an update is pending must answer 503.

    ``get_latch`` is patched to return ``{'pending': True}``, and
    ``launch_refresh_cache_all`` is replaced by a guard that fails the
    test if the route calls it.
    """
    def _fail_if_called(*args, **kwargs):
        # Raise explicitly instead of ``assert False``: a bare assert is
        # removed when asserts are disabled (``python -O``) and gives no
        # hint about what went wrong.
        raise AssertionError(
            'launch_refresh_cache_all must not be called '
            'while an update is pending')
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        _fail_if_called)

    mocked_get_latch = mocker.patch(
        'inventory_provider.routes.jobs.get_latch')
    mocked_get_latch.return_value = {'pending': True}

    rv = client.post(
        'orig-jobs/update',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 503


class MockedAsyncResult(object):
    """Bare-bones object the tests patch in place of ``worker.AsyncResult``.

    ``status`` and ``result`` live on the class, so tests assign them on
    ``MockedAsyncResult`` itself before issuing a request; every instance
    then reports those values.
    """

    # Class-level defaults; overwritten by individual tests.
    result = None
    status = None

    def __init__(self, id, app=None):
        # ``app`` is accepted but not used.
        self.id = id


def test_reload_router_config(client, mocker):
    """POST ``orig-jobs/reload-router-config/...`` and check the task id.

    ``reload_router_config.delay`` is patched to return a
    ``MockedAsyncResult`` whose id is ``'bogus task id'``; the 200
    response must validate against ``TASK_ID_RESPONSE_SCHEMA`` and
    report that id.
    """
    fake_result = MockedAsyncResult('bogus task id')
    mocker.patch(
        'inventory_provider.tasks.worker.reload_router_config.delay',
        return_value=fake_result)

    response = client.post(
        'orig-jobs/reload-router-config/ignored###123',
        headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == 'bogus task id'


def test_check_update_status(client, mocker):
    """POST ``orig-jobs/check-update-status`` with a stored task id.

    The id ``'zz55'`` is written to the backend db under
    ``classifier-cache:update-task-id`` and ``worker.AsyncResult`` is
    replaced by ``MockedAsyncResult`` reporting ``SUCCESS``.  Every entry
    of the schema-valid 200 response must describe that finished task.
    """
    backend_db()['classifier-cache:update-task-id'] = 'zz55'

    MockedAsyncResult.status = 'SUCCESS'  # celery.states.SUCCESS
    MockedAsyncResult.result = {'absab': 1, 'def': 'aaabbb'}
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)

    response = client.post(
        'orig-jobs/check-update-status', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    statuses = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(statuses, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    # NOTE(review): an empty response would make this loop check nothing
    for entry in statuses:
        assert entry['id'] == 'zz55'
        assert entry['status'] == 'SUCCESS'
        assert not entry['exception']
        assert entry['ready']
        assert entry['success']
        assert 'result' in entry


def test_check_update_status_404(client):
    """POST ``orig-jobs/check-update-status`` with no stored task id.

    The ``classifier-cache:update-task-id`` key is removed from the
    backend db first (if present); the request must answer 404.
    """
    backend_db().pop('classifier-cache:update-task-id', None)

    response = client.post(
        'orig-jobs/check-update-status', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 404


def test_check_task_status_success(client, mocker):
    """POST ``orig-jobs/check-task-status/abc`` for a successful task.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` reporting
    ``SUCCESS`` with a dict result.  Every entry of the schema-valid 200
    response must carry id ``'abc'`` and describe a finished, successful
    task that includes a ``result`` key.
    """
    MockedAsyncResult.status = 'SUCCESS'  # celery.states.SUCCESS
    MockedAsyncResult.result = {'abc': 1, 'def': 'aaabbb'}
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)

    response = client.post(
        'orig-jobs/check-task-status/abc', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    statuses = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(statuses, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    # NOTE(review): an empty response would make this loop check nothing
    for entry in statuses:
        assert entry['id'] == 'abc'
        assert entry['status'] == 'SUCCESS'
        assert not entry['exception']
        assert entry['ready']
        assert entry['success']
        assert 'result' in entry


def test_check_task_status_custom_status(client, mocker):
    """POST ``orig-jobs/check-task-status/xyz`` for a non-standard status.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` reporting
    the status ``'custom'`` and no result.  Every entry of the
    schema-valid 200 response must carry id ``'xyz'`` and be neither an
    exception, ready, nor successful.
    """
    MockedAsyncResult.status = 'custom'
    MockedAsyncResult.result = None
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)

    response = client.post(
        'orig-jobs/check-task-status/xyz', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    statuses = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(statuses, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    # NOTE(review): an empty response would make this loop check nothing
    for entry in statuses:
        assert entry['id'] == 'xyz'
        assert entry['status'] == 'custom'
        assert not entry['exception']
        assert not entry['ready']
        assert not entry['success']


def test_check_task_status_exception(client, mocker):
    """POST ``orig-jobs/check-task-status/123-xyz.ABC`` for a failed task.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` reporting
    ``FAILURE`` with an ``AssertionError`` as its result.  Every entry of
    the schema-valid 200 response must flag the exception and expose its
    type name and message under ``result``.
    """
    failure = AssertionError('test error message')
    MockedAsyncResult.status = 'FAILURE'  # celery.states.FAILURE
    MockedAsyncResult.result = failure
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)

    response = client.post(
        'orig-jobs/check-task-status/123-xyz.ABC',
        headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    statuses = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(statuses, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    # NOTE(review): an empty response would make this loop check nothing
    for entry in statuses:
        assert entry['id'] == '123-xyz.ABC'
        assert entry['status'] == 'FAILURE'
        assert entry['exception']
        assert entry['ready']
        assert not entry['success']
        error_info = entry['result']
        assert error_info['error type'] == 'AssertionError'
        assert error_info['message'] == 'test error message'


def test_latchdb(client, mocked_redis):
    """POST ``testing/latchdb`` and validate the returned latch.

    The response must be a 200 whose JSON body validates against
    ``DB_LATCH_SCHEMA``.  ``mocked_redis`` is requested as a fixture but
    not referenced in the body.
    """
    response = client.post(
        'testing/latchdb', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    latch_state = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(latch_state, DB_LATCH_SCHEMA)


def test_job_log(client):
    """Seed job-log events in the backend db and check ``orig-jobs/log``.

    Each event is stored JSON-encoded under its ``joblog:...`` key.  The
    200 response must validate against ``TASK_LOG_RESPONSE_SCHEMA`` and
    report 3 errors, 3 pending, 1 failed and 2 warnings.
    """
    def _lifecycle(uuid, event_type):
        # event carrying only a type and a uuid
        return {'type': event_type, 'uuid': uuid}

    def _received(uuid, args):
        # 'task-received' event for a task named 'xyz'
        return {
            'type': 'task-received',
            'uuid': uuid,
            'name': 'xyz',
            'args': args
        }

    def _logged(event_type, uuid, clock):
        # info/warning/error event with a clock value and message 'x'
        return {
            'type': event_type,
            'uuid': uuid,
            'clock': clock,
            'message': 'x'
        }

    events = {
        # types that are not one of the lifecycle/log types used below
        # (note the first key says 'task-aaa' but the type 'task-aaaa')
        'joblog:AAAA:task-aaa': _lifecycle('AAAA', 'task-aaaa'),
        'joblog:AAAB:task-infox': _lifecycle('AAAB', 'task-infox'),

        # received -> started -> succeeded
        'joblog:CCCC:task-received': _received('CCCC', ['z', 1]),
        'joblog:CCCC:task-started': _lifecycle('CCCC', 'task-started'),
        'joblog:CCCC:task-succeeded': _lifecycle('CCCC', 'task-succeeded'),

        # received -> started -> failed
        'joblog:TTTT:task-received': _received('TTTT', ['q', 123]),
        'joblog:TTTT:task-started': _lifecycle('TTTT', 'task-started'),
        'joblog:TTTT:task-failed': _lifecycle('TTTT', 'task-failed'),

        # received -> started, nothing further
        # (the SSSS1 keys carry uuid 'SSSS', without the trailing 1)
        'joblog:SSSS1:task-received': _received('SSSS', ['q', 123]),
        'joblog:SSSS1:task-started': _lifecycle('SSSS', 'task-started'),
        'joblog:SSSS2:task-received': _received('SSSS2', ['q', 123]),
        'joblog:SSSS2:task-started': _lifecycle('SSSS2', 'task-started'),
        'joblog:SSSS3:task-received': _received('SSSS3', ['q', 123]),
        'joblog:SSSS3:task-started': _lifecycle('SSSS3', 'task-started'),

        'joblog:BBBB:task-info:99': _logged('task-info', 'BBBB', 99),
        'joblog:BBBB:task-info:999': _logged('task-info', 'BBBB', 999),

        'joblog:AAAA:task-warning:88': _logged('task-warning', 'AAAA', 88),
        'joblog:AAAA:task-warning:888': _logged('task-warning', 'AAAA', 888),

        'joblog:AAAA:task-error:77': _logged('task-error', 'AAAA', 77),
        'joblog:AAAA:task-error:777': _logged('task-error', 'AAAA', 777),
        'joblog:AAAA:task-error:7777': _logged('task-error', 'AAAA', 7777),
    }

    store = backend_db()
    for key, event in events.items():
        store[key] = json.dumps(event)

    response = client.post('orig-jobs/log', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    log = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(log, TASK_LOG_RESPONSE_SCHEMA)

    assert len(log['errors']) == 3
    assert len(log['pending']) == 3
    assert len(log['failed']) == 1
    assert len(log['warnings']) == 2