Skip to content
Snippets Groups Projects
Commit b93e13a4 authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature refactory-async-task-info.

parents 7d7d4cee 2b3e6fda
No related branches found
No related tags found
No related merge requests found
......@@ -63,6 +63,7 @@ CONFIG_SCHEMA = {
"properties": {
"hostname": {"type": "string"},
"port": {"type": "integer"},
"celery-db-index": {"type": "integer"},
"socket_timeout": {"$ref": "#/definitions/timeout"}
},
"required": ["hostname", "port"],
......@@ -73,6 +74,7 @@ CONFIG_SCHEMA = {
"properties": {
"hostname": {"type": "string"},
"port": {"type": "integer"},
"celery-db-index": {"type": "integer"},
"name": {"type": "string"},
"redis_socket_timeout": {"$ref": "#/definitions/timeout"},
"sentinel_socket_timeout": {"$ref": "#/definitions/timeout"}
......
......@@ -32,16 +32,16 @@ def update():
status=503,
mimetype="text/html")
job_ids = worker.launch_refresh_cache_all(
phase2_task_id = worker.launch_refresh_cache_all(
current_app.config["INVENTORY_PROVIDER_CONFIG"])
return jsonify(job_ids)
return jsonify({'task id': phase2_task_id})
@routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
@common.require_accepts_json
def reload_router_config(equipment_name):
result = worker.reload_router_config.delay(equipment_name)
return jsonify([result.id])
return jsonify({'task id': result.id})
@routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
......
import logging
import os
from inventory_provider import config
import redis.sentinel
logger = logging.getLogger(__name__)
DEFAULT_CELERY_DB_INDEX = 1
DEFAULT_SENTINEL_SOCKET_TIMEOUT = 0.5
DEFAULT_REDIS_SOCKET_TIMEOUT = 5.0
assert os.path.isfile(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']), (
'config file %r not found' %
os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
broker_transport_options = result_backend_transport_options = dict()
with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
logging.info(
'loading config from: %r'
......@@ -17,27 +21,29 @@ with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
if 'sentinel' in inventory_provider_config:
params = inventory_provider_config['sentinel']
sentinel = redis.sentinel.Sentinel(
[(params['hostname'], int(params['port']))],
socket_timeout=0.5)
master = sentinel.discover_master(params['name'])
assert master
_broker_hostname = master[0]
_broker_port = master[1]
_broker_scheme = 'sentinel'
broker_transport_options['master_name'] = params['name']
sentinel_socket_timeout = params.get(
'sentinel_socket_timeout', DEFAULT_SENTINEL_SOCKET_TIMEOUT)
else:
params = inventory_provider_config['redis']
_broker_hostname = params['hostname']
_broker_port = int(params['port'])
_broker_scheme = 'redis'
redis_socket_timeout = params.get(
'redis_socket_timeout', DEFAULT_REDIS_SOCKET_TIMEOUT)
_broker_db_index = 1 # TODO: this should be a config param
_broker_hostname = params['hostname']
_broker_port = params['port']
_broker_db_index = params.get('celery-db-index', DEFAULT_CELERY_DB_INDEX)
# build the redis/sentinel broker url from the parameters derived above
if ':' in _broker_hostname:
    # assume this means hostname is an ipv6 address;
    # redis urls require ipv6 literals to be bracketed
    # BUG FIX: the f-string was missing the braces, so the hostname
    # was replaced with the literal text '[_broker_hostname]'
    _broker_hostname = f'[{_broker_hostname}]'
broker_url = result_backend = (f'{_broker_scheme}://{_broker_hostname}'
                               f':{_broker_port}/{_broker_db_index}')
logger.debug('broker_url: %r' % broker_url)
task_eager_propagates = True
task_track_started = True
......@@ -67,8 +67,10 @@ class InventoryTask(Task):
logging.debug("loaded config: %r" % InventoryTask.config)
def update_state(self, **kwargs):
meta = kwargs.setdefault('meta', dict())
meta['task'] = self.name
logger.debug(json.dumps(
{'state': kwargs['state'], 'meta': str(kwargs['meta'])}
{'state': kwargs['state'], 'meta': str(meta)}
))
super().update_state(**kwargs)
......@@ -271,7 +273,6 @@ def update_neteng_managed_device_list(self):
self.update_state(
state=states.STARTED,
meta={
'task': 'update_neteng_managed_device_list',
'message': 'querying netdash for managed routers'
})
......@@ -281,7 +282,6 @@ def update_neteng_managed_device_list(self):
self.update_state(
state=states.STARTED,
meta={
'task': 'update_neteng_managed_device_list',
'message': f'found {len(routers)} routers, saving details'
})
......@@ -425,7 +425,6 @@ def reload_router_config(self, hostname):
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'loading router netconf data'
})
......@@ -463,7 +462,6 @@ def reload_router_config(self, hostname):
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing peers & clearing cache'
})
......@@ -482,7 +480,6 @@ def reload_router_config(self, hostname):
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing snmp interface indexes'
})
......@@ -515,6 +512,29 @@ def _erase_next_db(config):
new_next=saved_latch['next'])
@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2')
@log_task_entry_and_exit
def internal_refresh_phase_2(self):
    """
    Launch the second batch of cache-refresh subtasks and a finalizer.

    Queues the alarms-db status cache tasks, the juniper netconf & snmp
    refresh for every managed router, and finally a refresh_finalizer
    task that waits on all of them.

    :return: list of the task ids of every queued subtask
        (including the finalizer)
    """
    # second batch of subtasks:
    #   alarms db status cache
    #   juniper netconf & snmp data
    launched = [
        update_equipment_locations.apply_async(),
        update_lg_routers.apply_async(),
        update_access_services.apply_async(),
        import_unmanaged_interfaces.apply_async()
    ]

    for hostname in data.derive_router_hostnames(config):
        logger.debug('queueing router refresh jobs for %r' % hostname)
        launched.append(reload_router_config.apply_async(args=[hostname]))

    task_ids = [task.id for task in launched]
    # the finalizer polls the pending ids, so hand it the serialized list
    finalizer = refresh_finalizer.apply_async(args=[json.dumps(task_ids)])
    task_ids.append(finalizer.id)
    return task_ids
