import json import jsonschema from inventory_provider.tasks.common import _get_redis, DB_LATCH_SCHEMA from inventory_provider.routes.jobs \ import INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA, \ TASK_LOG_RESPONSE_SCHEMA, TASK_ID_RESPONSE_SCHEMA DEFAULT_REQUEST_HEADERS = { "Content-type": "application/json", "Accept": ["application/json"] } def backend_db(): return _get_redis({ 'redis': { 'hostname': None, 'port': None }, 'redis-databases': [0, 7] }).db def test_job_update_all(client, mocker): expected_task_id = 'xyz@123#456' launch_refresh_cache_all = mocker.patch( 'inventory_provider.tasks.worker.launch_refresh_cache_all') launch_refresh_cache_all.return_value = expected_task_id rv = client.post( 'orig-jobs/update', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 refresh_task_response = json.loads(rv.data.decode('utf-8')) jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA) assert refresh_task_response['task id'] == expected_task_id db = backend_db() assert db['classifier-cache:update-task-id'] \ == expected_task_id.encode('utf-8') def test_job_update_force_pending(client, mocker): expected_task_id = 'asf#asdf%111' launch_refresh_cache_all = mocker.patch( 'inventory_provider.tasks.worker.launch_refresh_cache_all') launch_refresh_cache_all.return_value = expected_task_id mocked_get_latch = mocker.patch( 'inventory_provider.routes.jobs.get_latch') mocked_get_latch.return_value = {'pending': True} rv = client.post( 'orig-jobs/update?force=true', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 refresh_task_response = json.loads(rv.data.decode('utf-8')) jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA) assert refresh_task_response['task id'] == expected_task_id def test_job_update_pending_force_false(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( 'inventory_provider.tasks.worker.launch_refresh_cache_all', _assert_if_called) mocked_get_latch = mocker.patch( 'inventory_provider.routes.jobs.get_latch') mocked_get_latch.return_value = {'pending': True} rv = client.post( 'orig-jobs/update?force=no', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 503 def test_job_update_pending(client, mocker): def _assert_if_called(*args, **kwargs): assert False mocker.patch( 'inventory_provider.tasks.worker.launch_refresh_cache_all', _assert_if_called) mocked_get_latch = mocker.patch( 'inventory_provider.routes.jobs.get_latch') mocked_get_latch.return_value = {'pending': True} rv = client.post( 'orig-jobs/update', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 503 class MockedAsyncResult(object): status = None result = None def __init__(self, id, app=None): self.id = id def test_reload_router_config(client, mocker): delay_result = mocker.patch( 'inventory_provider.tasks.worker.reload_router_config.delay') delay_result.return_value = MockedAsyncResult('bogus task id') rv = client.post( 'orig-jobs/reload-router-config/ignored###123', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 refresh_task_response = json.loads(rv.data.decode('utf-8')) jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA) assert refresh_task_response['task id'] == 'bogus task id' def test_check_update_status(client, mocker): db = backend_db() db['classifier-cache:update-task-id'] = 'zz55' mocker.patch( 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) MockedAsyncResult.status = 'SUCCESS' # celery.states.SUCCESS MockedAsyncResult.result = {'absab': 1, 'def': 'aaabbb'} rv = client.post( 'orig-jobs/check-update-status', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 result = json.loads(rv.data.decode('utf-8')) jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA) for status in result: assert status['id'] == 'zz55' assert status['status'] == 'SUCCESS' assert not status['exception'] assert status['ready'] assert status['success'] assert 'result' in status def test_check_update_status_404(client): db = backend_db() db.pop('classifier-cache:update-task-id', None) rv = client.post( 'orig-jobs/check-update-status', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 404 def test_check_task_status_success(client, mocker): mocker.patch( 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) MockedAsyncResult.status = 'SUCCESS' # celery.states.SUCCESS MockedAsyncResult.result = {'abc': 1, 'def': 'aaabbb'} rv = client.post( 'orig-jobs/check-task-status/abc', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 result = json.loads(rv.data.decode('utf-8')) jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA) for status in result: assert status['id'] == 'abc' assert status['status'] == 'SUCCESS' assert not status['exception'] assert status['ready'] assert status['success'] assert 'result' in status def test_check_task_status_custom_status(client, mocker): mocker.patch( 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) MockedAsyncResult.status = 'custom' MockedAsyncResult.result = None rv = client.post( 'orig-jobs/check-task-status/xyz', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 result = json.loads(rv.data.decode('utf-8')) jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA) for status in result: assert status['id'] == 'xyz' assert status['status'] == 'custom' assert not status['exception'] assert not status['ready'] assert not status['success'] def test_check_task_status_exception(client, mocker): mocker.patch( 'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult) MockedAsyncResult.status = 'FAILURE' # celery.states.FAILURE MockedAsyncResult.result = AssertionError('test error message') rv = client.post( 'orig-jobs/check-task-status/123-xyz.ABC', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 result = json.loads(rv.data.decode('utf-8')) jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA) for status in result: assert status['id'] == '123-xyz.ABC' assert status['status'] == 'FAILURE' assert status['exception'] assert status['ready'] assert not status['success'] assert status['result']['error type'] == 'AssertionError' assert status['result']['message'] == 'test error message' def test_latchdb(client, mocked_redis): rv = client.post( 'testing/latchdb', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 latch = json.loads(rv.data.decode('utf-8')) jsonschema.validate(latch, DB_LATCH_SCHEMA) def test_job_log(client): test_events = { 'joblog:AAAA:task-aaa': { 'type': 'task-aaaa', 'uuid': 'AAAA'}, 'joblog:AAAB:task-infox': { 'type': 'task-infox', 'uuid': 'AAAB'}, 'joblog:CCCC:task-received': { 'type': 'task-received', 'uuid': 'CCCC', 'name': 'xyz', 'args': ['z', 1] }, 'joblog:CCCC:task-started': { 'type': 'task-started', 'uuid': 'CCCC'}, 'joblog:CCCC:task-succeeded': { 'type': 'task-succeeded', 'uuid': 'CCCC'}, 'joblog:TTTT:task-received': { 'type': 'task-received', 'uuid': 'TTTT', 'name': 'xyz', 'args': ['q', 123] }, 'joblog:TTTT:task-started': { 'type': 'task-started', 'uuid': 'TTTT'}, 'joblog:TTTT:task-failed': { 'type': 'task-failed', 'uuid': 'TTTT'}, 'joblog:SSSS1:task-received': { 'type': 'task-received', 'uuid': 'SSSS', 'name': 'xyz', 'args': ['q', 123] }, 'joblog:SSSS1:task-started': { 'type': 'task-started', 'uuid': 'SSSS'}, 'joblog:SSSS2:task-received': { 'type': 'task-received', 'uuid': 'SSSS2', 'name': 'xyz', 'args': ['q', 123] }, 'joblog:SSSS2:task-started': { 'type': 'task-started', 'uuid': 'SSSS2'}, 'joblog:SSSS3:task-received': { 'type': 'task-received', 'uuid': 'SSSS3', 'name': 'xyz', 'args': ['q', 123] }, 'joblog:SSSS3:task-started': { 'type': 'task-started', 'uuid': 'SSSS3'}, 'joblog:BBBB:task-info:99': { 'type': 'task-info', 'uuid': 'BBBB', 'clock': 99, 'message': 'x' }, 'joblog:BBBB:task-info:999': { 'type': 'task-info', 'uuid': 'BBBB', 'clock': 999, 'message': 'x' }, 'joblog:AAAA:task-warning:88': { 'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88, 'message': 'x' }, 'joblog:AAAA:task-warning:888': { 'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888, 'message': 'x' }, 'joblog:AAAA:task-error:77': { 'type': 'task-error', 'uuid': 'AAAA', 'clock': 77, 'message': 'x' }, 'joblog:AAAA:task-error:777': { 'type': 'task-error', 'uuid': 'AAAA', 'clock': 777, 'message': 'x' }, 'joblog:AAAA:task-error:7777': { 'type': 'task-error', 'uuid': 'AAAA', 'clock': 7777, 'message': 'x' } } db = backend_db() for k, v in test_events.items(): db[k] = json.dumps(v) rv = client.post( 'orig-jobs/log', headers=DEFAULT_REQUEST_HEADERS) assert rv.status_code == 200 result = json.loads(rv.data.decode('utf-8')) jsonschema.validate(result, TASK_LOG_RESPONSE_SCHEMA) assert len(result['errors']) == 3 assert len(result['pending']) == 3 assert len(result['failed']) == 1 assert len(result['warnings']) == 2