Skip to content
Snippets Groups Projects
Commit 293bdbf5 authored by Erik Reid's avatar Erik Reid
Browse files

Merge branch 'use-events-api' into feature/DBOARD3-242-use-cached-netconf-snmp

parents 1f34f1d8 13044021
No related branches found
Tags 0.19
No related merge requests found
......@@ -115,7 +115,7 @@ def _rpc(hostname, ssh):
ssh_private_key_file=ssh['private-key'])
try:
dev.open()
except EzErrors.ConnectError as e:
except (EzErrors.ConnectError, EzErrors.RpcError) as e:
raise ConnectionError(str(e))
return dev.rpc
......
......@@ -203,9 +203,3 @@ def get_next_redis(config):
'derived next id: {}'.format(next_id))
return _get_redis(config, next_id)
def save_task_log(config, log):
jsonschema.validate(log, TASK_LOG_SCHEMA)
r = get_current_redis(config)
r.set(f'db:log:{log["meta"]["id"]}', json.dumps(log))
......@@ -47,3 +47,4 @@ logger.debug('broker_url: %r' % broker_url)
task_eager_propagates = True
task_track_started = True
worker_send_task_events = True
"""
standalone process that monitors celery task events and
writes them to redis for reporting
as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME
must be defined in the environment
"""
import json
import logging
import os
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis
logger = logging.getLogger(__name__)
# event types that can occur repeatedly for a single task, so their
# redis keys must carry an extra discriminator (the event clock) to
# avoid one event overwriting another — see _log_event in run()
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
def run():
    """
    Save 'task-*' celery events to redis; never returns.

    Loads configuration from the file named by the
    INVENTORY_PROVIDER_CONFIG_FILENAME environment variable, then
    captures all celery events forever.  Events whose type starts with
    'task-' are written to redis under 'joblog:<uuid>:<type>', with
    ':<clock>' appended for the repeatable INFO_EVENT_TYPES so that
    successive log events from the same task are all kept.
    """
    environment.setup_logging()

    # read the env var once and reuse it for both the open and the log line
    config_filename = os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']
    with open(config_filename) as f:
        # use the module logger, consistent with the rest of this module,
        # rather than the root logger
        logger.info('loading config from: %r' % config_filename)
        config_params = config.load(f)

    state = app.events.State()

    def _log_event(event):
        state.event(event)

        if not event['type'].startswith('task-'):
            return

        key = f'joblog:{event["uuid"]}:{event["type"]}'
        if event['type'] in INFO_EVENT_TYPES:
            # these event types can repeat per task; the logical clock
            # makes each occurrence a distinct key
            key += f':{event["clock"]}'

        # look up the redis handle per event: the "current" db can change
        # at runtime (latch flips between current/next) — TODO confirm
        r = get_current_redis(config_params)
        # serialize once, reuse for both storage and the debug log line
        serialized = json.dumps(event)
        r.set(key, serialized)
        logger.debug(f'{key}: {serialized}')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            '*': _log_event
        })
        # blocks forever, processing events as they arrive
        recv.capture(limit=None, timeout=None, wakeup=True)
def clear_joblog(r):
    """
    Delete all saved task-event records ('joblog:*' keys) from redis.

    :param r: a redis client instance (anything exposing scan_iter,
        pipeline, delete, execute with redis-py semantics)
    :return: None
    """
    # batch the deletes in a pipeline to avoid one round-trip per key
    rp = r.pipeline()
    for key in r.scan_iter('joblog:*'):
        rp.delete(key)
    rp.execute()
if __name__ == '__main__':
    # allow running the event monitor standalone for debugging,
    # with debug-level logging to the console
    logging.basicConfig(level=logging.DEBUG)
    run()
