worker.py 15.48 KiB
import json
import logging
import os
import re
from celery import bootsteps, Task, states
from celery.result import AsyncResult
from collections import defaultdict
from lxml import etree
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_redis
from inventory_provider import config
from inventory_provider import environment
from inventory_provider.db import db, opsdb
from inventory_provider import snmp
from inventory_provider import juniper
# TODO: error callback (cf. http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks) # noqa: E501
environment.setup_logging()
class InventoryTaskError(Exception):
pass
class InventoryTask(Task):
config = None
def __init__(self):
pass
def update_state(self, **kwargs):
logger = logging.getLogger(__name__)
logger.debug(json.dumps(
{'state': kwargs['state'], 'meta': kwargs['meta']}
))
super().update_state(**kwargs)
def _save_value(key, value):
assert isinstance(value, str), \
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.set(name=key, value=value)
# InventoryTask.logger.debug("saved %s" % key)
return "OK"
def _save_value_json(key, data_obj):
_save_value(
key,
json.dumps(data_obj))
def _save_value_etree(key, xml_doc):
_save_value(
key,
etree.tostring(xml_doc, encoding='unicode'))
class LoadConfig(bootsteps.Step):
assert os.path.isfile(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']), (
'config file %r not found' %
os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
logging.info(
"Initializing worker with config from: %r" %
os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
InventoryTask.config = config.load(f)
app.steps['worker'].add(LoadConfig)
@app.task
def snmp_refresh_interfaces(hostname, community):
logger = logging.getLogger(__name__)
logger.debug(
'>>> snmp_refresh_interfaces(%r, %r)' % (hostname, community))
_save_value_json(
'snmp-interfaces:' + hostname,
list(snmp.get_router_snmp_indexes(
hostname,
community)))
logger.debug(
'<<< snmp_refresh_interfaces(%r, %r)' % (hostname, community))
@app.task
def netconf_refresh_config(hostname):
logger = logging.getLogger(__name__)
logger.debug('>>> netconf_refresh_config(%r)' % hostname)
_save_value_etree(
'netconf:' + hostname,
juniper.load_config(hostname, InventoryTask.config["ssh"]))
logger.debug('<<< netconf_refresh_config(%r)' % hostname)
@app.task
def update_interfaces_to_services():
logger = logging.getLogger(__name__)
logger.debug('>>> update_interfaces_to_services')
interface_services = defaultdict(list)
with db.connection(InventoryTask.config["ops-db"]) as cx:
for service in opsdb.get_circuits(cx):
equipment_interface = '%s:%s' % (
service['equipment'], service['interface_name'])
interface_services[equipment_interface].append(service)
r = get_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:interface_services:*'):
r.delete(key)
for equipment_interface, services in interface_services.items():
r.set(
'opsdb:interface_services:' + equipment_interface,
json.dumps(services))
logger.debug('<<< update_interfaces_to_services')
@app.task
def update_equipment_locations():
logger = logging.getLogger(__name__)
logger.debug('>>> update_equipment_locations')
r = get_redis(InventoryTask.config)
for k in r.scan_iter('opsdb:location:*'):
r.delete(k)
with db.connection(InventoryTask.config["ops-db"]) as cx:
for h in _derive_router_hostnames(InventoryTask.config):
# lookup_pop_info returns a list of locations
# (there can sometimes be more than one match)
locations = list(opsdb.lookup_pop_info(cx, h))
r.set('opsdb:location:%s' % h, json.dumps(locations))
logger.debug('<<< update_equipment_locations')
@app.task
def update_circuit_hierarchy():
logger = logging.getLogger(__name__)
logger.debug('>>> update_circuit_hierarchy')
# TODO: integers are not JSON keys
with db.connection(InventoryTask.config["ops-db"]) as cx:
child_to_parents = defaultdict(list)
parent_to_children = defaultdict(list)
for relation in opsdb.get_circuit_hierarchy(cx):
parent_id = relation["parent_circuit_id"]
child_id = relation["child_circuit_id"]
parent_to_children[parent_id].append(relation)
child_to_parents[child_id].append(relation)
r = get_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:services:parents:*'):
r.delete(key)
for cid, parents in child_to_parents.items():
r.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
for key in r.scan_iter('opsdb:services:children:*'):
r.delete(key)
for cid, children in child_to_parents.items():
r.set('opsdb:services:children:%d' % cid, json.dumps(children))
logger.debug('<<< update_circuit_hierarchy')
@app.task
def update_geant_lambdas():
logger = logging.getLogger(__name__)
logger.debug('>>> update_geant_lambdas')
r = get_redis(InventoryTask.config)
for key in r.scan_iter('opsdb:geant_lambdas:*'):
r.delete(key)
with db.connection(InventoryTask.config["ops-db"]) as cx:
for ld in opsdb.get_geant_lambdas(cx):
r.set('opsdb:geant_lambdas:%s' % ld['name'].lower(),
json.dumps(ld))
logger.debug('<<< geant_lambdas')
@app.task(base=InventoryTask, bind=True)
def update_junosspace_device_list(self):
logger = logging.getLogger(__name__)
logger.debug('>>> update_junosspace_device_list')
self.update_state(
state=states.STARTED,
meta={
'task': 'update_junosspace_device_list',
'message': 'querying junosspace for managed routers'
})
r = get_redis(InventoryTask.config)
routers = {}
for d in juniper.load_routers_from_junosspace(
InventoryTask.config['junosspace']):
routers['junosspace:' + d['hostname']] = json.dumps(d).encode('utf-8')
self.update_state(
state=states.STARTED,
meta={
'task': 'update_junosspace_device_list',
'message': 'found %d routers, saving details' % len(routers)
})
for k in r.keys('junosspace:*'):
r.delete(k)
for k, v in routers.items():
r.set(k, v)
logger.debug('<<< update_junosspace_device_list')
return {
'task': 'update_junosspace_device_list',
'message': 'saved %d managed routers' % len(routers)
}
def load_netconf_data(hostname):
"""
this method should only be called from a task
:param hostname:
:return:
"""
r = get_redis(InventoryTask.config)
netconf = r.get('netconf:' + hostname)
if not netconf:
raise InventoryTaskError('no netconf data found for %r' % hostname)
return etree.fromstring(netconf.decode('utf-8'))
def clear_cached_classifier_responses(hostname=None):
logger = logging.getLogger(__name__)
if hostname:
logger.debug(
'removing cached classifier responses for %r' % hostname)
else:
logger.debug('removing all cached classifier responses')
r = get_redis(InventoryTask.config)
def _hostname_keys():
for k in r.keys('classifier-cache:juniper:%s:*' % hostname):
yield k
# TODO: very inefficient ... but logically simplest at this point
for k in r.keys('classifier-cache:peer:*'):
value = r.get(k.decode('utf-8'))
if not value:
# deleted in another thread
continue
value = json.loads(value.decode('utf-8'))
interfaces = value.get('interfaces', [])
if hostname in [i['interface']['router'] for i in interfaces]:
yield k
def _all_keys():
return r.keys('classifier-cache:*')
keys_to_delete = _hostname_keys() if hostname else _all_keys()
for k in keys_to_delete:
r.delete(k)
def _refresh_peers(hostname, key_base, peers):
logger = logging.getLogger(__name__)
logger.debug(
'removing cached %s for %r' % (key_base, hostname))
r = get_redis(InventoryTask.config)
for k in r.keys(key_base + ':*'):
# potential race condition: another proc could have
# delete this element between the time we read the
# keys and the next statement ... check for None below
value = r.get(k.decode('utf-8'))
if value:
value = json.loads(value.decode('utf-8'))
if value['router'] == hostname:
r.delete(k)
for peer in peers:
peer['router'] = hostname
r.set(
'%s:%s' % (key_base, peer['name']),
json.dumps(peer))
def refresh_ix_public_peers(hostname, netconf):
_refresh_peers(
hostname,
'ix_public_peer',
juniper.ix_public_peers(netconf))
def refresh_vpn_rr_peers(hostname, netconf):
_refresh_peers(
hostname,
'vpn_rr_peer',
juniper.vpn_rr_peers(netconf))
def refresh_interface_address_lookups(hostname, netconf):
_refresh_peers(
hostname,
'reverse_interface_addresses',
juniper.interface_addresses(netconf))
def refresh_juniper_interface_list(hostname, netconf):
logger = logging.getLogger(__name__)
logger.debug(
'removing cached netconf-interfaces for %r' % hostname)
r = get_redis(InventoryTask.config)
for k in r.keys('netconf-interfaces:%s:*' % hostname):
r.delete(k)
for k in r.keys('netconf-interface-bundles:%s:*' % hostname):
r.delete(k)
all_bundles = defaultdict(list)
for ifc in juniper.list_interfaces(netconf):
bundles = ifc.get('bundle', None)
for bundle in bundles:
if bundle:
all_bundles[bundle].append(ifc['name'])
r.set(
'netconf-interfaces:%s:%s' % (hostname, ifc['name']),
json.dumps(ifc))
for k, v in all_bundles.items():
r.set(
'netconf-interface-bundles:%s:%s' % (hostname, k),
json.dumps(v))
@app.task(base=InventoryTask, bind=True)
def reload_router_config(self, hostname):
logger = logging.getLogger(__name__)
logger.debug('>>> reload_router_config')
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'loading router netconf data'
})
# get the timestamp for the current netconf data
current_netconf_timestamp = None
try:
netconf_doc = load_netconf_data(hostname)
current_netconf_timestamp \
= juniper.netconf_changed_timestamp(netconf_doc)
logger.debug(
'current netconf timestamp: %r' % current_netconf_timestamp)
except InventoryTaskError:
pass # ok at this point if not found
# load new netconf data
netconf_refresh_config.apply(args=[hostname])
netconf_doc = load_netconf_data(hostname)
# return if new timestamp is the same as the original timestamp
new_netconf_timestamp = juniper.netconf_changed_timestamp(netconf_doc)
assert new_netconf_timestamp, \
'no timestamp available for new netconf data'
if new_netconf_timestamp == current_netconf_timestamp:
logger.debug('no netconf change timestamp change, aborting')
logger.debug('<<< reload_router_config')
return {
'task': 'reload_router_config',
'hostname': hostname,
'message': 'OK (no change)'
}
# clear cached classifier responses for this router, and
# refresh peering data
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing peers & clearing cache'
})
refresh_ix_public_peers(hostname, netconf_doc)
refresh_vpn_rr_peers(hostname, netconf_doc)
refresh_interface_address_lookups(hostname, netconf_doc)
refresh_juniper_interface_list(hostname, netconf_doc)
# clear_cached_classifier_responses(hostname)
# load snmp indexes
community = juniper.snmp_community_string(netconf_doc)
if not community:
raise InventoryTaskError(
'error extracting community string for %r' % hostname)
else:
self.update_state(
state=states.STARTED,
meta={
'task': 'reload_router_config',
'hostname': hostname,
'message': 'refreshing snmp interface indexes'
})
snmp_refresh_interfaces.apply(args=[hostname, community])
clear_cached_classifier_responses(None)
logger.debug('<<< reload_router_config')
return {
'task': 'reload_router_config',
'hostname': hostname,
'message': 'OK'
}
def _derive_router_hostnames(config):
logger = logging.getLogger(__name__)
r = get_redis(config)
junosspace_equipment = set()
for k in r.keys('junosspace:*'):
m = re.match('^junosspace:(.*)$', k.decode('utf-8'))
assert m
junosspace_equipment.add(m.group(1))
opsdb_equipment = set()
for k in r.keys('opsdb:interface_services:*'):
m = re.match(
'opsdb:interface_services:([^:]+):.*$',
k.decode('utf-8'))
if m:
opsdb_equipment.add(m.group(1))
else:
logger.info("Unable to derive router name from %s" %
k.decode('utf-8'))
return junosspace_equipment & opsdb_equipment
def launch_refresh_cache_all(config):
"""
utility function intended to be called outside of the worker process
:param config: config structure as defined in config.py
:return:
"""
logger = logging.getLogger(__name__)
# first batch of subtasks: refresh cached opsdb data
subtasks = [
update_junosspace_device_list.apply_async(),
update_interfaces_to_services.apply_async(),
update_geant_lambdas.apply_async(),
update_circuit_hierarchy.apply_async()
]
[x.get() for x in subtasks]
# second batch of subtasks:
# alarms db status cache
# juniper netconf & snmp data
subtasks = [
update_equipment_locations.apply_async(),
]
for hostname in _derive_router_hostnames(config):
logger.debug(
'queueing router refresh jobs for %r' % hostname)
subtasks.append(reload_router_config.apply_async(args=[hostname]))
[x.get() for x in subtasks]
return [x.id for x in subtasks]
def check_task_status(task_id):
r = AsyncResult(task_id, app=app)
result = {
'id': r.id,
'status': r.status,
'exception': r.status in states.EXCEPTION_STATES,
'ready': r.status in states.READY_STATES,
'success': r.status == states.SUCCESS,
}
if r.result:
# TODO: only discovered this case by testing, is this the only one?
# ... otherwise need to pre-test json serialization
if isinstance(r.result, Exception):
result['result'] = {
'error type': type(r.result).__name__,
'message': str(r.result)
}
else:
result['result'] = r.result
return result