diff --git a/MANIFEST.in b/MANIFEST.in index be4a825561e7c94813eaf2b9db2cab18f9ba7cb8..a8495446b771f87686bf6e09307093d7edd9e5d6 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,2 @@ recursive-include inventory_provider/static * +include inventory_provider/logging_default_config.json diff --git a/changelog b/changelog index c5d29d948e99d30748ddd49ad8a061404964b093..660c7b905d70b45d96006b5582f949aa82000e98 100644 --- a/changelog +++ b/changelog @@ -25,3 +25,4 @@ read snmp community string from netconf derive active router list from junosspace cache ix public & vpn rr peers + use external logging config file diff --git a/inventory_provider/__init__.py b/inventory_provider/__init__.py index 485cf81ce4288c4d353520c2245104d0fe839e2d..cdfb86083f614d423ef75faa187b17fb76609e07 100644 --- a/inventory_provider/__init__.py +++ b/inventory_provider/__init__.py @@ -38,6 +38,9 @@ def create_app(): if "SETTINGS_FILENAME" not in os.environ: assert False, \ "environment variable SETTINGS_FILENAME' must be defined" + + logging.info("initializing Flask with config from: %r" + % os.environ["SETTINGS_FILENAME"]) app.config.from_envvar("SETTINGS_FILENAME") assert "INVENTORY_PROVIDER_CONFIG_FILENAME" in app.config, ( @@ -51,8 +54,10 @@ def create_app(): from inventory_provider import config with open(app.config["INVENTORY_PROVIDER_CONFIG_FILENAME"]) as f: # test the config file can be loaded + logging.info("loading config from: %r" + % app.config["INVENTORY_PROVIDER_CONFIG_FILENAME"]) app.config["INVENTORY_PROVIDER_CONFIG"] = config.load(f) - logging.debug(app.config) + logging.info('Inventory Provider Flask app initialized') return app diff --git a/inventory_provider/constants.py b/inventory_provider/constants.py index f88475574e7b44dd3d17116027a2ba6c8588a0a6..3925c65bb0a7e8410de907d77899e5c92cf6b882 100644 --- a/inventory_provider/constants.py +++ b/inventory_provider/constants.py @@ -1,4 +1,4 @@ -SNMP_LOGGER_NAME = "snmp-logger" -JUNIPER_LOGGER_NAME = "juniper-logger" -DATABASE_LOGGER_NAME = "database-logger" -TASK_LOGGER_NAME = "task-logger" +SNMP_LOGGER_NAME = "inventory_provider.snmp" +JUNIPER_LOGGER_NAME = "inventory_provider.juniper" +DATABASE_LOGGER_NAME = "inventory_provider.database" +TASK_LOGGER_NAME = "inventory_provider.task" diff --git a/inventory_provider/environment.py b/inventory_provider/environment.py index ad4bcadcf93f39e75e403fab6fd2545ff46284d9..989c0a1355ebb7b0e44f110842b25f045868a47d 100644 --- a/inventory_provider/environment.py +++ b/inventory_provider/environment.py @@ -1,26 +1,24 @@ -import logging +import json +import logging.config import os -import sys -from inventory_provider import constants - - -def _level_from_env(var_name, default_level=logging.INFO): - level_str = os.getenv(var_name, logging.getLevelName(default_level)) - numeric_level = getattr(logging, level_str.upper(), default_level) - logging.debug('setting %s logging level to %s' - % (var_name, logging.getLevelName(numeric_level))) - return numeric_level def setup_logging(): - logging.basicConfig( - stream=sys.stderr, - level=_level_from_env('DEFAULT_LOGGING', logging.INFO)) - logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel( - _level_from_env('SNMP_LOGGING', logging.INFO)) - logging.getLogger(constants.TASK_LOGGER_NAME).setLevel( - _level_from_env('TASK_LOGGING', logging.INFO)) - logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel( - _level_from_env('JUNIPER_LOGGING', logging.INFO)) - logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel( - _level_from_env('DATABASE_LOGGING', logging.INFO)) + """ + set up logging using the configured filename + + if LOGGING_CONFIG is defined in the environment, use this for + the filename, otherwise use logging_default_config.json + """ + default_filename = os.path.join( + os.path.dirname(__file__), + 'logging_default_config.json') + filename = os.getenv('LOGGING_CONFIG', default_filename) + with open(filename) as f: + # TODO: this mac workaround should be removed ... + d = json.loads(f.read()) + import platform + if platform.system() == 'Darwin': + d['handlers']['syslog_handler']['address'] = '/var/run/syslog' + logging.config.dictConfig(d) + # logging.config.dictConfig(json.loads(f.read())) diff --git a/inventory_provider/logging_default_config.json b/inventory_provider/logging_default_config.json new file mode 100644 index 0000000000000000000000000000000000000000..2ab40c772446f65e022057395ab9a07cd0e91d51 --- /dev/null +++ b/inventory_provider/logging_default_config.json @@ -0,0 +1,76 @@ +{ + "version": 1, + "disable_existing_loggers": false, + "formatters": { + "simple": { + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + } + }, + + "handlers": { + "console": { + "class": "logging.StreamHandler", + "level": "DEBUG", + "formatter": "simple", + "stream": "ext://sys.stdout" + }, + + "syslog_handler": { + "class": "logging.handlers.SysLogHandler", + "level": "DEBUG", + "address": "/dev/log", + "facility": "user", + "formatter": "simple" + }, + + "info_file_handler": { + "class": "logging.handlers.RotatingFileHandler", + "level": "INFO", + "formatter": "simple", + "filename": "info.log", + "maxBytes": 10485760, + "backupCount": 20, + "encoding": "utf8" + }, + + "error_file_handler": { + "class": "logging.handlers.RotatingFileHandler", + "level": "ERROR", + "formatter": "simple", + "filename": "errors.log", + "maxBytes": 10485760, + "backupCount": 20, + "encoding": "utf8" + } + }, + + "loggers": { + "inventory_provider": { + "level": "DEBUG", + "handlers": ["console", "syslog_handler"], + "propagate": false + }, + "inventory_provider.snmp": { + "level": "INFO" + }, + "inventory_provider.juniper": { + "level": "INFO" + }, + "inventory_provider.database": { + "level": "INFO" + }, + "inventory_provider.task": { + "level": "DEBUG" + }, + "celery.app.trace": { + "level": "INFO", + "handlers": ["console", "syslog_handler"], + "propagate": false + } + }, + + "root": { + "level": "WARNING", + "handlers": ["console", "syslog_handler"] + } +} \ No newline at end of file diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index c26ab6a588041272a855065d38d13e1d028b4474..99be6cff92fa5b59389b144c02823b4aed4d0038 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -1,4 +1,4 @@ -from flask import Blueprint, Response, current_app +from flask import Blueprint, Response, current_app, jsonify from inventory_provider.tasks import worker routes = Blueprint("inventory-data-job-routes", __name__) @@ -6,9 +6,9 @@ routes = Blueprint("inventory-data-job-routes", __name__) @routes.route("/update", methods=['GET', 'POST']) def update(): - worker.start_refresh_cache_all( + job_ids = worker.launch_refresh_cache_all( current_app.config["INVENTORY_PROVIDER_CONFIG"]) - return Response("OK") + return jsonify(job_ids) @routes.route("update-interface-statuses") @@ -19,5 +19,10 @@ def update_interface_statuses(): @routes.route("reload-router-config/<equipment_name>") def reload_router_config(equipment_name): - worker.reload_router_config.delay(equipment_name) - return Response("OK") + result = worker.reload_router_config.delay(equipment_name) + return jsonify([result.id]) + + +@routes.route("check-task-status/<task_id>") +def check_task_status(task_id): + return jsonify(worker.check_task_status(task_id)) diff --git a/inventory_provider/tasks/config.py b/inventory_provider/tasks/config.py index 84d4878ed1377547e9164b3764ef21f10239dd36..9e59a19335b81715406ef31c9acec14e81158909 100644 --- a/inventory_provider/tasks/config.py +++ b/inventory_provider/tasks/config.py @@ -6,3 +6,4 @@ broker_url = getenv( result_backend = getenv( 'CELERY_BROKER_URL', default='redis://test-dashboard02.geant.org:6379/1') +task_eager_propagates = True diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index cdfc648fff7e5d261919f4fdda0a99b4420eb4ae..95163f7607b6aeeba7279bfee92a353f496ecce9 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -2,7 +2,8 @@ import json import logging import re -from celery import bootsteps, Task, group +from celery import bootsteps, Task, group, states +from celery.result import AsyncResult from collections import defaultdict from lxml import etree @@ -21,52 +22,52 @@ from inventory_provider import juniper environment.setup_logging() +class InventoryTaskError(Exception): + pass + + class InventoryTask(Task): config = None - # logger = None def __init__(self): pass - # @staticmethod - # def save_key(hostname, key, value): - # assert isinstance(value, str), \ - # "sanity failure: expected string data as value" - # r = get_redis(InventoryTask.config) - # r.hset( - # name=hostname, - # key=key, - # value=value) - # InventoryTask.logger.debug( - # "saved %s, key %s" % (hostname, key)) - # return "OK" - - @staticmethod - def save_value(key, value): - assert isinstance(value, str), \ - "sanity failure: expected string data as value" - r = get_redis(InventoryTask.config) - r.set(name=key, value=value) - # InventoryTask.logger.debug("saved %s" % key) - return "OK" + def update_state(self, **kwargs): + task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) + task_logger.debug(json.dumps( + {'state': kwargs['state'], 'meta': kwargs['meta']} + )) + super().update_state(**kwargs) + + +def _save_value(key, value): + assert isinstance(value, str), \ + "sanity failure: expected string data as value" + r = get_redis(InventoryTask.config) + r.set(name=key, value=value) + # InventoryTask.logger.debug("saved %s" % key) + return "OK" - @staticmethod - def save_value_json(key, data_obj): - InventoryTask.save_value( - key, - json.dumps(data_obj)) - @staticmethod - def save_value_etree(key, xml_doc): - InventoryTask.save_value( - key, - etree.tostring(xml_doc, encoding='unicode')) +def _save_value_json(key, data_obj): + _save_value( + key, + json.dumps(data_obj)) + + +def _save_value_etree(key, xml_doc): + _save_value( + key, + etree.tostring(xml_doc, encoding='unicode')) class WorkerArgs(bootsteps.Step): def __init__(self, worker, config_filename, **options): with open(config_filename) as f: + task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) + task_logger.info( + "Initializing worker with config from: %r" % config_filename) InventoryTask.config = config.load(f) @@ -89,7 +90,7 @@ def snmp_refresh_interfaces(hostname, community): task_logger.debug( '>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community)) - InventoryTask.save_value_json( + _save_value_json( 'snmp-interfaces:' + hostname, list(snmp.get_router_interfaces( hostname, @@ -105,7 +106,7 @@ def netconf_refresh_config(hostname): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug('>>> netconf_refresh_config(%r)' % hostname) - InventoryTask.save_value_etree( + _save_value_etree( 'netconf:' + hostname, juniper.load_config(hostname, InventoryTask.config["ssh"])) @@ -195,25 +196,49 @@ def update_interface_statuses(): csr, service["equipment"], service["interface_name"]) - InventoryTask.save_value(key, status) + _save_value(key, status) task_logger.debug('<<< update_interface_statuses') -@app.task -def update_junosspace_device_list(): +@app.task(base=InventoryTask, bind=True) +def update_junosspace_device_list(self): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger.debug('>>> update_junosspace_device_list') + self.update_state( + state=states.STARTED, + meta={ + 'task': 'update_junosspace_device_list', + 'message': 'querying junosspace for managed routers' + }) + r = get_redis(InventoryTask.config) + + routers = {} for d in juniper.load_routers_from_junosspace( - InventoryTask.config["junosspace"]): - r.set( - 'junosspace:' + d['hostname'], - json.dumps(d).encode('utf-8')) + InventoryTask.config['junosspace']): + routers['junosspace:' + d['hostname']] = json.dumps(d).endode('utf-8') + + self.update_state( + state=states.STARTED, + meta={ + 'task': 'update_junosspace_device_list', + 'message': 'found %d routers, saving details' % len(routers) + }) + + for k in r.keys('junosspace:*'): + r.delete(k) + for k, v in routers.items(): + r.set(k, v) task_logger.debug('<<< update_junosspace_device_list') + return { + 'task': 'update_junosspace_device_list', + 'message': 'saved %d managed routers' % len(routers) + } + def load_netconf_data(hostname): """ @@ -274,32 +299,60 @@ def refresh_vpn_rr_peers(hostname, netconf): juniper.vpn_rr_peers(netconf)) -@app.task -def reload_router_config(hostname): +@app.task(base=InventoryTask, bind=True) +def reload_router_config(self, hostname): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) - task_logger.debug('>>> update_router_config') + task_logger.debug('>>> reload_router_config') + + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'loading router netconf data' + }) netconf_refresh_config.apply(args=[hostname]) netconf_doc = load_netconf_data(hostname) if netconf_doc is None: - task_logger.error('no netconf data available for %r' % hostname) + raise InventoryTaskError( + 'no netconf data available for %r' % hostname) else: - + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'refreshing peers' + }) refresh_ix_public_peers(hostname, netconf_doc) refresh_vpn_rr_peers(hostname, netconf_doc) community = juniper.snmp_community_string(netconf_doc) if not community: - task_logger.error( + raise InventoryTaskError( 'error extracting community string for %r' % hostname) else: + self.update_state( + state=states.STARTED, + meta={ + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'refreshing snmp interface indexes' + }) snmp_refresh_interfaces.apply(args=[hostname, community]) # TODO: move this out of else? (i.e. clear even if netconf fails?) clear_cached_classifier_responses(hostname) - task_logger.debug('<<< update_router_config') + task_logger.debug('<<< reload_router_config') + + return { + 'task': 'reload_router_config', + 'hostname': hostname, + 'message': 'OK' + } def _derive_router_hostnames(config): @@ -320,7 +373,7 @@ def _derive_router_hostnames(config): return junosspace_equipment & opsdb_equipment -def start_refresh_cache_all(config): +def launch_refresh_cache_all(config): """ utility function intended to be called outside of the worker process :param config: config structure as defined in config.py @@ -350,4 +403,16 @@ def start_refresh_cache_all(config): 'queueing router refresh jobs for %r' % hostname) subtasks.append(reload_router_config.s(hostname)) - return group(subtasks).apply_async() + return [r.id for r in group(subtasks).apply_async()] + + +def check_task_status(task_id): + r = AsyncResult(task_id, app=app) + return { + 'id': task_id, + 'status': r.status, + 'exception': r.status in states.EXCEPTION_STATES, + 'ready': r.status in states.READY_STATES, + 'success': r.status == states.SUCCESS, + 'result': r.result + } diff --git a/test/per_router/test_celery_worker.py b/test/per_router/test_celery_worker.py index 81ad1be8b87c8637d70694c4464239229e9d855d..fa8037043e5e8b3ba3e85ae2489c0f0f14bcb0d9 100644 --- a/test/per_router/test_celery_worker.py +++ b/test/per_router/test_celery_worker.py @@ -133,9 +133,11 @@ def test_reload_router_config(mocked_worker_module, router, mocker): 'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply', _mocked_snmp_refresh_interfaces_apply) + def _mocked_update_status(self, **kwargs): + pass mocker.patch( - 'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply', - _mocked_snmp_refresh_interfaces_apply) + 'inventory_provider.tasks.worker.InventoryTask.update_state', + _mocked_update_status) worker.reload_router_config(router) assert 'netconf:' + router in MockedRedis.db