def launch_refresh_cache_all(config):
"""
utility function intended to be called outside of the worker process
......@@ -534,24 +554,10 @@ def launch_refresh_cache_all(config):
]
[x.get() for x in subtasks]
# second batch of subtasks:
# alarms db status cache
# juniper netconf & snmp data
subtasks = [
update_equipment_locations.apply_async(),
update_lg_routers.apply_async(),
update_access_services.apply_async(),
import_unmanaged_interfaces.apply_async()
]
for hostname in data.derive_router_hostnames(config):
logger.debug('queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
pending_task_ids = [x.id for x in subtasks]
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return pending_task_ids
# now launch the task whose only purpose is to
# act as a convenient parent for all of the remaining tasks
t = internal_refresh_phase_2.apply_async()
return t.id
def _wait_for_tasks(task_ids, update_callback=lambda s: None):
......@@ -601,7 +607,6 @@ def refresh_finalizer(self, pending_task_ids_json):
self.update_state(
state=states.STARTED,
meta={
'task': 'refresh_finalizer',
'message': s
})
......@@ -683,14 +688,13 @@ def check_task_status(task_id):
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
}
if r.result:
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
result['result'] = {
'error type': type(r.result).__name__,
'message': str(r.result)
}
else:
result['result'] = r.result
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
result['result'] = {
'error type': type(r.result).__name__,
'message': str(r.result)
}
else:
result['result'] = r.result
return result
......@@ -33,7 +33,7 @@ def test_otrs_exports(data_config_filename, data_config, mocker):
args, kwargs = mocked_run.call_args
called_with = args[0]
t = r'^rsync -aP --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" /\w+/\w+/\* {destination}$' # noqa
t = r'^rsync -aP --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" /\S+/\* {destination}$' # noqa
p = t.format(
user=otrs_config['username'],
......
......@@ -7,10 +7,14 @@ DEFAULT_REQUEST_HEADERS = {
"Accept": ["application/json"]
}
# json schema for the refactored task-launch endpoints, which now return
# a single {'task id': ...} object instead of a list of id strings.
# (diff residue removed: the old TASK_ID_LIST_SCHEMA array form is gone.)
TASK_ID_RESPONSE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "task id": {"type": "string"}
    },
    "required": ["task id"],
    "additionalProperties": False
}
TASK_STATUS_SCHEMA = {
......@@ -22,7 +26,7 @@ TASK_STATUS_SCHEMA = {
"exception": {"type": "boolean"},
"ready": {"type": "boolean"},
"success": {"type": "boolean"},
"result": {"type": "object"}
"result": {"type": ["object", "null"]}
},
"required": ["id", "status", "exception", "ready", "success"],
"additionalProperties": False
......@@ -30,25 +34,25 @@ TASK_STATUS_SCHEMA = {
def test_job_update_all(client, mocker):
    """
    POST jobs/update and verify the response is a single
    {'task id': ...} object carrying the launched task's id.
    (diff residue removed: old list-based expectations are gone.)
    """
    expected_task_id = 'xyz@123#456'
    launch_refresh_cache_all = mocker.patch(
        'inventory_provider.tasks.worker.launch_refresh_cache_all')
    # the refactored launcher returns one parent task id, not a list
    launch_refresh_cache_all.return_value = expected_task_id

    rv = client.post(
        'jobs/update',
        headers=DEFAULT_REQUEST_HEADERS)
    assert rv.status_code == 200
    refresh_task_response = json.loads(rv.data.decode('utf-8'))
    jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
    assert refresh_task_response['task id'] == expected_task_id
def test_job_update_force_pending(client, mocker):
expected_task_ids = ['22', 'agfafg', 'asf#asdf%111', '234']
expected_task_id = 'asf#asdf%111'
launch_refresh_cache_all = mocker.patch(
'inventory_provider.tasks.worker.launch_refresh_cache_all')
launch_refresh_cache_all.return_value = expected_task_ids
launch_refresh_cache_all.return_value = expected_task_id
mocked_get_latch = mocker.patch(
'inventory_provider.routes.jobs.get_latch')
......@@ -58,9 +62,9 @@ def test_job_update_force_pending(client, mocker):
'jobs/update?force=true',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
task_id_list = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
assert set(task_id_list) == set(expected_task_ids)
refresh_task_response = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
assert refresh_task_response['task id'] == expected_task_id
def test_job_update_pending_force_false(client, mocker):
......@@ -114,9 +118,9 @@ def test_reload_router_config(client, mocker):
'jobs/reload-router-config/ignored###123',
headers=DEFAULT_REQUEST_HEADERS)
assert rv.status_code == 200
task_id_list = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
assert task_id_list == ['bogus task id']
refresh_task_response = json.loads(rv.data.decode('utf-8'))
jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
assert refresh_task_response['task id'] == 'bogus task id'
def test_check_task_status_success(client, mocker):
......@@ -156,7 +160,6 @@ def test_check_task_status_custom_status(client, mocker):
assert not status['exception']
assert not status['ready']
assert not status['success']
assert 'result' not in status
def test_check_task_status_exception(client, mocker):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment