# test_alarmsdb.py 3.00 KiB
import inventory_provider.db.alarmsdb as alarmsdb
def test_infinera_interface_status(mocker):
    """Check the Infinera status lookup for each kind of fetched row.

    A ``('Raised',)`` row must yield "down", a ``("Clear",)`` row must
    yield "up" and an empty row must yield "unknown".  Afterwards the SQL
    text and parameters handed to ``execute`` are verified.
    """
    db = mocker.Mock()
    expectations = (
        (('Raised',), "down"),
        (("Clear",), "up"),
        ((), "unknown"),
    )
    for row, expected_status in expectations:
        db.fetchone.return_value = row
        status = alarmsdb.get_last_known_infinera_interface_status(
            db, 'eq1', 'intfc1')
        assert status == expected_status

    # assert_called_with only inspects the most recent execute() call.
    expected_query = (
        "SELECT status FROM infinera_alarms WHERE"
        " CONCAT(ne_name, '-', REPLACE(object_name, 'T', '')) = %s"
        " ORDER BY ne_init_time DESC, ne_clear_time DESC LIMIT 1"
    )
    db.execute.assert_called_with(expected_query, ("eq1-intfc1",))
def test_coriant_interface_status(mocker):
    """Check the Coriant status lookup for each kind of fetched row.

    A ``('Raised',)`` row must yield "down", a ``("Clear",)`` row must
    yield "up" and an empty row must yield "unknown".  Afterwards the SQL
    text and parameters handed to ``execute`` are verified.
    """
    db = mocker.Mock()
    expectations = (
        (('Raised',), "down"),
        (("Clear",), "up"),
        ((), "unknown"),
    )
    for row, expected_status in expectations:
        db.fetchone.return_value = row
        status = alarmsdb.get_last_known_coriant_interface_status(
            db, 'eq1', 'intfc1')
        assert status == expected_status

    # assert_called_with only inspects the most recent execute() call.
    expected_query = (
        "SELECT status FROM coriant_alarms"
        " WHERE ne_id_name = %s AND entity_string LIKE %s"
        " ORDER BY last_event_time DESC LIMIT 1"
    )
    db.execute.assert_called_with(expected_query, ("eq1", "intfc1-%"))
def test_juniper_interface_status(mocker):
    """Check the Juniper link status lookup for each kind of fetched row.

    A ``(0,)`` row must yield "down", a ``(1,)`` row must yield "up" and
    an empty row must yield "unknown".  Afterwards the SQL text and
    parameters handed to ``execute`` are verified; the expected equipment
    parameter is "lo0.eq1" for the input equipment name 'eq1'.
    """
    db = mocker.Mock()
    expectations = (
        ((0,), "down"),
        ((1,), "up"),
        ((), "unknown"),
    )
    for row, expected_status in expectations:
        db.fetchone.return_value = row
        status = alarmsdb.get_last_known_juniper_link_interface_status(
            db, 'eq1', 'intfc1')
        assert status == expected_status

    # assert_called_with only inspects the most recent execute() call.
    expected_query = (
        "SELECT IF(link_admin_status = 'up'"
        " AND link_oper_status = 'up', 1, 0)"
        " AS up FROM juniper_alarms"
        " WHERE equipment_name = %s AND link_interface_name = %s"
        " ORDER BY alarm_id DESC LIMIT 1"
    )
    db.execute.assert_called_with(expected_query, ("lo0.eq1", "intfc1"))
def test_interface_status(mocker):
    """Check the combined lookup when every vendor lookup is "unknown".

    The Infinera, Coriant and Juniper lookups in
    ``inventory_provider.db.alarmsdb`` are each patched to return
    "unknown"; ``get_last_known_interface_status`` must then return
    "unknown" as well.
    """
    vendor_lookups = (
        'get_last_known_infinera_interface_status',
        'get_last_known_coriant_interface_status',
        'get_last_known_juniper_link_interface_status',
    )
    for lookup in vendor_lookups:
        mocker.patch(
            'inventory_provider.db.alarmsdb.' + lookup,
            return_value="unknown")

    result = alarmsdb.get_last_known_interface_status(None, '', '')
    assert result == "unknown"