Skip to content
Snippets Groups Projects
Commit ba05d954 authored by Robert Latta's avatar Robert Latta
Browse files

Moved MySQL connection logic to db module

parent 77697a74
No related branches found
No related tags found
No related merge requests found
import contextlib
import mysql.connector
from inventory_provider import db
@contextlib.contextmanager
def connection(alarmsdb): # pragma: no cover
cx = None
try:
cx = mysql.connector.connect(
host=alarmsdb["hostname"],
user=alarmsdb["username"],
passwd=alarmsdb["password"],
db=alarmsdb["dbname"])
yield cx
finally:
if cx:
cx.close()
@contextlib.contextmanager
def cursor(cnx): # pragma: no cover
csr = None
try:
csr = cnx.cursor()
yield csr
finally:
if csr:
csr.close()
def get_last_known_infinera_interface_status(db, equipment, interface):
def get_last_known_infinera_interface_status(connection, equipment, interface):
query = "SELECT status FROM infinera_alarms" \
" WHERE" \
" CONCAT(ne_name, '-', REPLACE(object_name, 'T', '')) = %s" \
" ORDER BY ne_init_time DESC, ne_clear_time DESC LIMIT 1"
search_string = equipment + "-" + interface
with cursor(db) as crs:
with db.cursor(connection) as crs:
crs.execute(query, (search_string,))
result = crs.fetchone()
if not result:
......@@ -45,11 +18,11 @@ def get_last_known_infinera_interface_status(db, equipment, interface):
return "up"
def get_last_known_coriant_interface_status(db, equipment, interface):
def get_last_known_coriant_interface_status(connection, equipment, interface):
query = "SELECT status FROM coriant_alarms" \
" WHERE ne_id_name = %s AND entity_string LIKE %s" \
" ORDER BY last_event_time DESC LIMIT 1"
with cursor(db) as crs:
with db.cursor(connection) as crs:
crs.execute(query, (equipment, interface + "-%"))
result = crs.fetchone()
if not result:
......@@ -60,12 +33,13 @@ def get_last_known_coriant_interface_status(db, equipment, interface):
return "up"
def get_last_known_juniper_link_interface_status(db, equipment, interface):
def get_last_known_juniper_link_interface_status(
connection, equipment, interface):
query = "SELECT IF(link_admin_status = 'up'" \
" AND link_oper_status = 'up', 1, 0) AS up FROM juniper_alarms" \
" WHERE equipment_name = %s AND link_interface_name = %s" \
" ORDER BY alarm_id DESC LIMIT 1"
with cursor(db) as crs:
with db.cursor(connection) as crs:
crs.execute(query, ('lo0.' + equipment, interface))
result = crs.fetchone()
if not result:
......@@ -76,13 +50,13 @@ def get_last_known_juniper_link_interface_status(db, equipment, interface):
return "up"
def get_last_known_interface_status(db, equipment, interface):
def get_last_known_interface_status(connection, equipment, interface):
result = get_last_known_infinera_interface_status(
db, equipment, interface)
connection, equipment, interface)
if result == "unknown":
result = get_last_known_coriant_interface_status(
db, equipment, interface)
connection, equipment, interface)
if result == "unknown":
result = get_last_known_juniper_link_interface_status(
db, equipment, interface)
connection, equipment, interface)
return result
import contextlib
import mysql.connector
import redis
from flask import current_app, g
......@@ -11,3 +13,29 @@ def get_redis(): # pragma: no cover
port=config['redis']['port'])
return g.redis_db
@contextlib.contextmanager
def connection(db_params):
cx = None
try:
cx = mysql.connector.connect(
host=db_params["hostname"],
user=db_params["username"],
passwd=db_params["password"],
db=db_params["dbname"])
yield cx
finally:
if cx:
cx.close()
@contextlib.contextmanager
def cursor(cnx): # pragma: no cover
csr = None
try:
csr = cnx.cursor()
yield csr
finally:
if csr:
csr.close()
import contextlib
import mysql.connector
from inventory_provider import db
equipment_location_query = """SELECT
......@@ -169,32 +168,6 @@ retrieve_services_query = """SELECT *
'operational')"""
@contextlib.contextmanager
def connection(opsdb): # pragma: no cover
cx = None
try:
cx = mysql.connector.connect(
host=opsdb["hostname"],
user=opsdb["username"],
passwd=opsdb["password"],
db=opsdb["dbname"])
yield cx
finally:
if cx:
cx.close()
@contextlib.contextmanager
def cursor(cnx): # pragma: no cover
csr = None
try:
csr = cnx.cursor()
yield csr
finally:
if csr:
csr.close()
def _convert_to_dict(crs):
return [dict((crs.description[i][0], "" if value is None else value)
for i, value in enumerate(row)) for row in crs.fetchall()]
......@@ -236,23 +209,23 @@ def _update_fields(r):
return func(r) if func else r
def get_circuits(db):
with cursor(db) as crs:
def get_circuits(connection):
with db.cursor(connection) as crs:
crs.execute(retrieve_services_query)
r = _convert_to_dict(crs)
r = list(map(_update_fields, r))
return r
def get_circuit_hierarchy(db):
with cursor(db) as crs:
def get_circuit_hierarchy(connection):
with db.cursor(connection) as crs:
crs.execute(circuit_hierarchy_query)
r = _convert_to_dict(crs)
return r
def get_equipment_location_data(db):
with cursor(db) as crs:
def get_equipment_location_data(connection):
with db.cursor(connection) as crs:
crs.execute(equipment_location_query)
r = _convert_to_dict(crs)
return r
......@@ -2,7 +2,7 @@ import functools
import json
from flask import Blueprint, request, Response, current_app
from inventory_provider import alarmsdb
from inventory_provider import alarmsdb, db
routes = Blueprint("inventory-alarmsdb-query-routes", __name__)
......@@ -34,9 +34,9 @@ def get_interface_status():
equipment = request.args.get("equipment")
interface = request.args.get("interface")
with alarmsdb.connection(config['alarms-db']) as db:
with db.connection(config['alarms-db']) as connection:
result = {"status": alarmsdb.get_last_known_interface_status(
db, equipment, interface)}
connection, equipment, interface)}
return Response(
json.dumps(result),
......
......@@ -2,7 +2,7 @@ import logging
from flask import Blueprint, Response, current_app
import inventory_provider.storage.external_inventory as external_inventory
from inventory_provider import opsdb
from inventory_provider import db, opsdb
from inventory_provider.tasks.app import app
from inventory_provider.constants import TASK_LOGGER_NAME
......@@ -42,8 +42,8 @@ def update():
def update_service():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuits(db)
with db.connection(config['ops-db']) as connection:
result = opsdb.get_circuits(connection)
external_inventory.update_services_to_monitor(result)
return Response("OK")
......@@ -52,8 +52,8 @@ def update_service():
def update_interfaces():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuits(db)
with db.connection(config['ops-db']) as connection:
result = opsdb.get_circuits(connection)
external_inventory.update_interfaces_to_services(result)
return Response("OK")
......@@ -62,8 +62,8 @@ def update_interfaces():
def update_service_hierarchy():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_circuit_hierarchy(db)
with db.connection(config['ops-db']) as connection:
result = opsdb.get_circuit_hierarchy(connection)
external_inventory.update_service_hierarchy(result)
return Response("OK")
......@@ -72,8 +72,8 @@ def update_service_hierarchy():
def update_equipment_locations():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
result = opsdb.get_equipment_location_data(db)
with db.connection(config['ops-db']) as connection:
result = opsdb.get_equipment_location_data(connection)
external_inventory.update_equipment_locations(result)
return Response("OK")
......@@ -82,10 +82,10 @@ def update_equipment_locations():
def update_from_inventory_system():
config = current_app.config['INVENTORY_PROVIDER_CONFIG']
with opsdb.connection(config['ops-db']) as db:
circuits = opsdb.get_circuits(db)
hierarchy = opsdb.get_circuit_hierarchy(db)
equipment_locations = opsdb.get_equipment_location_data(db)
with db.connection(config['ops-db']) as connection:
circuits = opsdb.get_circuits(connection)
hierarchy = opsdb.get_circuit_hierarchy(connection)
equipment_locations = opsdb.get_equipment_location_data(connection)
external_inventory.update_services_to_monitor(circuits)
external_inventory.update_interfaces_to_services(circuits)
external_inventory.update_service_hierarchy(hierarchy)
......
......@@ -7,10 +7,6 @@ from inventory_provider.storage import external_inventory
routes = Blueprint("inventory-opsdb-query-routes", __name__)
services_key_template = "inv_services::{}"
interfaces_key_template = "inv_interfaces::{}::{}"
equipment_locations_key_template = "inv_eq_locations::{}"
services_key = "inv_services"
interfaces_key = "inv_interfaces"
equipment_locations_key = "inv_eq_locations"
......
......@@ -9,7 +9,7 @@ DEFAULT_REQUEST_HEADERS = {
def test_get_interface_status(mocker, client):
mocked_conn = mocker.patch('inventory_provider.routes.alarmsdb'
'.alarmsdb.connection')
'.db.connection')
mocked_conn.return_value.__enter__.return_value = None
mocked_inteface_status = mocker.patch(
......
......@@ -2,7 +2,7 @@ import inventory_provider.alarmsdb as alarmsdb
def test_infinera_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.db.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
......@@ -30,7 +30,7 @@ def test_infinera_interface_status(mocker):
def test_coriant_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.db.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
......@@ -58,7 +58,7 @@ def test_coriant_interface_status(mocker):
def test_juniper_interface_status(mocker):
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.cursor')
mocked_get_cursor = mocker.patch('inventory_provider.alarmsdb.db.cursor')
mocked_execute = mocked_get_cursor. \
return_value.__enter__.return_value.execute
mocked_fetchone = mocked_get_cursor.return_value.__enter__. \
......
......@@ -98,7 +98,7 @@ def test_juniper_field_update():
def test_get_circuits(mocker):
mocker.patch("inventory_provider.opsdb.cursor")
mocker.patch("inventory_provider.opsdb.db.cursor")
mocked_convert_to_dict = mocker.patch(
"inventory_provider.opsdb._convert_to_dict")
i = {"manufacturer": "infinera"}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment