diff --git a/inventory_provider/opsdb.py b/inventory_provider/opsdb.py index c0a6417abbc10b803ba4ee39053d8e1498f64533..1d330354e8791a8e2eb2cc462a4192cb3e06ed93 100644 --- a/inventory_provider/opsdb.py +++ b/inventory_provider/opsdb.py @@ -18,7 +18,14 @@ equipment_location_query = """SELECT ON g.absid = p.PTR_geocoding WHERE e.status != 'terminated' - AND e.status != 'disposed'""" + AND e.status != 'disposed' + ORDER BY + FIELD(e.status, + 'spare', + 'planned', + 'ordered', + 'installed', + 'operational')""" circuit_hierarchy_query = """SELECT diff --git a/inventory_provider/routes/jobs.py b/inventory_provider/routes/jobs.py index 315c3e8d7c960228304a6a5020497cde0bbe1f42..06f3d5c17b27f5fe271e10d322fc235b642fe7d9 100644 --- a/inventory_provider/routes/jobs.py +++ b/inventory_provider/routes/jobs.py @@ -1,8 +1,7 @@ import logging + from flask import Blueprint, Response, current_app -import inventory_provider.storage.external_inventory as external_inventory -from inventory_provider import db, opsdb from inventory_provider.tasks.app import app from inventory_provider.constants import TASK_LOGGER_NAME @@ -49,56 +48,44 @@ def startup_update(): return Response("OK") -@routes.route("update-services", methods=['GET']) -def update_service(): - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - - with db.connection(config['ops-db']) as connection: - result = opsdb.get_circuits(connection) - external_inventory.update_services_to_monitor(result) +@routes.route("update-services-to-monitor", methods=['GET']) +def update_services_to_monitor(): + app.send_task( + 'inventory_provider.tasks.worker.update_service_to_monitor') return Response("OK") -@routes.route("update-interfaces", methods=['GET']) -def update_interfaces(): - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - - with db.connection(config['ops-db']) as connection: - result = opsdb.get_circuits(connection) - external_inventory.update_interfaces_to_services(result) +@routes.route("update-interfaces-to-services", methods=['GET']) +def update_interfaces_to_services(): + app.send_task( + 'inventory_provider.tasks.worker.update_interfaces_to_services') return Response("OK") @routes.route("update-service-hierarchy", methods=['GET']) def update_service_hierarchy(): - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - - with db.connection(config['ops-db']) as connection: - result = opsdb.get_circuit_hierarchy(connection) - external_inventory.update_service_hierarchy(result) + app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') return Response("OK") @routes.route("update-equipment-locations", methods=['GET']) def update_equipment_locations(): - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - - with db.connection(config['ops-db']) as connection: - result = opsdb.get_equipment_location_data(connection) - external_inventory.update_equipment_locations(result) + app.send_task('inventory_provider.tasks.worker.update_equipment_locations') return Response("OK") @routes.route("update-from-inventory-system", methods=['GET']) def update_from_inventory_system(): - config = current_app.config['INVENTORY_PROVIDER_CONFIG'] - - with db.connection(config['ops-db']) as connection: - circuits = opsdb.get_circuits(connection) - hierarchy = opsdb.get_circuit_hierarchy(connection) - equipment_locations = opsdb.get_equipment_location_data(connection) - external_inventory.update_services_to_monitor(circuits) - external_inventory.update_interfaces_to_services(circuits) - external_inventory.update_service_hierarchy(hierarchy) - external_inventory.update_equipment_locations(equipment_locations) + app.send_task( + 'inventory_provider.tasks.worker.update_service_to_monitor') + app.send_task( + 'inventory_provider.tasks.worker.update_interfaces_to_services') + app.send_task('inventory_provider.tasks.worker.update_circuit_hierarchy') + app.send_task('inventory_provider.tasks.worker.update_equipment_locations') return Response("OK") + + +@routes.route("update-interface-statuses") +def update_interface_statuses(): + app.send_task( + 'inventory_provider.tasks.worker.update_interface_statuses') diff --git a/inventory_provider/tasks/worker.py b/inventory_provider/tasks/worker.py index 8f2efb84f0badfac81a87851d541f9297c6a1bd0..4fec01f456c04654d4487bd1c22630f99e74abc3 100644 --- a/inventory_provider/tasks/worker.py +++ b/inventory_provider/tasks/worker.py @@ -2,6 +2,7 @@ import json import logging from celery import bootsteps, Task +from collections import defaultdict import redis from lxml import etree @@ -11,6 +12,7 @@ from inventory_provider import config from inventory_provider import constants from inventory_provider import db from inventory_provider import environment +from inventory_provider import opsdb from inventory_provider import snmp from inventory_provider import juniper @@ -81,6 +83,14 @@ class WorkerArgs(bootsteps.Step): InventoryTask.logger = logging.getLogger(constants.TASK_LOGGER_NAME) +monitored_services_key = "monitored_services" +interfaces_key = "interface_services" +equipment_locations_key = "equipment_locations" +service_child_to_parents_key = "child_to_parent_circuit_relations" +service_parent_to_children_key = "parent_to_children_circuit_relations" +interface_status_key = "interface_statuses" + + def worker_args(parser): parser.add_argument( "--config_filename", @@ -135,3 +145,106 @@ def update_alarmsdb_cache(self): InventoryTask.save_value_json('alarmsdb:%s' % table_name, data) logger.debug('FINISHED: update_alarmsdb_cache') + + +@app.task() +def update_service_to_monitor(): + # todo - factor this connection stuff out + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + r.delete(monitored_services_key) + + relevant_types = ("path", "service", "l2circuit") + with db.connection(InventoryTask.config["ops-db"]) as cx: + for circuit in opsdb.get_circuits(cx): + if circuit["circuit_type"].lower() in relevant_types: + r.hset( + monitored_services_key, circuit["id"], json.dumps(circuit)) + + +@app.task() +def update_interfaces_to_services(): + # todo - factor this connection stuff out + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + with db.connection(InventoryTask.config["ops-db"]) as cx: + services = opsdb.get_circuits(cx) + + mapped_interfaces = defaultdict(list) + for service in services: + key = "{}::{}".format( + service["equipment"], + service["interface_name"] + ) + mapped_interfaces[key].append(service) + + r.delete(interfaces_key) + for key, value in mapped_interfaces.items(): + r.hset(interfaces_key, key, json.dumps(value)) + + +@app.task() +def update_equipment_locations(): + # todo - factor this connection stuff out + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + r.delete(equipment_locations_key) + + with db.connection(InventoryTask.config["ops-db"]) as cx: + for ld in opsdb.get_equipment_location_data(cx): + r.hset( + equipment_locations_key, ld["equipment_name"], json.dumps(ld)) + + +@app.task() +def update_circuit_hierarchy(): + # todo - factor this connection stuff out + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + children_to_parents = defaultdict(list) + parents_to_children = defaultdict(list) + with db.connection(InventoryTask.config["ops-db"]) as cx: + records = opsdb.get_circuit_hierarchy(cx) + for relation in records: + parent_id = relation["parent_circuit_id"] + child_id = relation["child_circuit_id"] + parents_to_children[parent_id].append(relation) + children_to_parents[child_id].append(relation) + + r.delete(service_child_to_parents_key) + for child, parents in children_to_parents.items(): + r.hset(service_child_to_parents_key, child, json.dumps(parents)) + + r.delete(service_parent_to_children_key) + for parent, children in parents_to_children.items(): + r.hset( + service_parent_to_children_key, parent, json.dumps(children)) + + +@app.task() +def update_interface_statuses(): + # todo - factor this connection stuff out + r = redis.StrictRedis( + host=InventoryTask.config["redis"]["hostname"], + port=InventoryTask.config["redis"]["port"]) + with db.connection(InventoryTask.config["ops-db"]) as cx: + services = opsdb.get_circuits(cx) + with db.connection(InventoryTask.config["alarms-db"]) as cx: + with db.cursor(cx) as csr: + for service in services: + key = "{}::{}".format( + service["equipment"], + service["interface_name"] + ) + if not r.hexists(interface_status_key, key): + r.hset(interface_status_key, + key, + alarmsdb.get_last_known_interface_status( + csr, + service["equipment"], + service["interface_name"] + ))