"""Tests for the ``jobs/*`` routes, with the worker task layer mocked out."""
import json

import jsonschema

DEFAULT_REQUEST_HEADERS = {
    "Content-type": "application/json",
    "Accept": ["application/json"]
}

# Expected shape of responses that return a list of task ids.
TASK_ID_LIST_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "array",
    "items": {"type": "string"}
}

# Expected shape of a check-task-status response; "result" is optional.
TASK_STATUS_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "status": {"type": "string"},
        "exception": {"type": "boolean"},
        "ready": {"type": "boolean"},
        "success": {"type": "boolean"},
        "result": {"type": "object"}
    },
    "required": ["id", "status", "exception", "ready", "success"],
    "additionalProperties": False
}


class MockedAsyncResult(object):
    """Minimal stand-in patched over ``worker.AsyncResult`` in these tests.

    ``status`` and ``result`` are class-level defaults.  Tests override
    them through ``_mock_async_result`` so the overrides are reverted
    when the test finishes.
    """

    # Defaults; instances read these unless a test patches the class.
    status = None
    result = None

    def __init__(self, id, app=None):
        # ``app`` is accepted but not used by the mock.
        self.id = id


def _post_json(client, url):
    """POST to ``url``, require HTTP 200 and return the decoded JSON body."""
    rv = client.post(url, headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    return json.loads(rv.data.decode('utf-8'))


def _mock_async_result(mocker, status, result):
    """Patch ``worker.AsyncResult`` with the mock and set its state.

    The class attributes are set via ``mocker.patch.object`` rather than
    plain assignment so they are restored at test teardown and cannot
    leak into other tests.
    """
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
    mocker.patch.object(MockedAsyncResult, 'status', status)
    mocker.patch.object(MockedAsyncResult, 'result', result)


def test_job_update_all(client, mocker):
    """jobs/update returns the task ids from launch_refresh_cache_all."""
    launch_refresh_cache_all = mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all')
    launch_refresh_cache_all.return_value = ['abc', 'def', 'xyz@123#456']

    task_id_list = _post_json(client, 'jobs/update')

    jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
    assert len(task_id_list) == 3


def test_reload_router_config(client, mocker):
    """jobs/reload-router-config/<x> returns the id of the queued task."""
    delay_result = mocker.patch(
        'inventory_provider.tasks.worker.reload_router_config.delay')
    delay_result.return_value = MockedAsyncResult('bogus task id')

    task_id_list = _post_json(
        client, 'jobs/reload-router-config/ignored###123')

    jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
    assert task_id_list == ['bogus task id']


def test_check_task_status_success(client, mocker):
    """A SUCCESS task with a dict result is ready, successful, has result."""
    _mock_async_result(
        mocker,
        status='SUCCESS',  # celery.states.SUCCESS
        result={'abc': 1, 'def': 'aaabbb'})

    status = _post_json(client, 'jobs/check-task-status/abc')

    jsonschema.validate(status, TASK_STATUS_SCHEMA)
    assert status['id'] == 'abc'
    assert status['status'] == 'SUCCESS'
    assert not status['exception']
    assert status['ready']
    assert status['success']
    assert 'result' in status


def test_check_task_status_custom_status(client, mocker):
    """A custom status with no result: all flags false, no 'result' key."""
    _mock_async_result(mocker, status='custom', result=None)

    status = _post_json(client, 'jobs/check-task-status/xyz')

    jsonschema.validate(status, TASK_STATUS_SCHEMA)
    assert status['id'] == 'xyz'
    assert status['status'] == 'custom'
    assert not status['exception']
    assert not status['ready']
    assert not status['success']
    assert 'result' not in status


def test_check_task_status_exception(client, mocker):
    """A FAILURE task with an exception result reports type and message."""
    _mock_async_result(
        mocker,
        status='FAILURE',  # celery.states.FAILURE
        result=AssertionError('test error message'))

    status = _post_json(client, 'jobs/check-task-status/123-xyz.ABC')

    jsonschema.validate(status, TASK_STATUS_SCHEMA)
    assert status['id'] == '123-xyz.ABC'
    assert status['status'] == 'FAILURE'
    assert status['exception']
    assert status['ready']
    assert not status['success']
    assert status['result']['error type'] == 'AssertionError'
    assert status['result']['message'] == 'test error message'