Skip to content
Snippets Groups Projects
Commit 61ed86ff authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature improve-worker-logging.

parents 993250bd cf96092b
No related branches found
No related tags found
No related merge requests found
recursive-include inventory_provider/static * recursive-include inventory_provider/static *
include inventory_provider/logging_default_config.json
...@@ -25,3 +25,4 @@ ...@@ -25,3 +25,4 @@
read snmp community string from netconf read snmp community string from netconf
derive active router list from junosspace derive active router list from junosspace
cache ix public & vpn rr peers cache ix public & vpn rr peers
use external logging config file
...@@ -38,6 +38,9 @@ def create_app(): ...@@ -38,6 +38,9 @@ def create_app():
if "SETTINGS_FILENAME" not in os.environ: if "SETTINGS_FILENAME" not in os.environ:
assert False, \ assert False, \
"environment variable 'SETTINGS_FILENAME' must be defined" "environment variable 'SETTINGS_FILENAME' must be defined"
logging.info("initializing Flask with config from: %r"
% os.environ["SETTINGS_FILENAME"])
app.config.from_envvar("SETTINGS_FILENAME") app.config.from_envvar("SETTINGS_FILENAME")
assert "INVENTORY_PROVIDER_CONFIG_FILENAME" in app.config, ( assert "INVENTORY_PROVIDER_CONFIG_FILENAME" in app.config, (
...@@ -51,8 +54,10 @@ def create_app(): ...@@ -51,8 +54,10 @@ def create_app():
from inventory_provider import config from inventory_provider import config
with open(app.config["INVENTORY_PROVIDER_CONFIG_FILENAME"]) as f: with open(app.config["INVENTORY_PROVIDER_CONFIG_FILENAME"]) as f:
# test the config file can be loaded # test the config file can be loaded
logging.info("loading config from: %r"
% app.config["INVENTORY_PROVIDER_CONFIG_FILENAME"])
app.config["INVENTORY_PROVIDER_CONFIG"] = config.load(f) app.config["INVENTORY_PROVIDER_CONFIG"] = config.load(f)
logging.debug(app.config) logging.info('Inventory Provider Flask app initialized')
return app return app
SNMP_LOGGER_NAME = "snmp-logger" SNMP_LOGGER_NAME = "inventory_provider.snmp"
JUNIPER_LOGGER_NAME = "juniper-logger" JUNIPER_LOGGER_NAME = "inventory_provider.juniper"
DATABASE_LOGGER_NAME = "database-logger" DATABASE_LOGGER_NAME = "inventory_provider.database"
TASK_LOGGER_NAME = "task-logger" TASK_LOGGER_NAME = "inventory_provider.task"
import logging import json
import logging.config
import os import os
import sys
from inventory_provider import constants
def _level_from_env(var_name, default_level=logging.INFO):
level_str = os.getenv(var_name, logging.getLevelName(default_level))
numeric_level = getattr(logging, level_str.upper(), default_level)
logging.debug('setting %s logging level to %s'
% (var_name, logging.getLevelName(numeric_level)))
return numeric_level
def setup_logging(): def setup_logging():
logging.basicConfig( """
stream=sys.stderr, set up logging using the configured filename
level=_level_from_env('DEFAULT_LOGGING', logging.INFO))
logging.getLogger(constants.SNMP_LOGGER_NAME).setLevel( if LOGGING_CONFIG is defined in the environment, use this for
_level_from_env('SNMP_LOGGING', logging.INFO)) the filename, otherwise use logging_default_config.json
logging.getLogger(constants.TASK_LOGGER_NAME).setLevel( """
_level_from_env('TASK_LOGGING', logging.INFO)) default_filename = os.path.join(
logging.getLogger(constants.JUNIPER_LOGGER_NAME).setLevel( os.path.dirname(__file__),
_level_from_env('JUNIPER_LOGGING', logging.INFO)) 'logging_default_config.json')
logging.getLogger(constants.DATABASE_LOGGER_NAME).setLevel( filename = os.getenv('LOGGING_CONFIG', default_filename)
_level_from_env('DATABASE_LOGGING', logging.INFO)) with open(filename) as f:
# TODO: this mac workaround should be removed ...
d = json.loads(f.read())
import platform
if platform.system() == 'Darwin':
d['handlers']['syslog_handler']['address'] = '/var/run/syslog'
logging.config.dictConfig(d)
# logging.config.dictConfig(json.loads(f.read()))
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"syslog_handler": {
"class": "logging.handlers.SysLogHandler",
"level": "DEBUG",
"address": "/dev/log",
"facility": "user",
"formatter": "simple"
},
"info_file_handler": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "simple",
"filename": "info.log",
"maxBytes": 10485760,
"backupCount": 20,
"encoding": "utf8"
},
"error_file_handler": {
"class": "logging.handlers.RotatingFileHandler",
"level": "ERROR",
"formatter": "simple",
"filename": "errors.log",
"maxBytes": 10485760,
"backupCount": 20,
"encoding": "utf8"
}
},
"loggers": {
"inventory_provider": {
"level": "DEBUG",
"handlers": ["console", "syslog_handler"],
"propagate": false
},
"inventory_provider.snmp": {
"level": "INFO"
},
"inventory_provider.juniper": {
"level": "INFO"
},
"inventory_provider.database": {
"level": "INFO"
},
"inventory_provider.task": {
"level": "DEBUG"
},
"celery.app.trace": {
"level": "INFO",
"handlers": ["console", "syslog_handler"],
"propagate": false
}
},
"root": {
"level": "WARNING",
"handlers": ["console", "syslog_handler"]
}
}
\ No newline at end of file
from flask import Blueprint, Response, current_app from flask import Blueprint, Response, current_app, jsonify
from inventory_provider.tasks import worker from inventory_provider.tasks import worker
routes = Blueprint("inventory-data-job-routes", __name__) routes = Blueprint("inventory-data-job-routes", __name__)
...@@ -6,9 +6,9 @@ routes = Blueprint("inventory-data-job-routes", __name__) ...@@ -6,9 +6,9 @@ routes = Blueprint("inventory-data-job-routes", __name__)
@routes.route("/update", methods=['GET', 'POST']) @routes.route("/update", methods=['GET', 'POST'])
def update(): def update():
worker.start_refresh_cache_all( job_ids = worker.launch_refresh_cache_all(
current_app.config["INVENTORY_PROVIDER_CONFIG"]) current_app.config["INVENTORY_PROVIDER_CONFIG"])
return Response("OK") return jsonify(job_ids)
@routes.route("update-interface-statuses") @routes.route("update-interface-statuses")
...@@ -19,5 +19,10 @@ def update_interface_statuses(): ...@@ -19,5 +19,10 @@ def update_interface_statuses():
@routes.route("reload-router-config/<equipment_name>") @routes.route("reload-router-config/<equipment_name>")
def reload_router_config(equipment_name): def reload_router_config(equipment_name):
worker.reload_router_config.delay(equipment_name) result = worker.reload_router_config.delay(equipment_name)
return Response("OK") return jsonify([result.id])
@routes.route("check-task-status/<task_id>")
def check_task_status(task_id):
return jsonify(worker.check_task_status(task_id))
...@@ -6,3 +6,4 @@ broker_url = getenv( ...@@ -6,3 +6,4 @@ broker_url = getenv(
result_backend = getenv( result_backend = getenv(
'CELERY_BROKER_URL', 'CELERY_BROKER_URL',
default='redis://test-dashboard02.geant.org:6379/1') default='redis://test-dashboard02.geant.org:6379/1')
task_eager_propagates = True
...@@ -2,7 +2,8 @@ import json ...@@ -2,7 +2,8 @@ import json
import logging import logging
import re import re
from celery import bootsteps, Task, group from celery import bootsteps, Task, group, states
from celery.result import AsyncResult
from collections import defaultdict from collections import defaultdict
from lxml import etree from lxml import etree
...@@ -21,52 +22,52 @@ from inventory_provider import juniper ...@@ -21,52 +22,52 @@ from inventory_provider import juniper
environment.setup_logging() environment.setup_logging()
class InventoryTaskError(Exception):
pass
class InventoryTask(Task): class InventoryTask(Task):
config = None config = None
# logger = None
def __init__(self): def __init__(self):
pass pass
# @staticmethod def update_state(self, **kwargs):
# def save_key(hostname, key, value): task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# assert isinstance(value, str), \ task_logger.debug(json.dumps(
# "sanity failure: expected string data as value" {'state': kwargs['state'], 'meta': kwargs['meta']}
# r = get_redis(InventoryTask.config) ))
# r.hset( super().update_state(**kwargs)
# name=hostname,
# key=key,
# value=value) def _save_value(key, value):
# InventoryTask.logger.debug( assert isinstance(value, str), \
# "saved %s, key %s" % (hostname, key)) "sanity failure: expected string data as value"
# return "OK" r = get_redis(InventoryTask.config)
r.set(name=key, value=value)
@staticmethod # InventoryTask.logger.debug("saved %s" % key)
def save_value(key, value): return "OK"
assert isinstance(value, str), \
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.set(name=key, value=value)
# InventoryTask.logger.debug("saved %s" % key)
return "OK"
@staticmethod
def save_value_json(key, data_obj):
InventoryTask.save_value(
key,
json.dumps(data_obj))
@staticmethod def _save_value_json(key, data_obj):
def save_value_etree(key, xml_doc): _save_value(
InventoryTask.save_value( key,
key, json.dumps(data_obj))
etree.tostring(xml_doc, encoding='unicode'))
def _save_value_etree(key, xml_doc):
_save_value(
key,
etree.tostring(xml_doc, encoding='unicode'))
class WorkerArgs(bootsteps.Step): class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options): def __init__(self, worker, config_filename, **options):
with open(config_filename) as f: with open(config_filename) as f:
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.info(
"Initializing worker with config from: %r" % config_filename)
InventoryTask.config = config.load(f) InventoryTask.config = config.load(f)
...@@ -89,7 +90,7 @@ def snmp_refresh_interfaces(hostname, community): ...@@ -89,7 +90,7 @@ def snmp_refresh_interfaces(hostname, community):
task_logger.debug( task_logger.debug(
'>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community)) '>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community))
InventoryTask.save_value_json( _save_value_json(
'snmp-interfaces:' + hostname, 'snmp-interfaces:' + hostname,
list(snmp.get_router_interfaces( list(snmp.get_router_interfaces(
hostname, hostname,
...@@ -105,7 +106,7 @@ def netconf_refresh_config(hostname): ...@@ -105,7 +106,7 @@ def netconf_refresh_config(hostname):
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> netconf_refresh_config(%r)' % hostname) task_logger.debug('>>> netconf_refresh_config(%r)' % hostname)
InventoryTask.save_value_etree( _save_value_etree(
'netconf:' + hostname, 'netconf:' + hostname,
juniper.load_config(hostname, InventoryTask.config["ssh"])) juniper.load_config(hostname, InventoryTask.config["ssh"]))
...@@ -195,25 +196,49 @@ def update_interface_statuses(): ...@@ -195,25 +196,49 @@ def update_interface_statuses():
csr, csr,
service["equipment"], service["equipment"],
service["interface_name"]) service["interface_name"])
InventoryTask.save_value(key, status) _save_value(key, status)
task_logger.debug('<<< update_interface_statuses') task_logger.debug('<<< update_interface_statuses')
@app.task @app.task(base=InventoryTask, bind=True)
def update_junosspace_device_list(): def update_junosspace_device_list(self):
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_junosspace_device_list') task_logger.debug('>>> update_junosspace_device_list')
self.update_state(
state=states.STARTED,
meta={
'task': 'update_junosspace_device_list',
'message': 'querying junosspace for managed routers'
})
r = get_redis(InventoryTask.config) r = get_redis(InventoryTask.config)
routers = {}
for d in juniper.load_routers_from_junosspace( for d in juniper.load_routers_from_junosspace(
InventoryTask.config["junosspace"]): InventoryTask.config['junosspace']):
r.set( routers['junosspace:' + d['hostname']] = json.dumps(d).encode('utf-8')
'junosspace:' + d['hostname'],
json.dumps(d).encode('utf-8')) self.update_state(
state=states.STARTED,
meta={
'task': 'update_junosspace_device_list',
'message': 'found %d routers, saving details' % len(routers)
})
for k in r.keys('junosspace:*'):
r.delete(k)
for k, v in routers.items():
r.set(k, v)
task_logger.debug('<<< update_junosspace_device_list') task_logger.debug('<<< update_junosspace_device_list')
return {
'task': 'update_junosspace_device_list',
'message': 'saved %d managed routers' % len(routers)
}
def load_netconf_data(hostname): def load_netconf_data(hostname):
""" """
...@@ -274,32 +299,60 @@ def refresh_vpn_rr_peers(hostname, netconf): ...@@ -274,32 +299,60 @@ def refresh_vpn_rr_peers(hostname, netconf):
juniper.vpn_rr_peers(netconf)) juniper.vpn_rr_peers(netconf))
@app.task @app.task(base=InventoryTask, bind=True)
def reload_router_config(hostname): def reload_router_config(self, hostname):
task_logger = logging.getLogger(constants.TASK_LOGGER_NAME) task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
task_logger.debug('>>> update_router_config') task_logger.debug('>>> reload_router_config')
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'loading router netconf data'
})
netconf_refresh_config.apply(args=[hostname]) netconf_refresh_config.apply(args=[hostname])
netconf_doc = load_netconf_data(hostname) netconf_doc = load_netconf_data(hostname)
if netconf_doc is None: if netconf_doc is None:
task_logger.error('no netconf data available for %r' % hostname) raise InventoryTaskError(
'no netconf data available for %r' % hostname)
else: else:
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing peers'
})
refresh_ix_public_peers(hostname, netconf_doc) refresh_ix_public_peers(hostname, netconf_doc)
refresh_vpn_rr_peers(hostname, netconf_doc) refresh_vpn_rr_peers(hostname, netconf_doc)
community = juniper.snmp_community_string(netconf_doc) community = juniper.snmp_community_string(netconf_doc)
if not community: if not community:
task_logger.error( raise InventoryTaskError(
'error extracting community string for %r' % hostname) 'error extracting community string for %r' % hostname)
else: else:
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing snmp interface indexes'
})
snmp_refresh_interfaces.apply(args=[hostname, community]) snmp_refresh_interfaces.apply(args=[hostname, community])
# TODO: move this out of else? (i.e. clear even if netconf fails?) # TODO: move this out of else? (i.e. clear even if netconf fails?)
clear_cached_classifier_responses(hostname) clear_cached_classifier_responses(hostname)
task_logger.debug('<<< update_router_config') task_logger.debug('<<< reload_router_config')
return {
'task': 'reload_router_config',
'hostname': hostname,
'message': 'OK'
}
def _derive_router_hostnames(config): def _derive_router_hostnames(config):
...@@ -320,7 +373,7 @@ def _derive_router_hostnames(config): ...@@ -320,7 +373,7 @@ def _derive_router_hostnames(config):
return junosspace_equipment & opsdb_equipment return junosspace_equipment & opsdb_equipment
def start_refresh_cache_all(config): def launch_refresh_cache_all(config):
""" """
utility function intended to be called outside of the worker process utility function intended to be called outside of the worker process
:param config: config structure as defined in config.py :param config: config structure as defined in config.py
...@@ -350,4 +403,16 @@ def start_refresh_cache_all(config): ...@@ -350,4 +403,16 @@ def start_refresh_cache_all(config):
'queueing router refresh jobs for %r' % hostname) 'queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.s(hostname)) subtasks.append(reload_router_config.s(hostname))
return group(subtasks).apply_async() return [r.id for r in group(subtasks).apply_async()]
def check_task_status(task_id):
r = AsyncResult(task_id, app=app)
return {
'id': task_id,
'status': r.status,
'exception': r.status in states.EXCEPTION_STATES,
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
'result': r.result
}
...@@ -133,9 +133,11 @@ def test_reload_router_config(mocked_worker_module, router, mocker): ...@@ -133,9 +133,11 @@ def test_reload_router_config(mocked_worker_module, router, mocker):
'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply', 'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply',
_mocked_snmp_refresh_interfaces_apply) _mocked_snmp_refresh_interfaces_apply)
def _mocked_update_status(self, **kwargs):
pass
mocker.patch( mocker.patch(
'inventory_provider.tasks.worker.snmp_refresh_interfaces.apply', 'inventory_provider.tasks.worker.InventoryTask.update_state',
_mocked_snmp_refresh_interfaces_apply) _mocked_update_status)
worker.reload_router_config(router) worker.reload_router_config(router)
assert 'netconf:' + router in MockedRedis.db assert 'netconf:' + router in MockedRedis.db
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment