worker.py 7.01 KiB
import json
import logging
from celery import bootsteps, Task
from collections import defaultdict
from lxml import etree
from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_redis
from inventory_provider import config
from inventory_provider import constants
from inventory_provider import environment
from inventory_provider.db import db, opsdb, alarmsdb
from inventory_provider import snmp
from inventory_provider import juniper
environment.setup_logging()
class InventoryTask(Task):
config = None
logger = None
def __init__(self):
pass
@staticmethod
def save_key(hostname, key, value):
assert isinstance(value, str), \
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.hset(
name=hostname,
key=key,
value=value)
InventoryTask.logger.debug(
"saved %s, key %s" % (hostname, key))
return "OK"
@staticmethod
def save_value(key, value):
assert isinstance(value, str), \
"sanity failure: expected string data as value"
r = get_redis(InventoryTask.config)
r.set(
name=key,
value=value)
InventoryTask.logger.debug("saved %s" % key)
return "OK"
@staticmethod
def save_value_json(key, data_obj):
InventoryTask.save_value(
key,
json.dumps(data_obj))
@staticmethod
def save_key_json(hostname, key, data_obj):
InventoryTask.save_key(
hostname,
key,
json.dumps(data_obj))
@staticmethod
def save_key_etree(hostname, key, xml_doc):
InventoryTask.save_key(
hostname,
key,
etree.tostring(xml_doc, encoding='unicode'))
class WorkerArgs(bootsteps.Step):
def __init__(self, worker, config_filename, **options):
with open(config_filename) as f:
InventoryTask.config = config.load(f)
InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME)
interfaces_key = "interface_services"
equipment_locations_key = "equipment_locations"
service_child_to_parents_key = "child_to_parent_circuit_relations"
service_parent_to_children_key = "parent_to_children_circuit_relations"
interface_status_key = "interface_statuses"
def worker_args(parser):
parser.add_argument(
"--config_filename",
dest="config_filename",
action='store',
type=str,
help="Configuration filename")
app.user_options['worker'].add(worker_args)
app.steps['worker'].add(WorkerArgs)
@app.task(bind=InventoryTask)
def snmp_refresh_interfaces(self, hostname, community):
logger = logging.getLogger(constants.TASK_LOGGER_NAME)
logger.debug('STARTING: snmp_refresh_interfaces(%r, %r)'
% (hostname, community))
InventoryTask.save_key_json(
hostname,
"snmp-interfaces",
list(snmp.get_router_interfaces(
hostname,
community,
InventoryTask.config)))
logger.debug('FINISHED: snmp_refresh_interfaces(%r, %r)'
% (hostname, community))
@app.task(bind=InventoryTask)
def netconf_refresh_config(self, hostname):
logger = logging.getLogger(constants.TASK_LOGGER_NAME)
logger.debug('STARTING: netconf_refresh_config(%r)' % hostname)
InventoryTask.save_key_etree(
hostname,
"netconf",
juniper.load_config(hostname, InventoryTask.config["ssh"]))
logger.debug('FINISHED: netconf_refresh_config(%r)' % hostname)
# @app.task(bind=InventoryTask)
# def update_alarmsdb_cache(self):
# logger = logging.getLogger(constants.TASK_LOGGER_NAME)
# logger.debug('STARTING: update_alarmsdb_cache')
#
# with db.connection(InventoryTask.config["alarms-db"]) as cx:
# for table_name, data in alarmsdb.load_cache(cx):
# InventoryTask.save_value_json('alarmsdb:%s' % table_name, data)
#
# logger.debug('FINISHED: update_alarmsdb_cache')
@app.task()
def update_interfaces_to_services():
r = get_redis(InventoryTask.config)
with db.connection(InventoryTask.config["ops-db"]) as cx:
services = opsdb.get_circuits(cx)
mapped_interfaces = defaultdict(list)
for service in services:
key = "{}::{}".format(
service["equipment"],
service["interface_name"]
)
mapped_interfaces[key].append(service)
# Puts lu services under the parent ae as well as their own interface
# eg. services on ae15.12 would be found under ae15 as well as ae15.12
if "." in service["interface_name"]:
key = "{}::{}".format(
service["equipment"],
service["interface_name"].split(".")[0]
)
mapped_interfaces[key].append(service)
r.delete(interfaces_key)
for key, value in mapped_interfaces.items():
r.hset(interfaces_key, key, json.dumps(value))
@app.task()
def update_equipment_locations():
r = get_redis(InventoryTask.config)
r.delete(equipment_locations_key)
with db.connection(InventoryTask.config["ops-db"]) as cx:
for ld in opsdb.get_equipment_location_data(cx):
r.hset(
equipment_locations_key, ld["equipment_name"], json.dumps(ld))
@app.task()
def update_circuit_hierarchy():
r = get_redis(InventoryTask.config)
children_to_parents = defaultdict(list)
parents_to_children = defaultdict(list)
with db.connection(InventoryTask.config["ops-db"]) as cx:
records = opsdb.get_circuit_hierarchy(cx)
for relation in records:
parent_id = relation["parent_circuit_id"]
child_id = relation["child_circuit_id"]
parents_to_children[parent_id].append(relation)
children_to_parents[child_id].append(relation)
r.delete(service_child_to_parents_key)
for child, parents in children_to_parents.items():
r.hset(service_child_to_parents_key, child, json.dumps(parents))
r.delete(service_parent_to_children_key)
for parent, children in parents_to_children.items():
r.hset(
service_parent_to_children_key, parent, json.dumps(children))
@app.task()
def update_interface_statuses():
r = get_redis(InventoryTask.config)
with db.connection(InventoryTask.config["ops-db"]) as cx:
services = opsdb.get_circuits(cx)
with db.connection(InventoryTask.config["alarms-db"]) as cx:
with db.cursor(cx) as csr:
for service in services:
key = "{}::{}".format(
service["equipment"],
service["interface_name"]
)
if not r.hexists(interface_status_key, key):
r.hset(interface_status_key,
key,
alarmsdb.get_last_known_interface_status(
csr,
service["equipment"],
service["interface_name"]
))