......@@ -12,7 +12,7 @@ import jsonschema
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
import get_next_redis, get_current_redis, save_task_log, \
import get_next_redis, get_current_redis, \
latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data
from inventory_provider import config
......@@ -53,7 +53,6 @@ class InventoryTask(Task):
def __init__(self):
self.args = []
self.messages = []
if InventoryTask.config:
return
......@@ -70,75 +69,17 @@ class InventoryTask(Task):
InventoryTask.config = config.load(f)
logging.debug("loaded config: %r" % InventoryTask.config)
def _append_to_log(self, level, message):
info = {
'message': message,
'level': level
}
logger.debug(json.dumps(info))
self.messages.append(info)
def _log(self, level, state, message):
self._append_to_log(level, message)
super().update_state(state=state, meta={'message': message})
def log_info(self, message, state=states.STARTED):
self._log('INFO', state, message)
def log_warning(self, message, state=states.STARTED):
self._log('WARNING', state, message)
def log_error(self, message, state=states.STARTED):
self._log('ERROR', state, message)
# def update_state(self, **kwargs):
# meta = {'message': 'none', 'level': 'INFO'}
# meta.update(kwargs.setdefault('meta', dict()))
# meta['task'] = self.name
# meta['worker'] = self.request.hostname
# meta['pid'] = self.pid
# logger.debug(json.dumps(
# {'state': kwargs['state'], 'meta': str(meta)}
# ))
# super().update_state(**kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
self._append_to_log('ERROR', str(exc))
save_task_log(
InventoryTask.config,
{'messages': self.messages, 'meta': self._meta(args=args)})
super().on_failure(exc, task_id, args, kwargs, einfo)
def on_success(self, retval, task_id, args, kwargs):
self._append_to_log('INFO', 'DONE')
save_task_log(
InventoryTask.config,
{'messages': self.messages, 'meta': self._meta(args=args)})
super().on_success(retval, task_id, args, kwargs)
def _meta(self, message=None, args=None):
meta_struct = {
'task': self.name,
'id': self.request.id,
'worker': self.request.hostname,
'pid': os.getpid(),
'warning': 'WARNING' in [m['level'] for m in self.messages],
'error': 'ERROR' in [m['level'] for m in self.messages],
}
if message:
meta_struct['message'] = message
if args is not None:
meta_struct['args'] = args
return meta_struct
def return_value(self, message='OK'):
"""
common method for constructing a standard task return value
:param message: text message to include in return value
:return: a serializable dict
"""
self.log_info(message)
return self._meta(message=message)
def log_info(self, message):
logger.debug(message)
self.send_event('task-info', message=message)
def log_warning(self, message):
logger.warning(message)
self.send_event('task-warning', message=message)
def log_error(self, message):
logger.error(message)
self.send_event('task-error', message=message)
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
......@@ -162,7 +103,7 @@ def snmp_refresh_interfaces(self, hostname, community):
r = get_next_redis(InventoryTask.config)
r.set(f'snmp-interfaces:{hostname}', json.dumps(value))
return self.return_value(message=f'snmp info loaded from {hostname}')
self.log_info(f'snmp info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
......@@ -186,7 +127,7 @@ def netconf_refresh_config(self, hostname):
r = get_next_redis(InventoryTask.config)
r.set(f'netconf:{hostname}', netconf_str)
return self.return_value(message=f'netconf info loaded from {hostname}')
self.log_info(f'netconf info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
......@@ -212,8 +153,6 @@ def update_interfaces_to_services(self):
json.dumps(services))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces')
@log_task_entry_and_exit
......@@ -246,8 +185,6 @@ def import_unmanaged_interfaces(self):
json.dumps([ifc]))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_access_services')
@log_task_entry_and_exit
......@@ -277,8 +214,6 @@ def update_access_services(self):
json.dumps(service))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_lg_routers')
@log_task_entry_and_exit
......@@ -296,8 +231,6 @@ def update_lg_routers(self):
rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
@log_task_entry_and_exit
......@@ -317,8 +250,6 @@ def update_equipment_locations(self):
rp.set('opsdb:location:%s' % h, json.dumps(locations))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
@log_task_entry_and_exit
......@@ -349,8 +280,6 @@ def update_circuit_hierarchy(self):
rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
@log_task_entry_and_exit
......@@ -370,8 +299,6 @@ def update_geant_lambdas(self):
json.dumps(ld))
rp.execute()
return self.return_value()
@app.task(base=InventoryTask, bind=True,
name='update_neteng_managed_device_list')
......@@ -386,8 +313,7 @@ def update_neteng_managed_device_list(self):
r = get_next_redis(InventoryTask.config)
r.set('netdash', json.dumps(routers).encode('utf-8'))
return self.return_value(f'saved {len(routers)} managed routers')
self.log_info(f'saved {len(routers)} managed routers')
def load_netconf_data(hostname):
......@@ -547,7 +473,8 @@ def reload_router_config(self, hostname):
if new_netconf_timestamp == current_netconf_timestamp:
msg = f'no timestamp change for {hostname} netconf data'
logger.debug(msg)
return self.return_value(msg)
self.log_info(msg)
return
# clear cached classifier responses for this router, and
# refresh peering data
......@@ -569,8 +496,7 @@ def reload_router_config(self, hostname):
snmp_refresh_interfaces.apply(args=[hostname, community])
clear_cached_classifier_responses(None)
return self.return_value(f'updated configuration for {hostname}')
self.log_info(f'updated configuration for {hostname}')
def _erase_next_db(config):
......@@ -612,8 +538,6 @@ def internal_refresh_phase_2(self):
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return self.return_value()
@log_task_entry_and_exit
def launch_refresh_cache_all(config):
......@@ -709,8 +633,6 @@ def refresh_finalizer(self, pending_task_ids_json):
latch_db(InventoryTask.config)
self.log_info('latched current/next dbs')
return self.return_value()
@log_task_entry_and_exit
def _build_service_category_interface_list(update_callback=lambda s: None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment