# test_job_routes.py (10.56 KiB)
# Authored by Robert Latta
import json
import jsonschema
from inventory_provider.tasks.common import _get_redis, DB_LATCH_SCHEMA
from inventory_provider.routes.jobs \
import INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA, \
TASK_LOG_RESPONSE_SCHEMA, TASK_ID_RESPONSE_SCHEMA
# Headers attached to every request issued by the tests in this module:
# the body is declared as JSON and a JSON response is requested.
DEFAULT_REQUEST_HEADERS = {
    "Content-type": "application/json",
    "Accept": ["application/json"],
}
def backend_db():
    """Return the ``db`` attribute of the object ``_get_redis`` hands back.

    The connection parameters are placeholders (hostname and port are
    ``None``).
    NOTE(review): presumably the redis client is replaced by a test
    fixture exposing a dict-like ``db`` - confirm against conftest.
    """
    placeholder_config = {
        'redis': {'hostname': None, 'port': None},
        'redis-databases': [0, 7],
    }
    connection = _get_redis(placeholder_config)
    return connection.db
def test_job_update_all(client, mocker):
    """The update route returns the launched task id and records it.

    ``launch_refresh_cache_all`` is patched to return a fixed id, so the
    test only checks what the route does with that id: it must appear in
    the JSON response and, utf-8 encoded, under the
    ``classifier-cache:update-task-id`` key of the backend db.
    """
    expected_task_id = 'xyz@123#456'
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        return_value=expected_task_id)

    response = client.post(
        'orig-jobs/update', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == expected_task_id

    stored_task_id = backend_db()['classifier-cache:update-task-id']
    assert stored_task_id == expected_task_id.encode('utf-8')
def test_job_update_force_pending(client, mocker):
    """``force=true`` yields a task id even though the latch is pending.

    Both the task launcher and ``get_latch`` are patched; the latch is
    reported as ``{'pending': True}``.
    """
    expected_task_id = 'asf#asdf%111'
    mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        return_value=expected_task_id)
    mocker.patch(
        'inventory_provider.routes.jobs.get_latch',
        return_value={'pending': True})

    response = client.post(
        'orig-jobs/update?force=true', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == expected_task_id
def test_job_update_pending_force_false(client, mocker):
    """With a pending latch and ``force=no`` the update route answers 503.

    ``launch_refresh_cache_all`` is replaced by a mock that raises if it
    is invoked. The mock is additionally inspected after the request, so
    a call is still detected if the raised error were handled somewhere
    inside the application instead of propagating to the test.
    """
    launch_refresh_cache_all = mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        side_effect=AssertionError(
            'launch_refresh_cache_all must not be called'))
    mocker.patch(
        'inventory_provider.routes.jobs.get_latch',
        return_value={'pending': True})

    rv = client.post(
        'orig-jobs/update?force=no',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 503
    # the mock records a call before raising, so this also catches a call
    # whose exception was swallowed on the way back to the client
    launch_refresh_cache_all.assert_not_called()
def test_job_update_pending(client, mocker):
    """With a pending latch and no ``force`` flag the route answers 503.

    ``launch_refresh_cache_all`` is replaced by a mock that raises if it
    is invoked. The mock is additionally inspected after the request, so
    a call is still detected if the raised error were handled somewhere
    inside the application instead of propagating to the test.
    """
    launch_refresh_cache_all = mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all',
        side_effect=AssertionError(
            'launch_refresh_cache_all must not be called'))
    mocker.patch(
        'inventory_provider.routes.jobs.get_latch',
        return_value={'pending': True})

    rv = client.post(
        'orig-jobs/update',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 503
    # the mock records a call before raising, so this also catches a call
    # whose exception was swallowed on the way back to the client
    launch_refresh_cache_all.assert_not_called()
class MockedAsyncResult(object):
    """Minimal stand-in patched over ``worker.AsyncResult`` by the tests.

    ``status`` and ``result`` live on the class, so a test configures
    them once and every instance created afterwards reports them.
    """

    # class-level values that tests overwrite before issuing a request
    result = None
    status = None

    def __init__(self, id, app=None):
        # ``app`` is accepted but not used; only the task id is kept
        self.id = id
def test_reload_router_config(client, mocker):
    """The reload-router-config route reports the id of the queued task.

    ``reload_router_config.delay`` is patched to return a
    ``MockedAsyncResult`` whose id must come back in the JSON response.
    """
    queued_task = MockedAsyncResult('bogus task id')
    mocker.patch(
        'inventory_provider.tasks.worker.reload_router_config.delay',
        return_value=queued_task)

    response = client.post(
        'orig-jobs/reload-router-config/ignored###123',
        headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    body = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(body, TASK_ID_RESPONSE_SCHEMA)
    assert body['task id'] == 'bogus task id'
def test_check_update_status(client, mocker):
    """check-update-status reports the task whose id is stored in the db.

    The id ``zz55`` is written under ``classifier-cache:update-task-id``
    and ``worker.AsyncResult`` is replaced by ``MockedAsyncResult``
    configured as a successful task.
    """
    db = backend_db()
    db['classifier-cache:update-task-id'] = 'zz55'

    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
    # patch.object instead of plain assignment: the class attributes are
    # restored at teardown, so the values cannot leak into other tests
    mocker.patch.object(
        MockedAsyncResult, 'status', 'SUCCESS')  # celery.states.SUCCESS
    mocker.patch.object(
        MockedAsyncResult, 'result', {'absab': 1, 'def': 'aaabbb'})

    rv = client.post(
        'orig-jobs/check-update-status',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200

    result = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    for status in result:
        assert status['id'] == 'zz55'
        assert status['status'] == 'SUCCESS'
        assert not status['exception']
        assert status['ready']
        assert status['success']
        assert 'result' in status
def test_check_update_status_404(client):
    """check-update-status answers 404 when no update task id is stored."""
    # make sure the key is absent; the default keeps pop from raising
    backend_db().pop('classifier-cache:update-task-id', None)

    response = client.post(
        'orig-jobs/check-update-status', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 404
def test_check_task_status_success(client, mocker):
    """check-task-status reports a successful task as ready and successful.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` configured
    with status ``SUCCESS`` and a dict result.
    """
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
    # patch.object instead of plain assignment: the class attributes are
    # restored at teardown, so the values cannot leak into other tests
    mocker.patch.object(
        MockedAsyncResult, 'status', 'SUCCESS')  # celery.states.SUCCESS
    mocker.patch.object(
        MockedAsyncResult, 'result', {'abc': 1, 'def': 'aaabbb'})

    rv = client.post(
        'orig-jobs/check-task-status/abc',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200

    result = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    for status in result:
        assert status['id'] == 'abc'
        assert status['status'] == 'SUCCESS'
        assert not status['exception']
        assert status['ready']
        assert status['success']
        assert 'result' in status
def test_check_task_status_custom_status(client, mocker):
    """A non-standard status is passed through as not ready/success/exception.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` configured
    with the status string ``custom`` and no result.
    """
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
    # patch.object instead of plain assignment: the class attributes are
    # restored at teardown, so the values cannot leak into other tests
    mocker.patch.object(MockedAsyncResult, 'status', 'custom')
    mocker.patch.object(MockedAsyncResult, 'result', None)

    rv = client.post(
        'orig-jobs/check-task-status/xyz',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200

    result = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    for status in result:
        assert status['id'] == 'xyz'
        assert status['status'] == 'custom'
        assert not status['exception']
        assert not status['ready']
        assert not status['success']
def test_check_task_status_exception(client, mocker):
    """A failed task is reported with its exception type and message.

    ``worker.AsyncResult`` is replaced by ``MockedAsyncResult`` configured
    with status ``FAILURE`` and an ``AssertionError`` instance as result.
    """
    mocker.patch(
        'inventory_provider.tasks.worker.AsyncResult', MockedAsyncResult)
    # patch.object instead of plain assignment: the class attributes are
    # restored at teardown, so the values cannot leak into other tests
    mocker.patch.object(
        MockedAsyncResult, 'status', 'FAILURE')  # celery.states.FAILURE
    mocker.patch.object(
        MockedAsyncResult, 'result', AssertionError('test error message'))

    rv = client.post(
        'orig-jobs/check-task-status/123-xyz.ABC',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200

    result = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(result, INDIVIDUAL_TASK_STATUS_RESPONSE_SCHEMA)
    for status in result:
        assert status['id'] == '123-xyz.ABC'
        assert status['status'] == 'FAILURE'
        assert status['exception']
        assert status['ready']
        assert not status['success']
        assert status['result']['error type'] == 'AssertionError'
        assert status['result']['message'] == 'test error message'
def test_latchdb(client, mocked_redis):
    """The latchdb testing route returns a body matching DB_LATCH_SCHEMA.

    ``mocked_redis`` is requested only for the fixture's side effects; the
    test body does not use its value.
    """
    response = client.post(
        'testing/latchdb', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    payload = response.data.decode('utf-8')
    jsonschema.validate(json.loads(payload), DB_LATCH_SCHEMA)
def test_job_log(client):
    """Seed the backend db with job-log events and check the log summary.

    Every event is stored JSON-encoded under its ``joblog:...`` key, then
    the log route is queried and the number of entries reported under
    ``errors``, ``pending``, ``failed`` and ``warnings`` is checked.
    """
    events_by_key = {
        'joblog:AAAA:task-aaa': {'type': 'task-aaaa', 'uuid': 'AAAA'},
        'joblog:AAAB:task-infox': {'type': 'task-infox', 'uuid': 'AAAB'},

        # CCCC: received, started and succeeded
        'joblog:CCCC:task-received': {
            'type': 'task-received',
            'uuid': 'CCCC',
            'name': 'xyz',
            'args': ['z', 1],
        },
        'joblog:CCCC:task-started': {'type': 'task-started', 'uuid': 'CCCC'},
        'joblog:CCCC:task-succeeded': {
            'type': 'task-succeeded', 'uuid': 'CCCC'},

        # TTTT: received, started and failed
        'joblog:TTTT:task-received': {
            'type': 'task-received',
            'uuid': 'TTTT',
            'name': 'xyz',
            'args': ['q', 123],
        },
        'joblog:TTTT:task-started': {'type': 'task-started', 'uuid': 'TTTT'},
        'joblog:TTTT:task-failed': {'type': 'task-failed', 'uuid': 'TTTT'},

        # SSSS*: received and started only
        # NOTE(review): the SSSS1 keys carry uuid 'SSSS' (no trailing 1) -
        # looks like a typo in the fixture data; kept as-is.
        'joblog:SSSS1:task-received': {
            'type': 'task-received',
            'uuid': 'SSSS',
            'name': 'xyz',
            'args': ['q', 123],
        },
        'joblog:SSSS1:task-started': {'type': 'task-started', 'uuid': 'SSSS'},
        'joblog:SSSS2:task-received': {
            'type': 'task-received',
            'uuid': 'SSSS2',
            'name': 'xyz',
            'args': ['q', 123],
        },
        'joblog:SSSS2:task-started': {
            'type': 'task-started', 'uuid': 'SSSS2'},
        'joblog:SSSS3:task-received': {
            'type': 'task-received',
            'uuid': 'SSSS3',
            'name': 'xyz',
            'args': ['q', 123],
        },
        'joblog:SSSS3:task-started': {
            'type': 'task-started', 'uuid': 'SSSS3'},

        # info messages
        'joblog:BBBB:task-info:99': {
            'type': 'task-info',
            'uuid': 'BBBB',
            'clock': 99,
            'message': 'x',
        },
        'joblog:BBBB:task-info:999': {
            'type': 'task-info',
            'uuid': 'BBBB',
            'clock': 999,
            'message': 'x',
        },

        # warning messages
        'joblog:AAAA:task-warning:88': {
            'type': 'task-warning',
            'uuid': 'AAAA',
            'clock': 88,
            'message': 'x',
        },
        'joblog:AAAA:task-warning:888': {
            'type': 'task-warning',
            'uuid': 'AAAA',
            'clock': 888,
            'message': 'x',
        },

        # error messages
        'joblog:AAAA:task-error:77': {
            'type': 'task-error',
            'uuid': 'AAAA',
            'clock': 77,
            'message': 'x',
        },
        'joblog:AAAA:task-error:777': {
            'type': 'task-error',
            'uuid': 'AAAA',
            'clock': 777,
            'message': 'x',
        },
        'joblog:AAAA:task-error:7777': {
            'type': 'task-error',
            'uuid': 'AAAA',
            'clock': 7777,
            'message': 'x',
        },
    }

    db = backend_db()
    for redis_key, event in events_by_key.items():
        db[redis_key] = json.dumps(event)

    response = client.post(
        'orig-jobs/log', headers=DEFAULT_REQUEST_HEADERS)
    assert response.status_code == 200

    result = json.loads(response.data.decode('utf-8'))
    jsonschema.validate(result, TASK_LOG_RESPONSE_SCHEMA)

    expected_counts = {
        'errors': 3,
        'pending': 3,
        'failed': 1,
        'warnings': 2,
    }
    for category, expected in expected_counts.items():
        assert len(result[category]) == expected