diff --git a/inventory_provider/config.py b/inventory_provider/config.py
index 288cdc2b04ded42bcac18a9ec85293188a060131..f2cac43591ebb490e19e5d45da858de1fba3f4e0 100644
--- a/inventory_provider/config.py
+++ b/inventory_provider/config.py
@@ -63,6 +63,7 @@ CONFIG_SCHEMA = {
             "properties": {
                 "hostname": {"type": "string"},
                 "port": {"type": "integer"},
+                "celery-db-index": {"type": "integer"},
                 "socket_timeout": {"$ref": "#/definitions/timeout"}
             },
             "required": ["hostname", "port"],
@@ -73,6 +74,7 @@ CONFIG_SCHEMA = {
             "properties": {
                 "hostname": {"type": "string"},
                 "port": {"type": "integer"},
+                "celery-db-index": {"type": "integer"},
                 "name": {"type": "string"},
                 "redis_socket_timeout": {"$ref": "#/definitions/timeout"},
                 "sentinel_socket_timeout": {"$ref": "#/definitions/timeout"}
diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py
index 9d3328581ea109c35a5ce79afe783f3bd38459b1..c46064366b53cbfc5a966d5c02b650490cb20c28 100644
--- a/inventory_provider/routes/jobs.py
+++ b/inventory_provider/routes/jobs.py
@@ -32,16 +32,16 @@ def update():
             status=503,
             mimetype="text/html")
 
-    job_ids = worker.launch_refresh_cache_all(
+    phase2_task_id = worker.launch_refresh_cache_all(
         current_app.config["INVENTORY_PROVIDER_CONFIG"])
-    return jsonify(job_ids)
+    return jsonify({'task id': phase2_task_id})
 
 
 @routes.route("reload-router-config/<equipment_name>", methods=['GET', 'POST'])
 @common.require_accepts_json
 def reload_router_config(equipment_name):
     result = worker.reload_router_config.delay(equipment_name)
-    return jsonify([result.id])
+    return jsonify({'task id': result.id})
 
 
 @routes.route("check-task-status/<task_id>", methods=['GET', 'POST'])
diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py
index 0b1879a95f3c7d40399d1c8fa36c2304ab06840a..bcca3f85fc14a0c71a65f830ab7661963eb6deb7 100644
--- a/inventory_provider/tasks/config.py
+++ b/inventory_provider/tasks/config.py
@@ -1,14 +1,18 @@
 import logging
 import os
 from inventory_provider import config
-import redis.sentinel
 
 logger = logging.getLogger(__name__)
+DEFAULT_CELERY_DB_INDEX = 1
+DEFAULT_SENTINEL_SOCKET_TIMEOUT = 0.5
+DEFAULT_REDIS_SOCKET_TIMEOUT = 5.0
 
 assert os.path.isfile(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']), (
     'config file %r not found'
     % os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
 
+broker_transport_options = result_backend_transport_options = dict()
+
 with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
     logging.info(
         'loading config from: %r'
@@ -17,27 +21,29 @@ with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
 
 if 'sentinel' in inventory_provider_config:
     params = inventory_provider_config['sentinel']
-    sentinel = redis.sentinel.Sentinel(
-        [(params['hostname'], int(params['port']))],
-        socket_timeout=0.5)
-    master = sentinel.discover_master(params['name'])
-    assert master
-    _broker_hostname = master[0]
-    _broker_port = master[1]
+    _broker_scheme = 'sentinel'
+    broker_transport_options['master_name'] = params['name']
+    sentinel_socket_timeout = params.get(
+        'sentinel_socket_timeout', DEFAULT_SENTINEL_SOCKET_TIMEOUT)
 else:
     params = inventory_provider_config['redis']
-    _broker_hostname = params['hostname']
-    _broker_port = int(params['port'])
+    _broker_scheme = 'redis'
+
+redis_socket_timeout = params.get(
+    'redis_socket_timeout', DEFAULT_REDIS_SOCKET_TIMEOUT)
 
-_broker_db_index = 1  # TODO: this should be a config param
+_broker_hostname = params['hostname']
+_broker_port = params['port']
+
+_broker_db_index = params.get('celery-db-index', DEFAULT_CELERY_DB_INDEX)
 
 if ':' in _broker_hostname:
     # assume this means hostname is an ipv6 address
-    _broker_hostname = '[%s]' % _broker_hostname
-
-broker_url = result_backend = 'redis://%s:%d/%d' % (
-    _broker_hostname, _broker_port, _broker_db_index)
+    _broker_hostname = f'[{_broker_hostname}]'
+broker_url = result_backend = (f'{_broker_scheme}://{_broker_hostname}'
+                               f':{_broker_port}/{_broker_db_index}')
 
 logger.debug('broker_url: %r' % broker_url)
 
 task_eager_propagates = True
+task_track_started = True
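
Note: the snippet below is an illustrative sketch, not part of the patch. It shows how the new optional 'celery-db-index' setting feeds into the broker URL that tasks/config.py now builds. The hostname, port and master name are made-up values; only 'hostname' and 'port' are required by the schema above.

    # standalone sketch (hypothetical values) of the URL derivation above
    DEFAULT_CELERY_DB_INDEX = 1
    params = {
        'hostname': 'sentinel01.example.org',  # hypothetical sentinel host
        'port': 26379,
        'name': 'inventory-master',            # hypothetical master name
        'celery-db-index': 4,                  # new optional key
    }
    scheme = 'sentinel'
    db_index = params.get('celery-db-index', DEFAULT_CELERY_DB_INDEX)
    print(f"{scheme}://{params['hostname']}:{params['port']}/{db_index}")
    # prints: sentinel://sentinel01.example.org:26379/4
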
diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py
index b82e263e5987ce1349042cd1797b3458baa57593..c6d75ee608ab17dbe1c15f6fcd62a5378afb3711 100644
--- a/inventory_provider/tasks/worker.py
+++ b/inventory_provider/tasks/worker.py
@@ -67,8 +67,10 @@
         logging.debug("loaded config: %r" % InventoryTask.config)
 
     def update_state(self, **kwargs):
+        meta = kwargs.setdefault('meta', dict())
+        meta['task'] = self.name
         logger.debug(json.dumps(
-            {'state': kwargs['state'], 'meta': str(kwargs['meta'])}
+            {'state': kwargs['state'], 'meta': str(meta)}
         ))
         super().update_state(**kwargs)
 
@@ -271,7 +273,6 @@ def update_neteng_managed_device_list(self):
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'update_neteng_managed_device_list',
             'message': 'querying netdash for managed routers'
         })
 
@@ -281,7 +282,6 @@
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'update_neteng_managed_device_list',
             'message': f'found {len(routers)} routers, saving details'
         })
 
@@ -425,7 +425,6 @@ def reload_router_config(self, hostname):
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'reload_router_config',
             'hostname': hostname,
             'message': 'loading router netconf data'
         })
@@ -463,7 +462,6 @@
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'reload_router_config',
             'hostname': hostname,
             'message': 'refreshing peers & clearing cache'
         })
@@ -482,7 +480,6 @@
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'reload_router_config',
             'hostname': hostname,
             'message': 'refreshing snmp interface indexes'
         })
@@ -515,6 +512,29 @@ def _erase_next_db(config):
             new_next=saved_latch['next'])
 
 
+@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2')
+@log_task_entry_and_exit
+def internal_refresh_phase_2(self):
+    # second batch of subtasks:
+    #   alarms db status cache
+    #   juniper netconf & snmp data
+    subtasks = [
+        update_equipment_locations.apply_async(),
+        update_lg_routers.apply_async(),
+        update_access_services.apply_async(),
+        import_unmanaged_interfaces.apply_async()
+    ]
+    for hostname in data.derive_router_hostnames(InventoryTask.config):
+        logger.debug('queueing router refresh jobs for %r' % hostname)
+        subtasks.append(reload_router_config.apply_async(args=[hostname]))
+
+    pending_task_ids = [x.id for x in subtasks]
+
+    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
+    pending_task_ids.append(t.id)
+    return pending_task_ids
+
+
 def launch_refresh_cache_all(config):
     """
     utility function intended to be called outside of the worker process
@@ -534,24 +554,10 @@
     ]
     [x.get() for x in subtasks]
 
-    # second batch of subtasks:
-    #   alarms db status cache
-    #   juniper netconf & snmp data
-    subtasks = [
-        update_equipment_locations.apply_async(),
-        update_lg_routers.apply_async(),
-        update_access_services.apply_async(),
-        import_unmanaged_interfaces.apply_async()
-    ]
-    for hostname in data.derive_router_hostnames(config):
-        logger.debug('queueing router refresh jobs for %r' % hostname)
-        subtasks.append(reload_router_config.apply_async(args=[hostname]))
-
-    pending_task_ids = [x.id for x in subtasks]
-
-    t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
-    pending_task_ids.append(t.id)
-    return pending_task_ids
+    # now launch the task whose only purpose is to
+    # act as a convenient parent for all of the remaining tasks
+    t = internal_refresh_phase_2.apply_async()
+    return t.id
 
 
 def _wait_for_tasks(task_ids, update_callback=lambda s: None):
@@ -601,7 +607,6 @@ def refresh_finalizer(self, pending_task_ids_json):
     self.update_state(
         state=states.STARTED,
         meta={
-            'task': 'refresh_finalizer',
             'message': s
         })
 
@@ -683,14 +688,13 @@ def check_task_status(task_id):
         'ready': r.status in states.READY_STATES,
         'success': r.status == states.SUCCESS,
     }
-    if r.result:
-        # TODO: only discovered this case by testing, is this the only one?
-        # ... otherwise need to pre-test json serialization
-        if isinstance(r.result, Exception):
-            result['result'] = {
-                'error type': type(r.result).__name__,
-                'message': str(r.result)
-            }
-        else:
-            result['result'] = r.result
+    # TODO: only discovered this case by testing, is this the only one?
+    # ... otherwise need to pre-test json serialization
+    if isinstance(r.result, Exception):
+        result['result'] = {
+            'error type': type(r.result).__name__,
+            'message': str(r.result)
+        }
+    else:
+        result['result'] = r.result
     return result
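
For context on the check_task_status change at the end of this file: the 'result' field is now always included in the response, serialized with the logic below. This is a minimal standalone restatement of that branch, not repository code, and the example exception message is invented.

    def serialize_result(result):
        # mirrors the unconditional branch now used in check_task_status
        if isinstance(result, Exception):
            return {'error type': type(result).__name__,
                    'message': str(result)}
        return result

    print(serialize_result(ValueError('example failure')))
    # {'error type': 'ValueError', 'message': 'example failure'}
    print(serialize_result(None))  # None, i.e. "null" in the JSON response
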
diff --git a/test/test_ims_worker.py b/test/test_ims_worker.py
index 136f37a82895795b753c03417b3528eb0677e5b7..7906c5bb49d15049d10d88691a494e7ae749b94a 100644
--- a/test/test_ims_worker.py
+++ b/test/test_ims_worker.py
@@ -33,7 +33,7 @@ def test_otrs_exports(data_config_filename, data_config, mocker):
 
     args, kwargs = mocked_run.call_args
     called_with = args[0]
-    t = r'^rsync -aP --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" /\w+/\w+/\* {destination}$'  # noqa
+    t = r'^rsync -aP --rsh="ssh -l {user} -p 22 -i {key_file} -o \'UserKnownHostsFile {known_hosts}\'" /\S+/\* {destination}$'  # noqa
 
     p = t.format(
         user=otrs_config['username'],
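
A quick illustration of the regex relaxation tested above, using hypothetical rsync source paths: the old fragment only matched exactly two word-character segments, while the new /\S+/\* fragment accepts any non-whitespace path.

    import re

    old = re.compile(r'/\w+/\w+/\*$')
    new = re.compile(r'/\S+/\*$')
    for path in ['/tmp/otrs/*', '/var/cache/otrs-exports/*']:
        print(path, bool(old.search(path)), bool(new.search(path)))
    # /tmp/otrs/* matches both patterns;
    # the deeper, dash-containing path matches only the new one
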
diff --git a/test/test_job_routes.py b/test/test_job_routes.py
index 07a8e61b2abea21129c371592bfad42aac33b715..7f1a20f1ebd70703116abb8c4f7d9ed9ddeade37 100644
--- a/test/test_job_routes.py
+++ b/test/test_job_routes.py
@@ -7,10 +7,14 @@ DEFAULT_REQUEST_HEADERS = {
     "Accept": ["application/json"]
 }
 
-TASK_ID_LIST_SCHEMA = {
+TASK_ID_RESPONSE_SCHEMA = {
     "$schema": "http://json-schema.org/draft-07/schema#",
-    "type": "array",
-    "items": {"type": "string"}
+    "type": "object",
+    "properties": {
+        "task id": {"type": "string"}
+    },
+    "required": ["task id"],
+    "additionalProperties": False
 }
 
 TASK_STATUS_SCHEMA = {
@@ -22,7 +26,7 @@
         "exception": {"type": "boolean"},
         "ready": {"type": "boolean"},
         "success": {"type": "boolean"},
-        "result": {"type": "object"}
+        "result": {"type": ["object", "null"]}
     },
     "required": ["id", "status", "exception", "ready", "success"],
     "additionalProperties": False
@@ -30,25 +34,25 @@
 
 
 def test_job_update_all(client, mocker):
-    expected_task_ids = ['abc', 'def', 'xyz@123#456']
+    expected_task_id = 'xyz@123#456'
     launch_refresh_cache_all = mocker.patch(
         'inventory_provider.tasks.worker.launch_refresh_cache_all')
-    launch_refresh_cache_all.return_value = expected_task_ids
+    launch_refresh_cache_all.return_value = expected_task_id
 
     rv = client.post(
         'jobs/update',
         headers=DEFAULT_REQUEST_HEADERS)
     assert rv.status_code == 200
-    task_id_list = json.loads(rv.data.decode('utf-8'))
-    jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
-    assert set(task_id_list) == set(expected_task_ids)
+    refresh_task_response = json.loads(rv.data.decode('utf-8'))
+    jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
+    assert refresh_task_response['task id'] == expected_task_id
 
 
 def test_job_update_force_pending(client, mocker):
-    expected_task_ids = ['22', 'agfafg', 'asf#asdf%111', '234']
+    expected_task_id = 'asf#asdf%111'
     launch_refresh_cache_all = mocker.patch(
         'inventory_provider.tasks.worker.launch_refresh_cache_all')
-    launch_refresh_cache_all.return_value = expected_task_ids
+    launch_refresh_cache_all.return_value = expected_task_id
 
     mocked_get_latch = mocker.patch(
         'inventory_provider.routes.jobs.get_latch')
@@ -58,9 +62,9 @@
         'jobs/update?force=true',
         headers=DEFAULT_REQUEST_HEADERS)
     assert rv.status_code == 200
-    task_id_list = json.loads(rv.data.decode('utf-8'))
-    jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
-    assert set(task_id_list) == set(expected_task_ids)
+    refresh_task_response = json.loads(rv.data.decode('utf-8'))
+    jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
+    assert refresh_task_response['task id'] == expected_task_id
 
 
 def test_job_update_pending_force_false(client, mocker):
@@ -114,9 +118,9 @@ def test_reload_router_config(client, mocker):
         'jobs/reload-router-config/ignored###123',
         headers=DEFAULT_REQUEST_HEADERS)
     assert rv.status_code == 200
-    task_id_list = json.loads(rv.data.decode('utf-8'))
-    jsonschema.validate(task_id_list, TASK_ID_LIST_SCHEMA)
-    assert task_id_list == ['bogus task id']
+    refresh_task_response = json.loads(rv.data.decode('utf-8'))
+    jsonschema.validate(refresh_task_response, TASK_ID_RESPONSE_SCHEMA)
+    assert refresh_task_response['task id'] == 'bogus task id'
 
 
 def test_check_task_status_success(client, mocker):
@@ -156,7 +160,6 @@ def test_check_task_status_custom_status(client, mocker):
     assert not status['exception']
    assert not status['ready']
     assert not status['success']
-    assert 'result' not in status
 
 
 def test_check_task_status_exception(client, mocker):
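
Taken together with the route changes above, a client can now treat the whole refresh as a single task. The sketch below is a hypothetical usage example, not part of the repository: the base URL is invented, and it assumes the jobs blueprint is mounted under /jobs as the test paths suggest.

    import time
    import requests

    BASE = 'http://localhost:8080'       # hypothetical deployment address
    HEADERS = {'Accept': 'application/json'}

    task_id = requests.post(
        f'{BASE}/jobs/update', headers=HEADERS).json()['task id']
    while True:
        status = requests.post(
            f'{BASE}/jobs/check-task-status/{task_id}',
            headers=HEADERS).json()
        if status['ready']:
            break
        time.sleep(10)
    print('success' if status['success'] else 'failed', status['result'])
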