Commit 82701463 authored by Erik Reid

Finished feature DBOARD3-242-use-cached-netconf-snmp.

parents e5a59df0 085ab2f9
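In outline, the change makes the netconf and SNMP refresh tasks fall back to the copy cached in the current redis database when a live fetch fails, and raise only if there is no cached data either. A minimal sketch of that pattern, with hypothetical helper names standing in for the real task bodies further down:

import json

def refresh_with_fallback(hostname, fetch, current_redis, next_redis, key):
    # hypothetical helper: the real tasks below inline this logic
    try:
        value = fetch(hostname)  # live netconf or snmp fetch
    except ConnectionError:
        cached = current_redis.get(key)  # fall back to the cached copy
        if not cached:
            raise  # no live data and no cache: nothing usable
        value = json.loads(cached.decode('utf-8'))
    next_redis.set(key, json.dumps(value))  # always repopulate the next db
    return value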
@@ -115,7 +115,7 @@ def _rpc(hostname, ssh):
ssh_private_key_file=ssh['private-key'])
try:
dev.open()
except EzErrors.ConnectError as e:
except (EzErrors.ConnectError, EzErrors.RpcError) as e:
raise ConnectionError(str(e))
return dev.rpc
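A detail worth noting in the hunk above: when an except clause names several exception types, Python requires a tuple; a list is not a valid exception specifier and fails at catch time. A tiny standalone demonstration:

try:
    raise ValueError('boom')
except (ValueError, KeyError) as e:  # a tuple, not a list, of types
    print(f'caught: {e}')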
......
@@ -10,6 +10,10 @@ from pysnmp.smi import builder, compiler
RFC1213_MIB_IFDESC = '1.3.6.1.2.1.2.2.1.2'
class SNMPWalkError(ConnectionError):
pass
def _v6address_oid2str(dotted_decimal):
hex_params = []
for dec in re.split(r'\.', dotted_decimal):
@@ -61,15 +65,19 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover
# cf. http://snmplabs.com/
# pysnmp/examples/hlapi/asyncore/sync/contents.html
assert not engineErrorIndication, (
f'snmp response engine error indication: '
f'{str(engineErrorIndication)} - {agent_hostname}')
assert not pduErrorIndication, 'snmp response pdu error %r at %r' % (
pduErrorIndication,
errorIndex and varBinds[int(errorIndex) - 1][0] or '?')
assert errorIndex == 0, (
'sanity failure: errorIndex != 0, '
'but no error indication')
if engineErrorIndication:
raise SNMPWalkError(
f'snmp response engine error indication: '
f'{str(engineErrorIndication)} - {agent_hostname}')
if pduErrorIndication:
raise SNMPWalkError(
'snmp response pdu error %r at %r' % (
pduErrorIndication,
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
if errorIndex != 0:
raise SNMPWalkError(
'sanity failure: errorIndex != 0, '
'but no error indication')
# varBinds = [
# rfc1902.ObjectType(rfc1902.ObjectIdentity(x[0]),x[1])
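For context, the engine/PDU error checks above sit inside a pysnmp hlapi walk loop. A simplified standalone sketch of such a walk, assuming pysnmp's nextCmd generator (port and error handling are illustrative; the real walk distinguishes engine and PDU error indications as shown above):

from pysnmp.hlapi import (
    SnmpEngine, CommunityData, UdpTransportTarget, ContextData,
    ObjectType, ObjectIdentity, nextCmd)

def tiny_walk(agent_hostname, community, base_oid):
    for error_indication, error_status, error_index, var_binds in nextCmd(
            SnmpEngine(),
            CommunityData(community),
            UdpTransportTarget((agent_hostname, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(base_oid)),
            lexicographicMode=False):  # stop at the end of the subtree
        if error_indication:
            raise SNMPWalkError(str(error_indication))
        for oid, value in var_binds:
            yield {'oid': str(oid), 'value': str(value)}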
@@ -84,7 +92,7 @@ def walk(agent_hostname, community, base_oid): # pragma: no cover
def get_router_snmp_indexes(hostname, community):
for ifc in walk(hostname, community, RFC1213_MIB_IFDESC):
m = re.match(r'.*\.(\d+)$', ifc['oid'])
assert m, 'sanity failure parsing oid: %r' % ifc['oid']
assert m, f'sanity failure parsing oid: {ifc["oid"]}'
yield {
'name': ifc['value'],
'index': int(m.group(1))
......
@@ -24,6 +24,48 @@ DB_LATCH_SCHEMA = {
"additionalProperties": False
}
TASK_LOG_SCHEMA = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'definitions': {
'meta': {
'type': 'object',
'properties': {
'task': {'type': 'string'},
'id': {'type': 'string'},
'worker': {'type': 'string'},
'pid': {'type': 'integer'},
'warning': {'type': 'boolean'},
'error': {'type': 'boolean'},
'args': {'type': 'array'}
},
'required': ['task', 'id', 'worker', 'pid', 'warning', 'error'],
'additionalProperties': False
},
'message': {
'type': 'object',
'properties': {
'message': {'type': 'string'},
'level': {
'type': 'string',
'enum': ['INFO', 'WARNING', 'ERROR']
}
}
}
},
'type': 'object',
'properties': {
'meta': {'$ref': '#/definitions/meta'},
'messages': {
'type': 'array',
'items': {'$ref': '#/definitions/message'}
}
},
'required': ['meta', 'messages'],
'additionalProperties': False
}
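As a sanity check on the schema's shape, here is an example document that should validate against TASK_LOG_SCHEMA (all values are made up):

import jsonschema

sample = {
    'meta': {
        'task': 'snmp_refresh_interfaces',
        'id': 'abcd-1234',
        'worker': 'worker1@host',
        'pid': 4242,
        'warning': False,
        'error': False
    },
    'messages': [
        {'message': 'snmp info loaded', 'level': 'INFO'}
    ]
}

jsonschema.validate(sample, TASK_LOG_SCHEMA)  # raises ValidationError if invalid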
def get_latch(r):
latch = r.get('db:latch')
......
@@ -47,3 +47,4 @@ logger.debug('broker_url: %r' % broker_url)
task_eager_propagates = True
task_track_started = True
worker_send_task_events = True
"""
standalone process that monitors celery task events and
writes them to redis for reporting
as with the other processes, INVENTORY_PROVIDER_CONFIG_FILENAME
must be defined in the environment
"""
import json
import logging
import os
from inventory_provider import config, environment
from inventory_provider.tasks.worker import app
from inventory_provider.tasks.common import get_current_redis
logger = logging.getLogger(__name__)
INFO_EVENT_TYPES = ('task-info', 'task-warning', 'task-error')
def run():
"""
save 'task-*' events to redis, never returns
"""
environment.setup_logging()
with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
logging.info(
'loading config from: %r'
% os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
config_params = config.load(f)
state = app.events.State()
def _log_event(event):
state.event(event)
if not event['type'].startswith('task-'):
return
key = f'joblog:{event["uuid"]}:{event["type"]}'
if event['type'] in INFO_EVENT_TYPES:
key += f':{event["clock"]}'
r = get_current_redis(config_params)
r.set(key, json.dumps(event))
logger.debug(f'{key}: {json.dumps(event)}')
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'*': _log_event
})
recv.capture(limit=None, timeout=None, wakeup=True)
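run() stores management events under joblog:<uuid>:<type> and the task-info/warning/error events under joblog:<uuid>:<type>:<clock>. A small sketch of reading the log back out, assuming the same config_params shape used above:

def dump_joblog(config_params):
    r = get_current_redis(config_params)
    for key in r.scan_iter('joblog:*'):
        event = json.loads(r.get(key))  # stored as a json-encoded event dict
        print(key, event['type'])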
def clear_joblog(r):
"""
:param r:
:return:
"""
rp = r.pipeline()
for key in r.scan_iter('joblog:*'):
rp.delete(key)
rp.execute()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
run()
@@ -12,7 +12,8 @@ import jsonschema
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
import get_next_redis, latch_db, get_latch, set_latch, update_latch_status
import get_next_redis, get_current_redis, \
latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data
from inventory_provider import config
from inventory_provider import environment
@@ -51,7 +52,7 @@ class InventoryTask(Task):
def __init__(self):
self.pid = os.getpid()
self.args = []
if InventoryTask.config:
return
@@ -68,62 +69,66 @@
InventoryTask.config = config.load(f)
logging.debug("loaded config: %r" % InventoryTask.config)
def update_state(self, **kwargs):
meta = kwargs.setdefault('meta', dict())
meta['task'] = self.name
meta['worker'] = self.request.hostname
meta['pid'] = self.pid
logger.debug(json.dumps(
{'state': kwargs['state'], 'meta': str(meta)}
))
super().update_state(**kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.exception(exc)
super().on_failure(exc, task_id, args, kwargs, einfo)
def _task_return_value(self, warning, message):
"""
common method for constructing a standard task return value
:param warning: boolean (False for normal, warning-free response)
:param message: text message to include in return value
:return: a serializable dict
"""
return {
'task': self.name,
'id': self.request.id,
'worker': self.request.hostname,
'pid': self.pid,
'warning': warning,
'message': message
}
def log_info(self, message):
logger.debug(message)
self.send_event('task-info', message=message)
def success(self, message='OK'):
return self._task_return_value(warning=False, message=message)
def log_warning(self, message):
logger.warning(message)
self.send_event('task-warning', message=message)
def warning(self, message='WARNING'):
return self._task_return_value(warning=True, message=message)
def log_error(self, message):
logger.error(message)
self.send_event('task-error', message=message)
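Taken together, these helpers give every task a uniform logging surface: each call both logs locally and emits a celery event that the monitor process persists to redis. A hypothetical task body using them (illustrative only, not a task defined in this commit):

@app.task(base=InventoryTask, bind=True, name='example_task')
@log_task_entry_and_exit
def example_task(self):
    self.log_info('starting')  # DEBUG log + 'task-info' event
    self.log_warning('using cached data')  # WARNING log + 'task-warning' event
    return self.success('done')  # standard serializable return value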
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
@log_task_entry_and_exit
def snmp_refresh_interfaces(self, hostname, community):
# TODO: [DBOARD3-242] copy from current redis in case of error
value = list(snmp.get_router_snmp_indexes(hostname, community))
try:
value = list(snmp.get_router_snmp_indexes(hostname, community))
except ConnectionError:
msg = f'error loading snmp data from {hostname}'
logger.exception(msg)
self.log_warning(msg)
r = get_current_redis(InventoryTask.config)
value = r.get(f'snmp-interfaces:{hostname}')
if not value:
raise InventoryTaskError(
f'snmp error with {hostname}'
f' and no cached snmp data found')
# unnecessary json encode/decode here ... could be optimized
value = json.loads(value.decode('utf-8'))
self.log_warning(f'using cached snmp data for {hostname}')
r = get_next_redis(InventoryTask.config)
r.set('snmp-interfaces:' + hostname, json.dumps(value))
return self.success(message=f'snmp info loaded from {hostname}')
r.set(f'snmp-interfaces:{hostname}', json.dumps(value))
self.log_info(f'snmp info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
@log_task_entry_and_exit
def netconf_refresh_config(self, hostname):
# TODO: [DBOARD3-242] copy from current redis in case of error
netconf_doc = juniper.load_config(hostname, InventoryTask.config["ssh"])
netconf_str = etree.tostring(netconf_doc, encoding='unicode')
try:
netconf_doc = juniper.load_config(
hostname, InventoryTask.config["ssh"])
netconf_str = etree.tostring(netconf_doc, encoding='unicode')
except ConnectionError:
msg = f'error loading netconf data from {hostname}'
logger.exception(msg)
self.log_warning(msg)
r = get_current_redis(InventoryTask.config)
netconf_str = r.get(f'netconf:{hostname}')
if not netconf_str:
raise InventoryTaskError(
f'netconf error with {hostname}'
f' and no cached netconf data found')
self.log_warning(f'using cached netconf data for {hostname}')
r = get_next_redis(InventoryTask.config)
r.set('netconf:' + hostname, netconf_str)
return self.success(message=f'netconf info loaded from {hostname}')
r.set(f'netconf:{hostname}', netconf_str)
self.log_info(f'netconf info loaded from {hostname}')
@app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
@@ -149,8 +154,6 @@ def update_interfaces_to_services(self):
json.dumps(services))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='import_unmanaged_interfaces')
@log_task_entry_and_exit
@@ -183,8 +186,6 @@ def import_unmanaged_interfaces(self):
json.dumps([ifc]))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_access_services')
@log_task_entry_and_exit
@@ -214,8 +215,6 @@ def update_access_services(self):
json.dumps(service))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_lg_routers')
@log_task_entry_and_exit
@@ -233,8 +232,6 @@ def update_lg_routers(self):
rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
@log_task_entry_and_exit
@@ -254,8 +251,6 @@ def update_equipment_locations(self):
rp.set('opsdb:location:%s' % h, json.dumps(locations))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
@log_task_entry_and_exit
@@ -286,8 +281,6 @@ def update_circuit_hierarchy(self):
rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
@log_task_entry_and_exit
@@ -307,32 +300,21 @@ def update_geant_lambdas(self):
json.dumps(ld))
rp.execute()
return self.success()
@app.task(base=InventoryTask, bind=True,
name='update_neteng_managed_device_list')
@log_task_entry_and_exit
def update_neteng_managed_device_list(self):
self.update_state(
state=states.STARTED,
meta={
'message': 'querying netdash for managed routers'
})
self.log_info('querying netdash for managed routers')
routers = list(juniper.load_routers_from_netdash(
InventoryTask.config['managed-routers']))
self.update_state(
state=states.STARTED,
meta={
'message': f'found {len(routers)} routers, saving details'
})
self.log_info(f'found {len(routers)} routers, saving details')
r = get_next_redis(InventoryTask.config)
r.set('netdash', json.dumps(routers).encode('utf-8'))
return self.success(f'saved {len(routers)} managed routers')
self.log_info(f'saved {len(routers)} managed routers')
def load_netconf_data(hostname):
@@ -467,12 +449,7 @@ def refresh_juniper_interface_list(hostname, netconf):
@app.task(base=InventoryTask, bind=True, name='reload_router_config')
@log_task_entry_and_exit
def reload_router_config(self, hostname):
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'loading netconf data for {hostname}'
})
self.log_info(f'loading netconf data for {hostname}')
# get the timestamp for the current netconf data
current_netconf_timestamp = None
@@ -497,16 +474,12 @@ def reload_router_config(self, hostname):
if new_netconf_timestamp == current_netconf_timestamp:
msg = f'no timestamp change for {hostname} netconf data'
logger.debug(msg)
return self.success(msg)
self.log_info(msg)
return
# clear cached classifier responses for this router, and
# refresh peering data
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'refreshing peers & clearing cache for {hostname}'
})
self.log_info(f'refreshing peers & clearing cache for {hostname}')
refresh_ix_public_peers(hostname, netconf_doc)
refresh_vpn_rr_peers(hostname, netconf_doc)
refresh_interface_address_lookups(hostname, netconf_doc)
@@ -519,18 +492,12 @@ def reload_router_config(self, hostname):
raise InventoryTaskError(
f'error extracting community string for {hostname}')
else:
self.update_state(
state=states.STARTED,
meta={
'hostname': hostname,
'message': f'refreshing snmp interface indexes for {hostname}'
})
self.log_info(f'refreshing snmp interface indexes for {hostname}')
# load snmp data, in this thread
snmp_refresh_interfaces.apply(args=[hostname, community])
clear_cached_classifier_responses(None)
return self.success(f'updated configuration for {hostname}')
self.log_info(f'updated configuration for {hostname}')
def _erase_next_db(config):
@@ -572,8 +539,6 @@ def internal_refresh_phase_2(self):
t = refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
pending_task_ids.append(t.id)
return self.success()
@log_task_entry_and_exit
def launch_refresh_cache_all(config):
@@ -651,22 +616,14 @@ def refresh_finalizer(self, pending_task_ids_json):
"items": {"type": "string"}
}
def _update(s):
logger.debug(s)
self.update_state(
state=states.STARTED,
meta={
'message': s
})
try:
task_ids = json.loads(pending_task_ids_json)
logger.debug('task_ids: %r' % task_ids)
jsonschema.validate(task_ids, input_schema)
_wait_for_tasks(task_ids, update_callback=_update)
_build_subnet_db(update_callback=_update)
_build_service_category_interface_list(update_callback=_update)
_wait_for_tasks(task_ids, update_callback=self.log_info)
_build_subnet_db(update_callback=self.log_info)
_build_service_category_interface_list(update_callback=self.log_info)
except (jsonschema.ValidationError,
json.JSONDecodeError,
@@ -675,9 +632,7 @@ def refresh_finalizer(self, pending_task_ids_json):
raise e
latch_db(InventoryTask.config)
_update('latched current/next dbs')
return self.success()
self.log_info('latched current/next dbs')
@log_task_entry_and_exit
......
@@ -209,6 +209,12 @@ def mocked_worker_module(
with open(data_config_filename) as f:
worker.InventoryTask.config = config.load(f)
def _mocked_send_event(*kargs, **kwargs):
pass
mocker.patch(
'inventory_provider.tasks.worker.InventoryTask.send_event',
_mocked_send_event)
def _mocked_snmp_interfaces(hostname, community):
return json.loads(cached_test_data['snmp-interfaces:' + hostname])
mocker.patch(
......
import contextlib
import json
import os
from unittest import mock
from inventory_provider.tasks import monitor
from inventory_provider.tasks.common import _get_redis
CONNECTION_FINGERPRINT = "SDF@#$@#"
TEST_MANAGEMENT_EVENTS = [
{'type': 'task-aaaa', 'uuid': 'AAAA', 'clock': 999},
{'type': 'task-infox', 'uuid': 'AAAA', 'clock': 999}
]
TEST_LOG_EVENTS = [
{'type': 'task-info', 'uuid': 'AAAA', 'clock': 99},
{'type': 'task-info', 'uuid': 'AAAA', 'clock': 999},
{'type': 'task-warning', 'uuid': 'AAAA', 'clock': 88},
{'type': 'task-warning', 'uuid': 'AAAA', 'clock': 888},
{'type': 'task-error', 'uuid': 'AAAA', 'clock': 77},
{'type': 'task-error', 'uuid': 'AAAA', 'clock': 777}
]
@contextlib.contextmanager
def mocked_connection():
yield CONNECTION_FINGERPRINT
class MockedState():
def __init__(self):
pass
def event(self, e):
pass
class MockedReceiver():
def __init__(self, connection, handlers):
assert connection == CONNECTION_FINGERPRINT
self.handlers = handlers
def capture(self, **kwargs):
# write test events to log, check in the test case
for e in TEST_MANAGEMENT_EVENTS + TEST_LOG_EVENTS:
self.handlers['*'](e)
def backend_db():
return _get_redis({
'redis': {
'hostname': None,
'port': None
},
'redis-databases': [0, 7]
}).db
@mock.patch('inventory_provider.tasks.monitor.app.events.State', MockedState)
@mock.patch(
'inventory_provider.tasks.monitor.app.connection', mocked_connection)
@mock.patch(
'inventory_provider.tasks.monitor.app.events.Receiver', MockedReceiver)
def test_latchdb(data_config_filename, mocked_redis):
os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'] = data_config_filename
monitor.run()
db = backend_db()
for e in TEST_MANAGEMENT_EVENTS:
expected_key = f'joblog:{e["uuid"]}:{e["type"]}'
assert e == json.loads(db[expected_key])
for e in TEST_LOG_EVENTS:
expected_key = f'joblog:{e["uuid"]}:{e["type"]}:{e["clock"]}'
assert e == json.loads(db[expected_key])