import inventory_provider.db.opsdb


def test_update_fields(mocker):
    """Check how ``_update_fields`` treats each manufacturer value.

    Juniper and infinera records must be passed exactly once to the
    corresponding (mocked) vendor helper; a record with an unrecognised
    manufacturer must come back equal to the record that went in.
    """
    opsdb = inventory_provider.db.opsdb

    juniper_mock = mocker.patch(
        "inventory_provider.db.opsdb._juniper_field_update")
    juniper_record = {"manufacturer": "juniper"}
    opsdb._update_fields(juniper_record)
    juniper_mock.assert_called_once_with(juniper_record)

    infinera_mock = mocker.patch(
        "inventory_provider.db.opsdb._infinera_field_update")
    infinera_record = {"manufacturer": "infinera"}
    opsdb._update_fields(infinera_record)
    infinera_mock.assert_called_once_with(infinera_record)

    unknown_record = {"manufacturer": "non-existent"}
    returned = opsdb._update_fields(unknown_record)
    assert unknown_record == returned


def test_infinera_field_update():
    """Check the fields produced by ``_infinera_field_update``.

    Each scenario pairs an input record with the fields (and values)
    expected in the returned record.
    """
    opsdb = inventory_provider.db.opsdb

    scenarios = [
        (
            {
                "equipment": "AMS01-DTNX10-1-1",
                "card_id": "tim-b-5-7",
                "port": "1",
                "other_end_equipment": "LON01-DTNX10-1-1",
                "other_end_card_id": "tim-b-5-8",
                "other_end_port": "2"
            },
            {
                "equipment": "AMS01-DTNX10-1",
                "interface_name": "1-B-5-7-1",
                "other_end_equipment": "LON01-DTNX10-1",
                "other_end_interface_name": "1-B-5-8-2"
            },
        ),
        (
            {
                "equipment": "BUD01_CX_01",
                "card_id": "tim-1/2",
                "port": "1",
                "other_end_equipment": "LON01_CX_01",
                "other_end_card_id": "tim-2/3",
                "other_end_port": "4"
            },
            {
                "equipment": "BUD01_CX_01",
                "interface_name": "1/2-1",
                "other_end_equipment": "LON01_CX_01",
                "other_end_interface_name": "2/3-4"
            },
        ),
        (
            {
                "equipment": "irrelevant",
                "card_id": "tim_1/2",
                "port": "1",
                "other_end_equipment": "irrelevant",
                "other_end_card_id": "tim_2/3",
                "other_end_port": "4"
            },
            # Only the interface names are checked for this record.
            {
                "interface_name": "TIM_1/2-1",
                "other_end_interface_name": "TIM_2/3-4"
            },
        ),
    ]

    for circuit, expected in scenarios:
        updated = opsdb._infinera_field_update(circuit)
        for field, value in expected.items():
            assert updated[field] == value


def test_juniper_field_update():
    """Check the interface names produced by ``_juniper_field_update``.

    A single record is reused for every call: each step applies its
    changes on top of whatever the previous steps left behind (so e.g.
    ``card_id`` and ``port`` set by an earlier step are still present
    later), then the helper is called and both interface names checked.
    """
    opsdb = inventory_provider.db.opsdb

    # (changes applied to the shared record,
    #  expected interface_name, expected other_end_interface_name)
    steps = [
        (
            {
                "interface_name": "xe-1/2",
                "logical_unit": None,
                "other_end_interface_name": "xe-3/2",
                "other_end_logical_unit": None
            },
            "xe-1/2", "xe-3/2",
        ),
        (
            {
                "interface_name": "xe-1/2",
                "logical_unit": 101,
                "other_end_interface_name": "xe-3/2",
                "other_end_logical_unit": 301
            },
            "xe-1/2.101", "xe-3/2.301",
        ),
        (
            {
                "interface_name": "xe-1/2",
                "logical_unit": 0,
                "other_end_interface_name": "xe-3/2",
                "other_end_logical_unit": 0
            },
            "xe-1/2.0", "xe-3/2.0",
        ),
        (
            {
                "interface_name": "xe-1/2",
                "logical_unit": None,
                "port": 0,
                "other_end_interface_name": "xe-3/2",
                "other_end_logical_unit": None,
                "other_end_port": 0
            },
            "xe-1/2", "xe-3/2",
        ),
        (
            {
                "interface_name": None,
                "card_id": "xe-2/0",
                "logical_unit": None,
                "port": None,
                "other_end_interface_name": None,
                "other_end_card_id": "xe-3/0",
                "other_end_logical_unit": None,
                "other_end_port": None
            },
            "xe-2/0", "xe-3/0",
        ),
        (
            {
                "interface_name": None,
                "port": "0",
                "other_end_interface_name": None,
                "other_end_port": "0"
            },
            "xe-2/0/0", "xe-3/0/0",
        ),
        (
            {
                "interface_name": None,
                "port": 0,
                "other_end_interface_name": None,
                "other_end_port": 0
            },
            "xe-2/0/0", "xe-3/0/0",
        ),
        (
            {
                "interface_name": None,
                "logical_unit": "123",
                "other_end_interface_name": None,
                "other_end_logical_unit": "323"
            },
            "xe-2/0/0.123", "xe-3/0/0.323",
        ),
        (
            {
                "interface_name": None,
                "logical_unit": 123,
                "other_end_interface_name": None,
                "other_end_logical_unit": 323
            },
            "xe-2/0/0.123", "xe-3/0/0.323",
        ),
    ]

    record = {}
    for changes, expected_name, expected_other_end_name in steps:
        record.update(changes)
        updated = opsdb._juniper_field_update(record)
        assert updated["interface_name"] == expected_name
        assert updated["other_end_interface_name"] == expected_other_end_name


def test_coriant_update_fields():
    """Check the ``interface_name`` built by ``_coriant_field_update``.

    With no port the expected name is the card id alone; with a port
    the expected name is ``<card_id>/<port>``.
    """
    opsdb = inventory_provider.db.opsdb

    for port, expected_name in ((None, "2-3"), ("4", "2-3/4")):
        circuit = {
            "equipment": "groove-1",
            "card_id": "2-3",
            "port": port
        }
        updated = opsdb._coriant_field_update(circuit)
        assert updated["interface_name"] == expected_name


def test_get_circuits(mocker):
    """Check that ``get_circuits`` sends each row to its vendor helper.

    The db cursor and ``_convert_to_dict`` are mocked so that one
    infinera row and one juniper row are returned; each vendor helper
    must then be called exactly once, with its own row.
    """
    mocker.patch("inventory_provider.db.db.cursor")

    infinera_row = {"manufacturer": "infinera"}
    juniper_row = {"manufacturer": "juniper"}
    mocker.patch(
        "inventory_provider.db.opsdb._convert_to_dict",
        return_value=[infinera_row, juniper_row])

    infinera_update = mocker.patch(
        "inventory_provider.db.opsdb._infinera_field_update")
    juniper_update = mocker.patch(
        "inventory_provider.db.opsdb._juniper_field_update")

    inventory_provider.db.opsdb.get_circuits(None)

    infinera_update.assert_called_once_with(infinera_row)
    juniper_update.assert_called_once_with(juniper_row)