Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • goat/gap/geant-service-orchestrator
1 result
Select Git revision
Show changes
Showing
with 958 additions and 1 deletion
from os import PathLike
from unittest.mock import patch
import pytest
from pydantic_forms.exceptions import FormValidationError
from gso.products import EdgePort, ProductName, Router
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.types.interfaces import PhysicalPortCapacity
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
extract_state,
run_workflow,
)
@pytest.fixture()
def _netbox_client_mock():
# Mock NetboxClient methods
with (
patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name,
patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") as mock_get_available_interfaces,
patch("gso.services.netbox_client.NetboxClient.get_available_services_lags") as mock_available_services_lags,
patch("gso.services.netbox_client.NetboxClient.create_interface") as mock_create_interface,
patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") as mock_attach_interface_to_lag,
patch("gso.services.netbox_client.NetboxClient.reserve_interface") as mock_reserve_interface,
patch("gso.services.netbox_client.NetboxClient.allocate_interface") as mock_allocate_interface,
):
mock_get_device_by_name.return_value = MockedNetboxClient().get_device_by_name()
mock_get_available_interfaces.return_value = MockedNetboxClient().get_available_interfaces()
mock_available_services_lags.return_value = MockedNetboxClient().get_available_services_lags()
mock_create_interface.return_value = MockedNetboxClient().create_interface()
mock_attach_interface_to_lag.return_value = MockedNetboxClient().attach_interface_to_lag()
mock_reserve_interface.return_value = MockedNetboxClient().reserve_interface()
mock_allocate_interface.return_value = MockedNetboxClient().allocate_interface()
yield
@pytest.fixture()
def input_form_wizard_data(request, nokia_router_subscription_factory, partner_factory, faker):
create_edge_port_step = {
"tt_number": faker.tt_number(),
"node": nokia_router_subscription_factory(),
"partner": partner_factory(name="GAAR", email=faker.email())["partner_id"],
"service_type": EdgePortType.PUBLIC,
"geant_ga_id": faker.geant_gid(),
"enable_lacp": True,
"speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
"encapsulation": EncapsulationType.DOT1Q,
"number_of_members": 2,
"minimum_links": 1,
}
create_edge_port_interface_step = {
"name": "lag-21",
"description": faker.sentence(),
"ae_members": [
{
"interface_name": f"Interface{interface}",
"interface_description": faker.sentence(),
}
for interface in range(2)
],
}
return [
create_edge_port_step,
create_edge_port_interface_step,
]
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
def test_successful_edge_port_creation(
mock_execute_playbook,
responses,
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
data_config_filename: PathLike,
test_client,
):
product_id = get_product_id_by_name(ProductName.EDGE_PORT)
initial_data = [{"product": product_id}, *input_form_wizard_data]
result, process_stat, step_log = run_workflow("create_edge_port", initial_data)
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "active"
ga_id = input_form_wizard_data[0]["geant_ga_id"]
router_fqdn = Router.from_subscription(input_form_wizard_data[0]["node"]).router.router_fqdn
assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {ga_id}"
assert len(subscription.edge_port.edge_port_ae_members) == 2
assert mock_execute_playbook.call_count == 2
def test_edge_port_creation_with_invalid_input(
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
data_config_filename: PathLike,
test_client,
):
product_id = get_product_id_by_name(ProductName.EDGE_PORT)
# If the number of members is greater than 1 then :term:`LACP` must be enabled.
input_form_wizard_data[0]["enable_lacp"] = False
initial_data = [{"product": product_id}, *input_form_wizard_data]
with pytest.raises(FormValidationError) as error:
run_workflow("create_edge_port", initial_data)
error = error.value.errors[0]
assert error["msg"] == "Number of members must be 1 if LACP is disabled."
assert error["loc"][0] == "__root__"
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ImportedEdgePort
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.utils.types.interfaces import PhysicalPortCapacity
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.fixture()
def imported_edge_port_creation_input_form_data(nokia_router_subscription_factory, partner_factory, faker):
return {
"node": nokia_router_subscription_factory(),
"service_type": EdgePortType.CUSTOMER,
"speed": PhysicalPortCapacity.TEN_GIGABIT_PER_SECOND,
"encapsulation": EncapsulationType.DOT1Q,
"name": "lag34",
"minimum_links": 2,
"geant_ga_id": faker.geant_gid(),
"mac_address": faker.mac_address(),
"partner": partner_factory()["name"],
"enable_lacp": True,
"ignore_if_down": False,
"ae_members": [
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
},
{
"interface_name": faker.network_interface(),
"interface_description": faker.sentence(),
},
],
"description": faker.sentence(),
}
def test_create_imported_edge_port_success(faker, imported_edge_port_creation_input_form_data):
result, _, _ = run_workflow("create_imported_edge_port", [imported_edge_port_creation_input_form_data])
state = extract_state(result)
subscription = ImportedEdgePort.from_subscription(state["subscription_id"])
assert_complete(result)
assert subscription.status == SubscriptionLifecycle.ACTIVE
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import EdgePort, ProductName
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_import_edge_port_success(edge_port_subscription_factory):
imported_edge_port = edge_port_subscription_factory(is_imported=False)
result, _, _ = run_workflow("import_edge_port", [{"subscription_id": imported_edge_port}])
subscription = EdgePort.from_subscription(imported_edge_port)
assert_complete(result)
assert subscription.product.name == ProductName.EDGE_PORT
assert subscription.status == SubscriptionLifecycle.ACTIVE
assert subscription.insync
from unittest.mock import patch
import pytest
from gso.products import EdgePort
from gso.utils.types.interfaces import PhysicalPortCapacity
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
extract_state,
run_workflow,
)
from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
@pytest.fixture()
def input_form_wizard_data(request, faker, edge_port_subscription_factory, partner_factory):
subscription_id = edge_port_subscription_factory()
return [
{"subscription_id": subscription_id},
{
"tt_number": faker.tt_number(),
"geant_ga_id": faker.geant_gid(),
"member_speed": PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND,
"number_of_members": 1,
},
{
"description": faker.sentence(),
"ae_members": [
{
"interface_name": "Interface1",
"interface_description": faker.sentence(),
}
],
},
]
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag")
@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
def test_modify_edge_port_with_changing_capacity(
mocked_get_interface_by_name_and_device,
mocked_detach_interfaces_from_lag,
mocked_free_interface,
mocked_allocate_interface,
mocked_reserve_interface,
mocked_attach_interface_to_lag,
mocked_get_available_interfaces,
mocked_execute_playbook,
input_form_wizard_data,
faker,
data_config_filename,
):
# Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag()
mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device
# Run workflow
result, process_stat, step_log = run_workflow("modify_edge_port", input_form_wizard_data)
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
# Validate the final state and subscription data
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "active"
assert mocked_execute_playbook.call_count == 2
# The number of members have been changed from 2 to 1
assert mocked_reserve_interface.call_count == 1
assert mocked_attach_interface_to_lag.call_count == 1
assert mocked_free_interface.call_count == 2
assert mocked_detach_interfaces_from_lag.call_count == 1
assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_data[1]["geant_ga_id"]
assert len(subscription.edge_port.edge_port_ae_members) == 1
@pytest.fixture()
def input_form_wizard_without_changing_capacity(request, faker, edge_port_subscription_factory, partner_factory):
subscription_id = edge_port_subscription_factory()
subscription = EdgePort.from_subscription(subscription_id)
return [
{"subscription_id": subscription_id},
{"tt_number": faker.tt_number(), "geant_ga_id": faker.geant_gid()},
{
"description": faker.sentence(),
"ae_members": [
{
"interface_name": interface.interface_name,
"interface_description": interface.interface_description,
}
for interface in subscription.edge_port.edge_port_ae_members
],
},
]
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag")
@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
def test_modify_edge_port_without_changing_capacity(
mocked_get_interface_by_name_and_device,
mocked_detach_interfaces_from_lag,
mocked_free_interface,
mocked_allocate_interface,
mocked_reserve_interface,
mocked_attach_interface_to_lag,
mocked_get_available_interfaces,
mocked_execute_playbook,
input_form_wizard_without_changing_capacity,
faker,
data_config_filename,
):
# Set up mock return values
mocked_netbox = MockedNetboxClient()
mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag()
mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device
# Run workflow
result, _, _ = run_workflow("modify_edge_port", input_form_wizard_without_changing_capacity)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "active"
# The capacity has not been changed so the following methods should not be called
assert mocked_execute_playbook.call_count == 0
assert mocked_reserve_interface.call_count == 0
assert mocked_attach_interface_to_lag.call_count == 0
assert mocked_free_interface.call_count == 0
assert mocked_detach_interfaces_from_lag.call_count == 0
assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_without_changing_capacity[1]["geant_ga_id"]
assert len(subscription.edge_port.edge_port_ae_members) == 2
assert subscription.edge_port.edge_port_description == input_form_wizard_without_changing_capacity[2]["description"]
from unittest.mock import patch
import pytest
from gso.products import EdgePort
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
extract_state,
run_workflow,
)
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.netbox_client.NetboxClient.delete_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
def test_successful_edge_port_termination(
mocked_free_interface,
mocked_delete_interface,
mock_execute_playbook,
edge_port_subscription_factory,
faker,
data_config_filename,
):
# Set up mock return values
subscription_id = edge_port_subscription_factory()
mocked_netbox = MockedNetboxClient()
mocked_delete_interface.return_value = mocked_netbox.delete_interface()
mocked_free_interface.return_value = mocked_netbox.free_interface()
# Run workflow
initial_data = [
{"subscription_id": subscription_id},
{
"tt_number": faker.tt_number(),
},
]
result, process_stat, step_log = run_workflow("terminate_edge_port", initial_data)
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
# Check NetboxClient calls
assert mocked_delete_interface.call_count == 1 # Delete the lag
assert mocked_free_interface.call_count == 2 # Free interfaces attached to the lag which is 2
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "terminated"
assert mock_execute_playbook.call_count == 2
from unittest.mock import patch
import pytest
from gso.products import EdgePort
from test.services.conftest import MockedNetboxClient
from test.workflows import (
assert_complete,
assert_lso_success,
extract_state,
run_workflow,
)
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
def test_validate_edge_port_success(
mock_get_interface_by_name_and_device,
mock_execute_playbook,
edge_port_subscription_factory,
faker,
data_config_filename,
):
subscription_id = edge_port_subscription_factory()
mock_get_interface_by_name_and_device.side_effect = [
MockedNetboxClient.BaseMockObject(
name="iFace1",
module=MockedNetboxClient.BaseMockObject(display="display1"),
description=subscription_id,
enabled=True,
),
MockedNetboxClient.BaseMockObject(
name="iFace2",
module=MockedNetboxClient.BaseMockObject(display="display2"),
description=subscription_id,
enabled=True,
),
MockedNetboxClient.BaseMockObject(
name="Iface3",
module=MockedNetboxClient.BaseMockObject(display="display3"),
description=subscription_id,
enabled=True,
),
]
# Run workflow
initial_data = [{"subscription_id": subscription_id}]
result, process_stat, step_log = run_workflow("validate_edge_port", initial_data)
state = extract_state(result)
subscription_id = state["subscription_id"]
for _ in range(1):
result, step_log = assert_lso_success(result, process_stat, step_log)
assert_complete(result)
subscription = EdgePort.from_subscription(subscription_id)
assert subscription.status == "active"
assert mock_execute_playbook.call_count == 1
# One time for getting the :term:`LAG` and two times for getting the interfaces
assert mock_get_interface_by_name_and_device.call_count == 3
from unittest.mock import patch
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductName
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.shared_enums import APType
from test.workflows import assert_complete, assert_lso_interaction_success, extract_state, run_workflow
@pytest.fixture()
def base_bgp_peer_input(faker):
def _base_bgp_peer_input():
bfd_enabled = faker.boolean()
return {
"bfd_enabled": bfd_enabled,
"bfd_interval": faker.pyint() if bfd_enabled else None,
"bfd_multiplier": faker.pyint() if bfd_enabled else None,
"has_custom_policies": faker.boolean(),
"authentication_key": faker.password(),
"multipath_enabled": faker.boolean(),
"send_default_route": faker.boolean(),
"is_passive": faker.boolean(),
}
return _base_bgp_peer_input
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
def test_create_geant_ip_success(
mock_lso_client,
responses,
faker,
partner_factory,
edge_port_subscription_factory,
base_bgp_peer_input,
data_config_filename,
):
partner = partner_factory()
product_id = get_product_id_by_name(ProductName.GEANT_IP)
edge_port_a = edge_port_subscription_factory(partner=partner)
form_input_data = [
{"product": product_id},
{"tt_number": faker.tt_number(), "partner": partner["partner_id"]},
{"edge_ports": [{"edge_port": edge_port_a, "ap_type": APType.PRIMARY}]},
{
"geant_sid": faker.geant_sid(),
"is_tagged": faker.boolean(),
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv4_mask": faker.ipv4_netmask(),
"ipv6_address": faker.ipv6(),
"ipv6_mask": faker.ipv6_netmask(),
"custom_firewall_filters": faker.boolean(),
"v4_bgp_peer": base_bgp_peer_input() | {"add_v4_multicast": faker.boolean(), "peer_address": faker.ipv4()},
"v6_bgp_peer": base_bgp_peer_input() | {"add_v6_multicast": faker.boolean(), "peer_address": faker.ipv6()},
},
]
lso_interaction_count = 6
result, process_stat, step_log = run_workflow("create_geant_ip", form_input_data)
for _ in range(lso_interaction_count):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert mock_lso_client.call_count == lso_interaction_count
assert subscription.status == SubscriptionLifecycle.ACTIVE
assert len(subscription.geant_ip.geant_ip_ap_list) == 1
assert (
str(subscription.geant_ip.geant_ip_ap_list[0].geant_ip_sbp.edge_port.owner_subscription_id)
== form_input_data[2]["edge_ports"][0]["edge_port"]
)
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ImportedGeantIP
from gso.products.product_blocks.bgp_session import IPFamily
from gso.utils.shared_enums import SBPType
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.fixture()
def imported_geant_ip_creation_input_form_data(edge_port_subscription_factory, partner_factory, faker):
return {
"partner": partner_factory()["name"],
"service_binding_ports": [
{
"edge_port": edge_port_subscription_factory(),
"ap_type": "PRIMARY",
"geant_sid": faker.geant_sid(),
"sbp_type": SBPType.L3,
"is_tagged": faker.boolean(),
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv4_mask": faker.ipv4_netmask(),
"ipv6_address": faker.ipv6(),
"ipv6_mask": faker.ipv6_netmask(),
"custom_firewall_filters": faker.boolean(),
"bgp_peers": [
{
"bfd_enabled": faker.boolean(),
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": faker.boolean(),
"authentication_key": faker.password(),
"multipath_enabled": faker.boolean(),
"send_default_route": faker.boolean(),
"is_passive": faker.boolean(),
"peer_address": faker.ipv4(),
"families": [IPFamily.V4UNICAST, IPFamily.V4MULTICAST],
"is_multi_hop": faker.boolean(),
"rtbh_enabled": faker.boolean(),
},
{
"bfd_enabled": faker.boolean(),
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": faker.boolean(),
"authentication_key": faker.password(),
"multipath_enabled": faker.boolean(),
"send_default_route": faker.boolean(),
"is_passive": faker.boolean(),
"peer_address": faker.ipv6(),
"families": [IPFamily.V6UNICAST],
"is_multi_hop": faker.boolean(),
"rtbh_enabled": faker.boolean(),
},
],
}
],
}
def test_create_imported_geant_ip_success(faker, imported_geant_ip_creation_input_form_data):
result, _, _ = run_workflow("create_imported_geant_ip", [imported_geant_ip_creation_input_form_data])
state = extract_state(result)
subscription = ImportedGeantIP.from_subscription(state["subscription_id"])
assert_complete(result)
assert subscription.status == SubscriptionLifecycle.ACTIVE
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import GeantIP, ProductName
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_import_edge_port_success(geant_ip_subscription_factory):
imported_geant_ip = geant_ip_subscription_factory(is_imported=False)
result, _, _ = run_workflow("import_geant_ip", [{"subscription_id": imported_geant_ip}])
subscription = GeantIP.from_subscription(imported_geant_ip)
assert_complete(result)
assert subscription.product.name == ProductName.GEANT_IP
assert subscription.status == SubscriptionLifecycle.ACTIVE
assert subscription.insync
import pytest
from gso.products.product_types.nren_l3_core_service import GeantIP
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_migrate_geant_ip_success(
faker, edge_port_subscription_factory, partner_factory, geant_ip_subscription_factory
):
partner = partner_factory()
subscription_id = geant_ip_subscription_factory(partner=partner)
new_edge_port_1 = edge_port_subscription_factory(partner=partner)
new_edge_port_2 = edge_port_subscription_factory(partner=partner)
subscription = GeantIP.from_subscription(subscription_id)
form_input_data = [
{"subscription_id": subscription_id},
{
"tt_number": faker.tt_number(),
"edge_port_selection": [
{
"old_edge_port": subscription.geant_ip.geant_ip_ap_list[0].geant_ip_sbp.edge_port.description,
"new_edge_port": new_edge_port_1,
},
{
"old_edge_port": subscription.geant_ip.geant_ip_ap_list[1].geant_ip_sbp.edge_port.description,
"new_edge_port": new_edge_port_2,
},
],
},
]
result, _, _ = run_workflow("migrate_geant_ip", form_input_data)
assert_complete(result)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert subscription.insync
assert len(subscription.geant_ip.geant_ip_ap_list) == 2
assert (
str(subscription.geant_ip.geant_ip_ap_list[0].geant_ip_sbp.edge_port.owner_subscription_id) == new_edge_port_1
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[1].geant_ip_sbp.edge_port.owner_subscription_id) == new_edge_port_2
)
import pytest
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.utils.shared_enums import APType
from test.workflows import extract_state, run_workflow
@pytest.mark.workflow()
def test_modify_geant_ip_remove_edge_port_success(geant_ip_subscription_factory):
subscription_id = geant_ip_subscription_factory()
subscription = GeantIP.from_subscription(subscription_id)
access_port = subscription.geant_ip.geant_ip_ap_list[0]
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(access_port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": APType.LOAD_BALANCED,
}
] # The factory generates a subscription with two Access Ports, this will remove the second one.
},
{},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 1
assert subscription.geant_ip.geant_ip_ap_list[0].nren_ap_type == APType.LOAD_BALANCED
@pytest.mark.workflow()
def test_modify_geant_ip_add_new_edge_port_success(
geant_ip_subscription_factory, edge_port_subscription_factory, partner_factory, faker
):
partner = partner_factory()
new_edge_port = edge_port_subscription_factory(partner=partner)
subscription_id = geant_ip_subscription_factory(partner=partner)
subscription = GeantIP.from_subscription(subscription_id)
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": port.nren_ap_type,
}
for port in subscription.geant_ip.geant_ip_ap_list
]
+ [
{
"geant_ip_ep": str(new_edge_port),
"nren_ap_type": APType.BACKUP,
}
]
},
{}, # The existing SBPs are unchanged
{},
{ # Adding configuration for the new SBP
"geant_sid": faker.geant_sid(),
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"v4_bgp_peer": {
"authentication_key": faker.password(),
"peer_address": faker.ipv4(),
},
"v6_bgp_peer": {
"authentication_key": faker.password(),
"peer_address": faker.ipv6(),
},
},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 3
@pytest.mark.workflow()
def test_modify_geant_ip_modify_edge_port_success(faker, geant_ip_subscription_factory):
subscription_id = geant_ip_subscription_factory()
subscription = GeantIP.from_subscription(subscription_id)
new_sbp_data = [
{
"geant_sid": faker.geant_sid(),
"is_tagged": True,
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"custom_firewall_filters": True,
"v4_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv4(),
"add_v4_multicast": True,
},
"v6_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv6(),
"add_v6_multicast": True,
},
},
{
"geant_sid": faker.geant_sid(),
"is_tagged": True,
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"custom_firewall_filters": True,
"v4_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv4(),
"add_v4_multicast": True,
},
"v6_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv6(),
"add_v6_multicast": True,
},
},
]
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": port.nren_ap_type,
}
for port in subscription.geant_ip.geant_ip_ap_list
]
},
{**new_sbp_data[0]},
{**new_sbp_data[1]},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 2
for i in range(2):
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.geant_sid == new_sbp_data[i]["geant_sid"]
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.is_tagged == new_sbp_data[i]["is_tagged"]
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.vlan_id == new_sbp_data[i]["vlan_id"]
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.ipv4_address) == new_sbp_data[i]["ipv4_address"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.ipv6_address) == new_sbp_data[i]["ipv6_address"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.custom_firewall_filters
== new_sbp_data[i]["custom_firewall_filters"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_enabled
== new_sbp_data[i]["v4_bgp_peer"]["bfd_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_interval
== new_sbp_data[i]["v4_bgp_peer"]["bfd_interval"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_multiplier
== new_sbp_data[i]["v4_bgp_peer"]["bfd_multiplier"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].has_custom_policies
== new_sbp_data[i]["v4_bgp_peer"]["has_custom_policies"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].authentication_key
== new_sbp_data[i]["v4_bgp_peer"]["authentication_key"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].multipath_enabled
== new_sbp_data[i]["v4_bgp_peer"]["multipath_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].send_default_route
== new_sbp_data[i]["v4_bgp_peer"]["send_default_route"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].is_passive
== new_sbp_data[i]["v4_bgp_peer"]["is_passive"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].peer_address)
== new_sbp_data[i]["v4_bgp_peer"]["peer_address"]
)
assert (
bool(
IPFamily.V4MULTICAST
in subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].families
)
== new_sbp_data[i]["v4_bgp_peer"]["add_v4_multicast"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_enabled
== new_sbp_data[i]["v6_bgp_peer"]["bfd_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_interval
== new_sbp_data[i]["v6_bgp_peer"]["bfd_interval"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_multiplier
== new_sbp_data[i]["v6_bgp_peer"]["bfd_multiplier"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].has_custom_policies
== new_sbp_data[i]["v6_bgp_peer"]["has_custom_policies"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].authentication_key
== new_sbp_data[i]["v6_bgp_peer"]["authentication_key"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].multipath_enabled
== new_sbp_data[i]["v6_bgp_peer"]["multipath_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].send_default_route
== new_sbp_data[i]["v6_bgp_peer"]["send_default_route"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].is_passive
== new_sbp_data[i]["v6_bgp_peer"]["is_passive"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].peer_address)
== new_sbp_data[i]["v6_bgp_peer"]["peer_address"]
)
assert (
bool(
IPFamily.V6MULTICAST
in subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].families
)
== new_sbp_data[i]["v6_bgp_peer"]["add_v6_multicast"]
)
......@@ -6,7 +6,7 @@ from pydantic_forms.exceptions import FormValidationError
from sqlalchemy import select
from gso.services.partners import filter_partners_by_name
from test.fixtures import create_subscription_for_mapping
from test.fixtures.common_fixtures import create_subscription_for_mapping
from test.workflows import assert_complete, run_workflow
CORRECT_SUBSCRIPTION = str(uuid4())
......