-
Robert Latta authoredRobert Latta authored
test_opsdb.py 6.03 KiB
import inventory_provider.db.opsdb
def test_update_fields(mocker):
mocker.patch("inventory_provider"
".db.opsdb._juniper_field_update")
t = {"manufacturer": "juniper"}
inventory_provider.db.opsdb._update_fields(t)
inventory_provider.db.opsdb \
._juniper_field_update.assert_called_once_with(t)
mocker.patch("inventory_provider"
".db.opsdb._infinera_field_update")
t = {"manufacturer": "infinera"}
inventory_provider.db.opsdb._update_fields(t)
inventory_provider.db.opsdb \
._infinera_field_update.assert_called_once_with(t)
f = {"manufacturer": "non-existent"}
r = inventory_provider.db.opsdb._update_fields(f)
assert f == r
def test_infinera_field_update():
i = {
"equipment": "AMS01-DTNX10-1-1",
"card_id": "tim-b-5-7",
"port": "1",
"other_end_equipment": "LON01-DTNX10-1-1",
"other_end_card_id": "tim-b-5-8",
"other_end_port": "2"
}
r = inventory_provider.db.opsdb._infinera_field_update(i)
assert r["equipment"] == "AMS01-DTNX10-1"
assert r["interface_name"] == "1-B-5-7-1"
assert r["other_end_equipment"] == "LON01-DTNX10-1"
assert r["other_end_interface_name"] == "1-B-5-8-2"
i = {
"equipment": "BUD01_CX_01",
"card_id": "tim-1/2",
"port": "1",
"other_end_equipment": "LON01_CX_01",
"other_end_card_id": "tim-2/3",
"other_end_port": "4"
}
r = inventory_provider.db.opsdb._infinera_field_update(i)
assert r["equipment"] == "BUD01_CX_01"
assert r["interface_name"] == "1/2-1"
assert r["other_end_equipment"] == "LON01_CX_01"
assert r["other_end_interface_name"] == "2/3-4"
i = {
"equipment": "irrelevant",
"card_id": "tim_1/2",
"port": "1",
"other_end_equipment": "irrelevant",
"other_end_card_id": "tim_2/3",
"other_end_port": "4"
}
r = inventory_provider.db.opsdb._infinera_field_update(i)
assert r["interface_name"] == "TIM_1/2-1"
assert r["other_end_interface_name"] == "TIM_2/3-4"
def test_juniper_field_update():
i = {
"interface_name": "xe-1/2",
"logical_unit": None,
"other_end_interface_name": "xe-3/2",
"other_end_logical_unit": None
}
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-1/2"
assert r["other_end_interface_name"] == "xe-3/2"
i["interface_name"] = "xe-1/2"
i["logical_unit"] = 101
i["other_end_interface_name"] = "xe-3/2"
i["other_end_logical_unit"] = 301
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-1/2.101"
assert r["other_end_interface_name"] == "xe-3/2.301"
i["interface_name"] = "xe-1/2"
i["logical_unit"] = 0
i["other_end_interface_name"] = "xe-3/2"
i["other_end_logical_unit"] = 0
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-1/2.0"
assert r["other_end_interface_name"] == "xe-3/2.0"
i["interface_name"] = "xe-1/2"
i["logical_unit"] = None
i["port"] = 0
i["other_end_interface_name"] = "xe-3/2"
i["other_end_logical_unit"] = None
i["other_end_port"] = 0
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-1/2"
assert r["other_end_interface_name"] == "xe-3/2"
i["interface_name"] = None
i["card_id"] = "xe-2/0"
i["logical_unit"] = None
i["port"] = None
i["other_end_interface_name"] = None
i["other_end_card_id"] = "xe-3/0"
i["other_end_logical_unit"] = None
i["other_end_port"] = None
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-2/0"
assert r["other_end_interface_name"] == "xe-3/0"
i["interface_name"] = None
i["port"] = "0"
i["other_end_interface_name"] = None
i["other_end_port"] = "0"
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-2/0/0"
assert r["other_end_interface_name"] == "xe-3/0/0"
i["interface_name"] = None
i["port"] = 0
i["other_end_interface_name"] = None
i["other_end_port"] = 0
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-2/0/0"
assert r["other_end_interface_name"] == "xe-3/0/0"
i["interface_name"] = None
i["logical_unit"] = "123"
i["other_end_interface_name"] = None
i["other_end_logical_unit"] = "323"
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-2/0/0.123"
assert r["other_end_interface_name"] == "xe-3/0/0.323"
i["interface_name"] = None
i["logical_unit"] = 123
i["other_end_interface_name"] = None
i["other_end_logical_unit"] = 323
r = inventory_provider.db.opsdb._juniper_field_update(i)
assert r["interface_name"] == "xe-2/0/0.123"
assert r["other_end_interface_name"] == "xe-3/0/0.323"
def test_coriant_update_fields():
i = {
"equipment": "groove-1",
"card_id": "2-3",
"port": None
}
r = inventory_provider.db.opsdb._coriant_field_update(i)
assert r["interface_name"] == "2-3"
i = {
"equipment": "groove-1",
"card_id": "2-3",
"port": "4"
}
r = inventory_provider.db.opsdb._coriant_field_update(i)
assert r["interface_name"] == "2-3/4"
def test_get_circuits(mocker):
mocker.patch("inventory_provider.db.db.cursor")
mocked_convert_to_dict = mocker.patch(
"inventory_provider.db.opsdb._convert_to_dict")
i = {"manufacturer": "infinera"}
j = {"manufacturer": "juniper"}
mocked_convert_to_dict.return_value = [i, j]
mocked_infinera_update = mocker.patch(
"inventory_provider.db.opsdb._infinera_field_update")
mocked_juniper_update = mocker.patch(
"inventory_provider.db.opsdb._juniper_field_update")
inventory_provider.db.opsdb.get_circuits(None)
mocked_infinera_update.assert_called_once_with(i)
mocked_juniper_update.assert_called_once_with(j)