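# worker.py (7.01 KiB)
# NOTE: the page-navigation text that preceded this module in the web export
# ("Skip to content", "Snippets Groups Projects") is not part of the source.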
import json
import logging

from celery import bootsteps, Task
from collections import defaultdict
from lxml import etree

from inventory_provider.tasks.app import app
from inventory_provider.tasks.common import get_redis
from inventory_provider import config
from inventory_provider import constants
from inventory_provider import environment
from inventory_provider.db import db, opsdb, alarmsdb
from inventory_provider import snmp
from inventory_provider import juniper

environment.setup_logging()


class InventoryTask(Task):
    """Celery task class holding worker-wide state and redis save helpers.

    ``config`` and ``logger`` are class attributes that start as ``None``;
    the ``WorkerArgs`` bootstep in this module assigns them at worker
    start-up.  All helpers are static and read that class-level state.
    """

    config = None
    logger = None

    def __init__(self):
        # NOTE(review): does not call Task.__init__ -- confirm intentional.
        pass

    @staticmethod
    def save_key(hostname, key, value):
        """Store ``value`` in field ``key`` of the redis hash ``hostname``.

        :param hostname: name of the redis hash
        :param key: field name inside the hash
        :param value: string payload to store
        :return: the string ``"OK"``
        :raises AssertionError: if ``value`` is not a ``str``
        """
        assert isinstance(value, str), \
            "sanity failure: expected string data as value"
        r = get_redis(InventoryTask.config)
        r.hset(
            name=hostname,
            key=key,
            value=value)
        # Lazy %-args: the message is only formatted if debug is enabled.
        InventoryTask.logger.debug("saved %s, key %s", hostname, key)
        return "OK"

    @staticmethod
    def save_value(key, value):
        """Store ``value`` under the top-level redis key ``key``.

        :param key: redis key name
        :param value: string payload to store
        :return: the string ``"OK"``
        :raises AssertionError: if ``value`` is not a ``str``
        """
        assert isinstance(value, str), \
            "sanity failure: expected string data as value"
        r = get_redis(InventoryTask.config)
        r.set(
            name=key,
            value=value)
        InventoryTask.logger.debug("saved %s", key)
        return "OK"

    @staticmethod
    def save_value_json(key, data_obj):
        """Serialize ``data_obj`` to JSON and store it via ``save_value``.

        :return: the result of ``save_value`` (``"OK"``)
        """
        return InventoryTask.save_value(
            key,
            json.dumps(data_obj))

    @staticmethod
    def save_key_json(hostname, key, data_obj):
        """Serialize ``data_obj`` to JSON and store it via ``save_key``.

        :return: the result of ``save_key`` (``"OK"``)
        """
        return InventoryTask.save_key(
            hostname,
            key,
            json.dumps(data_obj))

    @staticmethod
    def save_key_etree(hostname, key, xml_doc):
        """Serialize ``xml_doc`` to a unicode string and store via ``save_key``.

        :param xml_doc: document accepted by ``lxml.etree.tostring``
        :return: the result of ``save_key`` (``"OK"``)
        """
        return InventoryTask.save_key(
            hostname,
            key,
            etree.tostring(xml_doc, encoding='unicode'))


class WorkerArgs(bootsteps.Step):
    """Worker bootstep that fills in the shared ``InventoryTask`` state."""

    def __init__(self, worker, config_filename, **options):
        """Load the configuration file and attach the task logger.

        :param worker: passed in by celery; not used here
        :param config_filename: path of the configuration file to open
        :param options: remaining worker options; not used here
        """
        with open(config_filename) as config_file:
            loaded_config = config.load(config_file)
            InventoryTask.config = loaded_config
        task_logger = logging.getLogger(constants.TASK_LOGGER_NAME)
        InventoryTask.logger = task_logger


# Names of the redis hashes filled by the cache-update tasks in this module.

# interface -> services hash and per-interface status hash
interfaces_key = "interface_services"
interface_status_key = "interface_statuses"

# equipment name -> location data hash
equipment_locations_key = "equipment_locations"

# circuit hierarchy hashes, one per lookup direction
service_parent_to_children_key = "parent_to_children_circuit_relations"
service_child_to_parents_key = "child_to_parent_circuit_relations"


def worker_args(parser):
    """Add the ``--config_filename`` option to ``parser``.

    :param parser: argument parser exposing ``add_argument``
    """
    option_name = "--config_filename"
    parser.add_argument(
        option_name,
        help="Configuration filename",
        type=str,
        action='store',
        dest="config_filename",
    )


# Register the --config_filename command-line option for the worker and add
# the WorkerArgs bootstep, which receives that option's value.
app.user_options['worker'].add(worker_args)
app.steps['worker'].add(WorkerArgs)


# ``bind`` is a boolean flag; the task class is selected with ``base``.
@app.task(base=InventoryTask, bind=True)
def snmp_refresh_interfaces(self, hostname, community):
    """Fetch a router's interfaces over SNMP and cache them in redis.

    The interface list is stored as JSON in field ``snmp-interfaces`` of
    the redis hash named ``hostname``.

    :param self: the bound task instance (unused)
    :param hostname: router to query; also the redis hash name
    :param community: SNMP community string passed to the query
    """
    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    # NOTE(review): the community string is written to the debug log.
    logger.debug(
        'STARTING: snmp_refresh_interfaces(%r, %r)', hostname, community)

    InventoryTask.save_key_json(
        hostname,
        "snmp-interfaces",
        list(snmp.get_router_interfaces(
            hostname,
            community,
            InventoryTask.config)))

    logger.debug(
        'FINISHED: snmp_refresh_interfaces(%r, %r)', hostname, community)


# ``bind`` is a boolean flag; the task class is selected with ``base``.
@app.task(base=InventoryTask, bind=True)
def netconf_refresh_config(self, hostname):
    """Load a router's configuration and cache it in redis as XML text.

    The document returned by ``juniper.load_config`` is serialized and
    stored in field ``netconf`` of the redis hash named ``hostname``.

    :param self: the bound task instance (unused)
    :param hostname: router to query; also the redis hash name
    """
    logger = logging.getLogger(constants.TASK_LOGGER_NAME)
    logger.debug('STARTING: netconf_refresh_config(%r)', hostname)

    InventoryTask.save_key_etree(
        hostname,
        "netconf",
        juniper.load_config(hostname, InventoryTask.config["ssh"]))

    logger.debug('FINISHED: netconf_refresh_config(%r)', hostname)


# @app.task(bind=InventoryTask)
# def update_alarmsdb_cache(self):
#     logger = logging.getLogger(constants.TASK_LOGGER_NAME)
#     logger.debug('STARTING: update_alarmsdb_cache')
#
#     with db.connection(InventoryTask.config["alarms-db"]) as cx:
#         for table_name, data in alarmsdb.load_cache(cx):
#             InventoryTask.save_value_json('alarmsdb:%s' % table_name, data)
#
#     logger.debug('FINISHED: update_alarmsdb_cache')


@app.task()
def update_interfaces_to_services():
    """Rebuild the redis hash mapping interfaces to their services.

    Circuits are read from the ops database and grouped under
    ``"<equipment>::<interface_name>"``.  The hash is then deleted and
    rewritten with one JSON-encoded list of services per interface.
    """
    redis_client = get_redis(InventoryTask.config)
    with db.connection(InventoryTask.config["ops-db"]) as connection:
        circuits = opsdb.get_circuits(connection)

    services_by_interface = defaultdict(list)
    for circuit in circuits:
        equipment = circuit["equipment"]
        interface_name = circuit["interface_name"]
        own_field = "{}::{}".format(equipment, interface_name)
        services_by_interface[own_field].append(circuit)
        # Logical-unit services are also listed under the parent interface,
        # e.g. services on ae15.12 are found under both ae15 and ae15.12.
        if "." in interface_name:
            parent_name = interface_name.partition(".")[0]
            parent_field = "{}::{}".format(equipment, parent_name)
            services_by_interface[parent_field].append(circuit)

    redis_client.delete(interfaces_key)
    for field, grouped_services in services_by_interface.items():
        redis_client.hset(interfaces_key, field, json.dumps(grouped_services))


@app.task()
def update_equipment_locations():
    """Rebuild the redis hash of equipment location data.

    Location records are read from the ops database first, and only then
    is the existing hash deleted and rewritten, so a database failure
    leaves the previously cached data in place.  Each record is stored as
    JSON under its ``equipment_name``.
    """
    r = get_redis(InventoryTask.config)

    with db.connection(InventoryTask.config["ops-db"]) as cx:
        # Materialize while the connection is open, before touching redis.
        locations = list(opsdb.get_equipment_location_data(cx))

    r.delete(equipment_locations_key)
    for ld in locations:
        r.hset(
            equipment_locations_key, ld["equipment_name"], json.dumps(ld))


@app.task()
def update_circuit_hierarchy():
    """Rebuild the two redis hashes describing circuit parent/child links.

    Every relation read from the ops database is grouped by its child
    circuit id and by its parent circuit id.  Each hash is deleted and then
    rewritten with one JSON-encoded list of relations per circuit id.
    """
    redis_client = get_redis(InventoryTask.config)
    by_child = defaultdict(list)
    by_parent = defaultdict(list)
    with db.connection(InventoryTask.config["ops-db"]) as connection:
        for link in opsdb.get_circuit_hierarchy(connection):
            parent_id = link["parent_circuit_id"]
            child_id = link["child_circuit_id"]
            by_parent[parent_id].append(link)
            by_child[child_id].append(link)

        # Child-to-parents hash first, then parent-to-children.
        rebuild_plan = (
            (service_child_to_parents_key, by_child),
            (service_parent_to_children_key, by_parent),
        )
        for hash_name, grouped in rebuild_plan:
            redis_client.delete(hash_name)
            for circuit_id, links in grouped.items():
                redis_client.hset(hash_name, circuit_id, json.dumps(links))


@app.task()
def update_interface_statuses():
    """Cache the last known status of interfaces that have none cached.

    Circuits are read from the ops database.  For each
    ``"<equipment>::<interface_name>"`` field missing from the status hash,
    the last known status is looked up in the alarms database and stored.
    Fields already present are left untouched.
    """
    redis_client = get_redis(InventoryTask.config)
    with db.connection(InventoryTask.config["ops-db"]) as ops_cx:
        circuits = opsdb.get_circuits(ops_cx)
    with db.connection(InventoryTask.config["alarms-db"]) as alarms_cx, \
            db.cursor(alarms_cx) as cursor:
        for circuit in circuits:
            equipment = circuit["equipment"]
            interface_name = circuit["interface_name"]
            status_field = "{}::{}".format(equipment, interface_name)
            if redis_client.hexists(interface_status_key, status_field):
                continue
            last_status = alarmsdb.get_last_known_interface_status(
                cursor, equipment, interface_name)
            redis_client.hset(
                interface_status_key, status_field, last_status)