Skip to content
Snippets Groups Projects
Commit c7ace6b7 authored by Robert Latta's avatar Robert Latta
Browse files

Long overdue commit :( with most of the opsdb etl implementation

parent c70e88b2
Branches
Tags
No related merge requests found
import redis
from flask import current_app, g
def get_redis():
if 'redis_db' not in g:
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
g.redis_db = redis.Redis(
host=config['redis']['hostname'],
port=config['redis']['port'])
return g.redis_db
import contextlib
import logging
import mysql.connector
from inventory_provider.constants import DATABASE_LOGGER_NAME
equipment_location_query = "SELECT " \
" e.absid, " \
" e.name AS equipment_name, " \
" p.name AS pop_name, " \
" p.abbreviation AS pop_abbreviation, " \
" p.site_id AS pop_site_id, " \
" p.country, " \
" g.longitude, " \
" g.latitude " \
"FROM " \
" equipment e " \
"INNER JOIN pop p " \
" ON p.absid = e.PTR_pop " \
"INNER JOIN geocoding g " \
" ON g.absid = p.PTR_geocoding " \
"WHERE " \
" e.status != 'terminated' " \
" AND e.status != 'disposed'"
circuit_hierarchy_query = "SELECT " \
" pc.name AS parent_circuit, " \
" pc.absid AS parent_circuit_id, " \
" pc.status AS parent_circuit_status, " \
" cc.name AS child_circuit, " \
" cc.absid AS child_circuit_id, " \
" cc.status AS child_circuit_status, " \
" cg.segment_group AS segment_group " \
"FROM circuit_glue cg " \
"INNER JOIN circuit pc ON pc.absid = cg.PTR_circuit " \
"INNER JOIN circuit cc ON cc.absid = cg.PTR_component"
retrieve_services_query_template = "SELECT " \
" c.absid AS id, " \
" c.name, " \
" c.status, " \
" c.circuit_type, " \
" c.service_type, " \
" events.short_descr AS project, " \
" e.name AS equipment, " \
" cc.{} AS port, " \
" cc.{} AS logical_unit, " \
" LOWER(o.name) AS manufacturer, " \
" LOWER(ec.card_id) AS card_id, " \
" LOWER(IF(pp.interface_name IS NULL," \
" ''," \
" pp.interface_name))" \
" AS interface_name " \
"FROM circuit c " \
"INNER JOIN circuit_connections cc " \
" ON cc.circ_absid = c.absid " \
"INNER JOIN equipment e " \
" ON e.absid = cc.{} " \
"LEFT JOIN events " \
" ON events.absid = cc.PTR_project " \
"INNER JOIN equipment_card ec " \
" ON ec.absid = cc.{} " \
"LEFT JOIN organisation o " \
" ON o.absid = ec.manufacturer " \
"LEFT JOIN port_plugin pp " \
" ON pp.PTR_card = cc.{} " \
" AND pp.port = cc.{} " \
"WHERE " \
" c.status != 'terminated' " \
" AND is_circuit = 1 "
order_services_outer_query = "SELECT * FROM ({}) AS inner_query " \
" ORDER BY FIELD(status, 'spare', 'planned'," \
" 'ordered', 'installed', 'operational')"
connection_variants = [
['port_a',
'int_LU_a',
'PTR_equip_a',
'PTR_card_a',
'PTR_card_a',
'port_a'],
['port_b',
'int_LU_b',
'PTR_equip_b',
'PTR_card_b',
'PTR_card_b',
'port_b'],
['port_a_OUT',
'int_LU_a',
'PTR_equip_a',
'PTR_card_a_OUT',
'PTR_card_a_OUT',
'port_a_OUT'],
['port_b_OUT',
'int_LU_b',
'PTR_equip_b',
'PTR_card_b_OUT',
'PTR_card_b_OUT',
'port_b_OUT']
]
@contextlib.contextmanager
......@@ -32,12 +128,70 @@ def cursor(cnx): # pragma: no cover
csr.close()
def _db_test(db, router):
database_logger = logging.getLogger(DATABASE_LOGGER_NAME)
def _convert_to_dict(crs):
return [dict((crs.description[i][0], "" if value is None else value)
for i, value in enumerate(row)) for row in crs.fetchall()]
def _infinera_field_update(record):
equipment_parts = record["equipment"].rsplit("-", 1)
card_parts = record["card_id"].split("-", 1)
try:
record["equipment"] = equipment_parts[0]
record["interface_name"] = equipment_parts[1] + "-" + card_parts[1]
except IndexError:
pass # Nothing to see here
if record["port"] is not None and record["port"] != "":
record["interface_name"] += "-" + record["port"]
record["interface_name"] = record["interface_name"]\
.replace("--", "-").upper()
return record
def _juniper_field_update(record):
if not record["interface_name"]:
record["interface_name"] = record["card_id"]
if record["port"] is not None and record["port"] != "":
separator = "/" if "-" in record["interface_name"] else ""
record["interface_name"] += separator + record["port"]
if record["logical_unit"] is not None and record["logical_unit"] != "":
record["interface_name"] += "." + str(record["logical_unit"])
return record
def _update_fields(r):
func = globals().get("_" + r["manufacturer"] + "_field_update")
return func(r) if func else r
def get_circuits(db):
retrieve_services_query = _generate_get_circuits_sql()
with cursor(db) as crs:
query = "select model, manufacturer from equipment where name = %s"
crs.execute(query, (router['hostname'],))
for (model, manufacturer) in crs:
database_logger.debug("%s: %s %s" % (
router['hostname'], model, manufacturer))
yield {"model": model, "manufacturer": manufacturer}
crs.execute(retrieve_services_query)
r = _convert_to_dict(crs)
r = list(map(_update_fields, r))
return r
def _generate_get_circuits_sql():
retrieve_services_inner_query = " UNION ".join(
map(lambda e: retrieve_services_query_template.format(*e),
connection_variants))
retrieve_services_query = order_services_outer_query.\
format(retrieve_services_inner_query)
return retrieve_services_query
def get_circuit_hierarchy(db):
with cursor(db) as crs:
crs.execute(circuit_hierarchy_query)
r = _convert_to_dict(crs)
return r
def get_equipment_location_data(db):
with cursor(db) as crs:
crs.execute(equipment_location_query)
r = _convert_to_dict(crs)
return r
from flask import Blueprint, Response, current_app
import inventory_provider.storage.external_inventory as external_inventory
from inventory_provider import opsdb
from inventory_provider import router_details
routes = Blueprint("inventory-data-job-routes", __name__)
......@@ -10,3 +12,55 @@ def update():
router_details.update_network_details(
current_app.config["INVENTORY_PROVIDER_CONFIG"])
return Response("OK")
@routes.route("update-services", methods=['GET'])
def update_service():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuits(db)
external_inventory.update_services_to_monitor(result)
return Response("OK")
@routes.route("update-interfaces", methods=['GET'])
def update_interfaces():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuits(db)
external_inventory.update_interfaces_to_services(result)
return Response("OK")
@routes.route("update-equipment-locations", methods=['GET'])
def update_equipment_locations():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_equipment_location_data(db)
external_inventory.update_equipment_locations(result)
return Response("OK")
@routes.route("update-from-inventory-system", methods=['GET'])
def update_from_inventory_system():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuits(db)
external_inventory.update_services_to_monitor(result)
external_inventory.update_interfaces_to_services(result)
# todo - add other updates
return Response("OK")
@routes.route("update-service-hierarchy", methods=['GET'])
def update_service_hierarchy():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuit_hierarchy(db)
external_inventory.update_service_hierarchy(result)
return Response("OK")
......@@ -3,9 +3,18 @@ import json
from flask import Blueprint, request, Response, current_app
from inventory_provider import opsdb
from inventory_provider import db
routes = Blueprint("inventory-opsdb-query-routes", __name__)
services_key_template = "inv_services::{}"
interfaces_key_template = "inv_interfaces::{}::{}"
equipment_locations_key_template = "inv_eq_locations::{}"
services_key = "inv_services"
interfaces_key = "inv_interfaces"
equipment_locations_key = "inv_eq_locations"
def require_accepts_json(f):
"""
......@@ -26,16 +35,88 @@ def require_accepts_json(f):
return decorated_function
@routes.route("/test", methods=['GET', 'POST'])
def _decode_utf8_dict(d):
return {k.decode('utf8'): json.loads(v) for k, v in d.items()}
@routes.route("/circuit-hierarchy", methods=['GET', ])
@require_accepts_json
def opsdb_test():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
def opsdb_circuit_hierarchy():
# todo - Retrieve data from redis
result = [{"Parent Name": "circuit 1"}, {"Parent Name": "Circuit 2"}]
return Response(
json.dumps(result),
mimetype="application/json")
@routes.route("/interfaces")
def get_all_interface_details():
r = db.get_redis()
result = _decode_utf8_dict(
r.hgetall(interfaces_key))
return Response(
json.dumps(result),
mimetype="application/json")
@routes.route("/interfaces/<equipment_name>")
def get_interface_details_for_equipment(equipment_name):
r = db.get_redis()
result = {}
with opsdb.connection(config['ops-db']) as db:
for r in config['routers']:
result[r['hostname']] = list(opsdb._db_test(db, r))
for t in r.hscan_iter(interfaces_key, "{}::*".format(equipment_name)):
result[t[0].decode("utf8")] = json.loads(t[1])
return Response(
json.dumps(result),
mimetype="application/json")
@routes.route("/interfaces/<equipment_name>/<path:interface>")
def get_interface_details(equipment_name, interface):
r = db.get_redis()
return Response(
r.hget(
interfaces_key,
"{}::{}".format(equipment_name, interface)),
mimetype="application/json")
@routes.route("/equipment-location")
def get_all_equipment_locations():
r = db.get_redis()
result = list(
_decode_utf8_dict(
r.hgetall(equipment_locations_key)).values())
return Response(
json.dumps(result),
mimetype="application/json")
@routes.route("/equipment-location/<equipment_name>")
def get_equipment_location(equipment_name):
r = db.get_redis()
return Response(
r.hget(equipment_locations_key, equipment_name),
mimetype="application/json")
# todo - Add in the routes for the circuit hierarchy, the actual load of the data is already done
# todo - Below are temporary routes that need to be removed
@routes.route("/circuit-sql")
def get_circuits_sql():
return Response(opsdb._generate_get_circuits_sql(), mimetype="text/plain")
@routes.route("/infinera-circuit-sql", methods=['GET',])
def get_infinera_circuits_sql():
return Response(opsdb.generate_infinera_sql(), mimetype="text/plain")
@routes.route("/equipment-location-sql", methods=['GET',])
def get_equipment_location_sql():
return Response(opsdb.equipment_location_query, mimetype="text/plain")
\ No newline at end of file
import json
from inventory_provider import db
services_key = "inv_services"
interfaces_key = "inv_interfaces"
equipment_locations_key = "inv_eq_locations"
service_child_to_parents_key = "inv_service_child_to_parents"
service_parent_to_children_key = "inv_service_parent_to_children"
def update_services_to_monitor(services):
r = db.get_redis()
relevant_types = ('path', 'service', 'l2circuit')
for service in services:
if service['circuit_type'].lower() in relevant_types:
r.hset(services_key, service['id'], json.dumps(service))
def update_interfaces_to_services(services):
r = db.get_redis()
mapped_interfaces = {}
for service in services:
key = "{}::{}".format(
service['equipment'],
service['interface_name']
)
r.hset(interfaces_key, key, json.dumps(mapped_interfaces[key]))
def update_service_hierarchy(records):
r = db.get_redis()
children_to_parents = {}
parents_to_children = {}
for relation in records:
parent_id = relation["parent_circuit_id"]
child_id = relation["child_circuit_id"]
if child_id not in children_to_parents:
children_to_parents[child_id] = []
if parent_id not in parents_to_children:
parents_to_children[parent_id] = []
parents_to_children[parent_id].append(relation)
children_to_parents[child_id].append(relation)
for child, parents in children_to_parents.items():
r.hset(service_child_to_parents_key, child, json.dumps(parents))
for parent, children in parents_to_children.items():
r.hset(service_parent_to_children_key, parent, json.dumps(children))
def update_equipment_locations(equipment_location_data):
r = db.get_redis()
for ld in equipment_location_data:
r.hset(equipment_locations_key, ld['equipment_name'], json.dumps(ld))
import inventory_provider.opsdb
def test_get_circuits_to_monitor(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.opsdbdb.cursor')
mocked_get_cursor.return_value.__enter__.return_value = None
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
pass
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment