Skip to content
Snippets Groups Projects
Commit 364ebb9b authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature DBOARD3-244-add-structured-subtask-reponses.

parents 09734075 55b9d8ba
No related branches found
No related tags found
No related merge requests found
......@@ -8,7 +8,7 @@ CONFIG_SCHEMA = {
"definitions": {
"timeout": {
"type": "number",
"maximum": 10, # sanity
"maximum": 60, # sanity
"exclusiveMinimum": 0
},
"database-credentials": {
......
......@@ -35,7 +35,7 @@ def log_task_entry_and_exit(f):
def _w(*args, **kwargs):
logger.debug(f'>>> {f.__name__}{args}')
try:
return f(*args, *kwargs)
return f(*args, **kwargs)
finally:
logger.debug(f'<<< {f.__name__}{args}')
return _w
......@@ -51,6 +51,8 @@ class InventoryTask(Task):
def __init__(self):
self.pid = os.getpid()
if InventoryTask.config:
return
......@@ -69,6 +71,8 @@ class InventoryTask(Task):
def update_state(self, **kwargs):
meta = kwargs.setdefault('meta', dict())
meta['task'] = self.name
meta['worker'] = self.request.hostname
meta['pid'] = self.pid
logger.debug(json.dumps(
{'state': kwargs['state'], 'meta': str(meta)}
))
......@@ -78,22 +82,48 @@ class InventoryTask(Task):
logger.exception(exc)
super().on_failure(exc, task_id, args, kwargs, einfo)
def _task_return_value(self, warning, message):
"""
common method for constructing a standard task return value
:param warning: boolean (False for normal, warning-free response)
:param message: text message to include in return value
:return: a serializable dict
"""
return {
'task': self.name,
'id': self.request.id,
'worker': self.request.hostname,
'pid': self.pid,
'warning': warning,
'message': message
}
def success(self, message='OK'):
return self._task_return_value(warning=False, message=message)
def warning(self, message='WARNING'):
return self._task_return_value(warning=True, message=message)
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
@log_task_entry_and_exit
def snmp_refresh_interfaces(self, hostname, community):
# TODO: [DBOARD3-242] copy from current redis in case of error
value = list(snmp.get_router_snmp_indexes(hostname, community))
r = get_next_redis(InventoryTask.config)
r.set('snmp-interfaces:' + hostname, json.dumps(value))
return self.success(message=f'snmp info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
@log_task_entry_and_exit
def netconf_refresh_config(self, hostname):
# TODO: [DBOARD3-242] copy from current redis in case of error
netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"])
netconf_str = etree.tostring(netconf_doc, encoding='unicode')
r = get_next_redis(InventoryTask.config)
r.set('netconf:' + hostname, netconf_str)
return self.success(message=f'netconf info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
......@@ -119,6 +149,8 @@ def update_interfaces_to_services(self):
json.dumps(services))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces')
@log_task_entry_and_exit
......@@ -151,6 +183,8 @@ def import_unmanaged_interfaces(self):
json.dumps([ifc]))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_access_services')
@log_task_entry_and_exit
......@@ -180,6 +214,8 @@ def update_access_services(self):
json.dumps(service))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_lg_routers')
@log_task_entry_and_exit
......@@ -197,6 +233,8 @@ def update_lg_routers(self):
rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
@log_task_entry_and_exit
......@@ -216,6 +254,8 @@ def update_equipment_locations(self):
rp.set('opsdb:location:%s' % h, json.dumps(locations))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
@log_task_entry_and_exit
......@@ -246,6 +286,8 @@ def update_circuit_hierarchy(self):
rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
@log_task_entry_and_exit
......@@ -265,6 +307,8 @@ def update_geant_lambdas(self):
json.dumps(ld))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True,
name='update_neteng_managed_device_list')
......@@ -288,10 +332,7 @@ def update_neteng_managed_device_list(self):
r = get_next_redis(InventoryTask.config)
r.set('netdash', json.dumps(routers).encode('utf-8'))
return {
'task': 'update_neteng_managed_device_list',
'message': 'saved %d managed routers' % len(routers)
}
return self.success(f'saved {len(routers)} managed routers')
def load_netconf_data(hostname):
......@@ -368,6 +409,7 @@ def _refresh_peers(hostname, key_base, peers):
rp.execute()
@log_task_entry_and_exit
def refresh_ix_public_peers(hostname, netconf):
_refresh_peers(
hostname,
......@@ -375,6 +417,7 @@ def refresh_ix_public_peers(hostname, netconf):
juniper.ix_public_peers(netconf))
@log_task_entry_and_exit
def refresh_vpn_rr_peers(hostname, netconf):
_refresh_peers(
hostname,
......@@ -382,6 +425,7 @@ def refresh_vpn_rr_peers(hostname, netconf):
juniper.vpn_rr_peers(netconf))
@log_task_entry_and_exit
def refresh_interface_address_lookups(hostname, netconf):
_refresh_peers(
hostname,
......@@ -389,6 +433,7 @@ def refresh_interface_address_lookups(hostname, netconf):
juniper.interface_addresses(netconf))
@log_task_entry_and_exit
def refresh_juniper_interface_list(hostname, netconf):
logger.debug(
'removing cached netconf-interfaces for %r' % hostname)
......@@ -440,7 +485,7 @@ def reload_router_config(self, hostname):
except InventoryTaskError:
pass # ok at this point if not found
# load new netconf data
# load new netconf data, in this thread
netconf_refresh_config.apply(args=[hostname])
netconf_doc = load_netconf_data(hostname)
......@@ -451,11 +496,7 @@ def reload_router_config(self, hostname):
'no timestamp available for new netconf data'
if new_netconf_timestamp == current_netconf_timestamp:
logger.debug('no netconf change timestamp change, aborting')
return {
'task': 'reload_router_config',
'hostname': hostname,
'message': 'OK (no change)'
}
return self.success(f'no change (timestamp not updated)')
# clear cached classifier responses for this router, and
# refresh peering data
......@@ -487,11 +528,7 @@ def reload_router_config(self, hostname):
clear_cached_classifier_responses(None)
return {
'task': 'reload_router_config',
'hostname': hostname,
'message': 'OK'
}
return self.success(f'updated config for {hostname}')
def _erase_next_db(config):
......@@ -532,9 +569,11 @@ def internal_refresh_phase_2(self):
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return pending_task_ids
return self.success()
@log_task_entry_and_exit
def launch_refresh_cache_all(config):
"""
utility function intended to be called outside of the worker process
......@@ -628,7 +667,10 @@ def refresh_finalizer(self, pending_task_ids_json):
latch_db(InventoryTask.config)
_update('latched current/next dbs')
return self.success()
@log_task_entry_and_exit
def _build_service_category_interface_list(update_callback=lambda s: None):
def _classify(ifc):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment