Skip to content
Snippets Groups Projects
Commit dbf3562b authored by Erik Reid's avatar Erik Reid
Browse files

added a layer for saving log messages to current db

parent 901923e6
No related branches found
No related tags found
No related merge requests found
......@@ -24,6 +24,46 @@ DB_LATCH_SCHEMA = {
"additionalProperties": False
}
TASK_LOG_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'meta': {
'type': 'object',
'properties': {
'task': {'type': ['string', 'null']},
'id': {'type': ['string', 'null']},
'worker': {'type': ['string', 'null']},
'pid': {'type': 'integer'},
'warning': {'type': 'boolean'},
'error': {'type': 'boolean'}
},
'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
'additionalProperties': False
},
'message': {
'type': 'object',
'properties': {
'message': {'type': 'string'},
'level': {
'type': 'string',
'enum': ['INFO', 'WARNING', 'ERROR']
}
}
}
},
'type': 'object',
'properties': {
'meta': {'$ref': '#/definitions/meta'},
'messages': {
'type': 'array',
'items': {'$ref': '#/definitions/message'}
}
},
'required': ['meta', 'messages'],
'additionalProperties': False
}
def get_latch(r):
latch = r.get('db:latch')
......@@ -162,3 +202,9 @@ def get_next_redis(config):
'derived next id: {}'.format(next_id))
return _get_redis(config, next_id)
def save_task_log(config, log):
jsonschema.validate(log, TASK_LOG_SCHEMA)
r = get_current_redis(config)
r.set(f'db:log:{log["meta"]["id"]}', json.dumps(log))
......@@ -12,7 +12,7 @@ import jsonschema
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
import get_next_redis, get_current_redis, \
import get_next_redis, get_current_redis, save_task_log, \
latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data
from inventory_provider import config
......@@ -52,7 +52,8 @@ class InventoryTask(Task):
def __init__(self):
self.pid = os.getpid()
self.args = []
self.messages = []
if InventoryTask.config:
return
......@@ -69,52 +70,84 @@ class InventoryTask(Task):
InventoryTask.config = config.load(f)
logging.debug("loaded config: %r" % InventoryTask.config)
def update_state(self, **kwargs):
meta = kwargs.setdefault('meta', dict())
meta['task'] = self.name
meta['worker'] = self.request.hostname
meta['pid'] = self.pid
logger.debug(json.dumps(
{'state': kwargs['state'], 'meta': str(meta)}
))
super().update_state(**kwargs)
def _append_to_log(self, level, message):
info = {
'message': message,
'level': level
}
logger.debug(json.dumps(info))
self.messages.append(info)
def _log(self, level, state, message):
self._append_to_log(level, message)
super().update_state(state=state, meta={'message': message})
def log_info(self, message, state=states.STARTED):
self._log('INFO', state, message)
def log_warning(self, message, state=states.STARTED):
self._log('WARNING', state, message)
def log_error(self, message, state=states.STARTED):
self._log('ERROR', state, message)
# def update_state(self, **kwargs):
# meta = {'message': 'none', 'level': 'INFO'}
# meta.update(kwargs.setdefault('meta', dict()))
# meta['task'] = self.name
# meta['worker'] = self.request.hostname
# meta['pid'] = self.pid
# logger.debug(json.dumps(
# {'state': kwargs['state'], 'meta': str(meta)}
# ))
# super().update_state(**kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.exception(exc)
self._append_to_log('ERROR', str(exc))
save_task_log(
InventoryTask.config,
{'messages': self.messages, 'meta': self._meta()})
super().on_failure(exc, task_id, args, kwargs, einfo)
def _task_return_value(self, warning, message):
"""
common method for constructing a standard task return value
:param warning: boolean (False for normal, warning-free response)
:param message: text message to include in return value
:return: a serializable dict
"""
return {
def on_success(self, retval, task_id, args, kwargs):
self._append_to_log('INFO', 'DONE')
save_task_log(
InventoryTask.config,
{'messages': self.messages, 'meta': self._meta()})
super().on_success(retval, task_id, args, kwargs)
def _meta(self, message = None):
meta_struct = {
'task': self.name,
'id': self.request.id,
'worker': self.request.hostname,
'pid': self.pid,
'warning': warning,
'message': message
'pid': os.getpid(),
'warning': 'WARNING' in [m['level'] for m in self.messages],
'error': 'ERROR' in [m['level'] for m in self.messages],
}
if message:
meta_struct['message'] = message
return meta_struct
def success(self, message='OK'):
return self._task_return_value(warning=False, message=message)
def warning(self, message='WARNING'):
return self._task_return_value(warning=True, message=message)
def return_value(self, message='OK'):
"""
common method for constructing a standard task return value
:param message: text message to include in return value
:return: a serializable dict
"""
self.log_info(message)
return self._meta(message=message)
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
@log_task_entry_and_exit
def snmp_refresh_interfaces(self, hostname, community):
warning = False
try:
value = list(snmp.get_router_snmp_indexes(hostname, community))
except ConnectionError:
logger.exception(f'error loading snmp data from {hostname}')
warning = True
msg = f'error loading snmp data from {hostname}'
logger.exception(msg)
self.log_warning(msg)
r = get_current_redis(InventoryTask.config)
value = r.get(f'snmp-interfaces:{hostname}')
if not value:
......@@ -123,43 +156,35 @@ def snmp_refresh_interfaces(self, hostname, community):
f' and no cached netconf data found')
# unnecessary json encode/decode here ... could be optimized
value = json.loads(value.decode('utf-8'))
self.log_warning(f'using cached snmp data for {hostname}')
r = get_next_redis(InventoryTask.config)
r.set(f'snmp-interfaces:{hostname}', json.dumps(value))
if warning:
return self.warning(
message=f'i/o error with {hostname}, using cached snmp info')
else:
return self.success(message=f'snmp info loaded from {hostname}')
return self.return_value(message=f'snmp info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
@log_task_entry_and_exit
def netconf_refresh_config(self, hostname):
warning = False
try:
netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"])
netconf_str = etree.tostring(netconf_doc, encoding='unicode')
except ConnectionError:
logger.exception(f'error loading netconf data from {hostname}')
warning = True
msg = f'error loading netconf data from {hostname}'
logger.exception(msg)
self.log_warning(msg)
r = get_current_redis(InventoryTask.config)
netconf_str = r.get(f'netconf:{hostname}')
if not netconf_str:
raise InventoryTaskError(
f'netconf error with {hostname}'
f' and no cached netconf data found')
self.log_warning(f'using cached netconf data for {hostname}')
r = get_next_redis(InventoryTask.config)
r.set(f'netconf:{hostname}', netconf_str)
if warning:
return self.warning(
message=f'i/o error with {hostname}, using cached netconf info')
else:
return self.success(message=f'netconf info loaded from {hostname}')
return self.return_value(message=f'netconf info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
......@@ -185,7 +210,7 @@ def update_interfaces_to_services(self):
json.dumps(services))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces')
......@@ -219,7 +244,7 @@ def import_unmanaged_interfaces(self):
json.dumps([ifc]))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_access_services')
......@@ -250,7 +275,7 @@ def update_access_services(self):
json.dumps(service))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_lg_routers')
......@@ -269,7 +294,7 @@ def update_lg_routers(self):
rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
......@@ -290,7 +315,7 @@ def update_equipment_locations(self):
rp.set('opsdb:location:%s' % h, json.dumps(locations))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
......@@ -322,7 +347,7 @@ def update_circuit_hierarchy(self):
rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
......@@ -343,32 +368,24 @@ def update_geant_lambdas(self):
json.dumps(ld))
rp.execute()
return self.success()
return self.return_value()
@app.task(base=InventoryTask, bind=True,
name='update_neteng_managed_device_list')
@log_task_entry_and_exit
def update_neteng_managed_device_list(self):
self.update_state(
state=states.STARTED,
meta={
'message': 'querying netdash for managed routers'
})
self.log_info('querying netdash for managed routers')
routers = list(juniper.load_routers_from_netdash(
InventoryTask.config['managed-routers']))
self.update_state(
state=states.STARTED,
meta={
'message': f'found {len(routers)} routers, saving details'
})
self.log_info(f'found {len(routers)} routers, saving details')
r = get_next_redis(InventoryTask.config)
r.set('netdash', json.dumps(routers).encode('utf-8'))
return self.success(f'saved {len(routers)} managed routers')
return self.return_value(f'saved {len(routers)} managed routers')
def load_netconf_data(hostname):
......@@ -503,12 +520,7 @@ def refresh_juniper_interface_list(hostname, netconf):
@app.task(base=InventoryTask, bind=True, name='reload_router_config')
@log_task_entry_and_exit
def reload_router_config(self, hostname):
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'loading netconf data for {hostname}'
})
self.log_info(f'loading netconf data for {hostname}')
# get the timestamp for the current netconf data
current_netconf_timestamp = None
......@@ -533,16 +545,11 @@ def reload_router_config(self, hostname):
if new_netconf_timestamp == current_netconf_timestamp:
msg = f'no timestamp change for {hostname} netconf data'
logger.debug(msg)
return self.success(msg)
return self.return_value(msg)
# clear cached classifier responses for this router, and
# refresh peering data
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'refreshing peers & clearing cache for {hostname}'
})
self.log_info(f'refreshing peers & clearing cache for {hostname}')
refresh_ix_public_peers(hostname, netconf_doc)
refresh_vpn_rr_peers(hostname, netconf_doc)
refresh_interface_address_lookups(hostname, netconf_doc)
......@@ -555,18 +562,13 @@ def reload_router_config(self, hostname):
raise InventoryTaskError(
f'error extracting community string for {hostname}')
else:
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'refreshing snmp interface indexes for {hostname}'
})
self.log_info(f'refreshing snmp interface indexes for {hostname}')
# load snmp data, in this thread
snmp_refresh_interfaces.apply(args=[hostname, community])
clear_cached_classifier_responses(None)
return self.success(f'updated configuration for {hostname}')
return self.return_value(f'updated configuration for {hostname}')
def _erase_next_db(config):
......@@ -608,7 +610,7 @@ def internal_refresh_phase_2(self):
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return self.success()
return self.return_value()
@log_task_entry_and_exit
......@@ -687,22 +689,14 @@ def refresh_finalizer(self, pending_task_ids_json):
"items": {"type": "string"}
}
def _update(s):
logger.debug(s)
self.update_state(
state=states.STARTED,
meta={
'message': s
})
try:
task_ids = json.loads(pending_task_ids_json)
logger.debug('task_ids: %r' % task_ids)
jsonschema.validate(task_ids, input_schema)
_wait_for_tasks(task_ids, update_callback=_update)
_build_subnet_db(update_callback=_update)
_build_service_category_interface_list(update_callback=_update)
_wait_for_tasks(task_ids, update_callback=self.log_info)
_build_subnet_db(update_callback=self.log_info)
_build_service_category_interface_list(update_callback=self.log_info)
except (jsonschema.ValidationError,
json.JSONDecodeError,
......@@ -711,9 +705,9 @@ def refresh_finalizer(self, pending_task_ids_json):
raise e
latch_db(InventoryTask.config)
_update('latched current/next dbs')
self.log_info('latched current/next dbs')
return self.success()
return self.return_value()
@log_task_entry_and_exit
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment