From 4b490666761b80f1af1ef003ca687d03aada5213 Mon Sep 17 00:00:00 2001
From: Neda Moeini <neda.moeini@geant.org>
Date: Wed, 18 Sep 2024 11:37:32 +0200
Subject: [PATCH] Add Edge port unit tests and improve the fixtures for the
 tests.

# Conflicts:
#	test/fixtures.py
---
 gso/services/netbox_client.py                 |   3 +-
 gso/utils/helpers.py                          |   2 +-
 gso/workflows/edge_port/create_edge_port.py   |   4 +-
 gso/workflows/edge_port/validate_edge_port.py |   4 +-
 test/conftest.py                              |   2 +-
 test/fixtures/__init__.py                     |  19 ++
 test/fixtures/common_fixtures.py              |  81 +++++++++
 test/fixtures/edge_port_fixtures.py           |  89 ++++++++++
 test/fixtures/iptrunk_fixtures.py             | 126 +++++++++++++
 test/fixtures/office_router_fixtures.py       |  74 ++++++++
 test/fixtures/opengear_fixtures.py            |  70 ++++++++
 test/fixtures/router_fixtures.py              | 148 +++++++++++++++
 test/fixtures/site_fixtures.py                |  79 ++++++++
 test/fixtures/super_pop_switch_fixtures.py    |  75 ++++++++
 test/services/conftest.py                     |  12 ++
 test/workflows/edge_port/__init__.py          |   0
 .../edge_port/test_create_edge_port.py        | 124 +++++++++++++
 .../edge_port/test_modify_edge_port.py        | 168 ++++++++++++++++++
 .../edge_port/test_terminate_edge_port.py     |  56 ++++++
 .../edge_port/test_validate_edge_port.py      |  62 +++++++
 test/workflows/tasks/test_delete_partners.py  |   2 +-
 21 files changed, 1193 insertions(+), 7 deletions(-)
 create mode 100644 test/fixtures/__init__.py
 create mode 100644 test/fixtures/common_fixtures.py
 create mode 100644 test/fixtures/edge_port_fixtures.py
 create mode 100644 test/fixtures/iptrunk_fixtures.py
 create mode 100644 test/fixtures/office_router_fixtures.py
 create mode 100644 test/fixtures/opengear_fixtures.py
 create mode 100644 test/fixtures/router_fixtures.py
 create mode 100644 test/fixtures/site_fixtures.py
 create mode 100644 test/fixtures/super_pop_switch_fixtures.py
 create mode 100644 test/workflows/edge_port/__init__.py
 create mode 100644 test/workflows/edge_port/test_create_edge_port.py
 create mode 100644 test/workflows/edge_port/test_modify_edge_port.py
 create mode 100644 test/workflows/edge_port/test_terminate_edge_port.py
 create mode 100644 test/workflows/edge_port/test_validate_edge_port.py

diff --git a/gso/services/netbox_client.py b/gso/services/netbox_client.py
index 2566f603..63ce9da8 100644
--- a/gso/services/netbox_client.py
+++ b/gso/services/netbox_client.py
@@ -13,6 +13,7 @@ from gso.settings import load_oss_params
 from gso.utils.device_info import (
     DEFAULT_SITE,
     FEASIBLE_IP_TRUNK_LAG_RANGE,
+    FEASIBLE_SERVICES_LAG_RANGE,
     ROUTER_ROLE,
     TierInfo,
 )
@@ -311,7 +312,7 @@ class NetboxClient:
 
     def get_available_services_lags(self, router_id: UUID) -> list[str]:
         """Return all available Edge port LAGs not assigned to a device."""
-        return self.get_available_lags_in_range(router_id, range(20, 51))
+        return self.get_available_lags_in_range(router_id, FEASIBLE_SERVICES_LAG_RANGE)
 
     @staticmethod
     def calculate_speed_bits_per_sec(speed: str) -> int:
diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py
index 4e4c7646..47fbec26 100644
--- a/gso/utils/helpers.py
+++ b/gso/utils/helpers.py
@@ -215,5 +215,5 @@ def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int
     :raises ValueError: If the number of members is greater than 1 and LACP is disabled.
     """
     if number_of_members > 1 and not enable_lacp:
-        err_msg = "Number of members must be 1 if LACP is disabled"
+        err_msg = "Number of members must be 1 if LACP is disabled."
         raise ValueError(err_msg)
diff --git a/gso/workflows/edge_port/create_edge_port.py b/gso/workflows/edge_port/create_edge_port.py
index ce87d1f5..312461a0 100644
--- a/gso/workflows/edge_port/create_edge_port.py
+++ b/gso/workflows/edge_port/create_edge_port.py
@@ -24,6 +24,7 @@ from gso.products.product_types.router import Router
 from gso.services import lso_client, subscriptions
 from gso.services.lso_client import lso_interaction
 from gso.services.netbox_client import NetboxClient
+from gso.services.partners import get_partner_by_id
 from gso.utils.helpers import (
     available_interfaces_choices,
     available_service_lags_choices,
@@ -131,7 +132,8 @@ def initialize_subscription(
     subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
     subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
     subscription.edge_port.edge_port_mac_address = mac_address
-    subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}"
+    partner_name = get_partner_by_id(partner).name
+    subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {geant_ga_id or ""}"
     subscription.edge_port.edge_port_description = description
     for member in ae_members:
         subscription.edge_port.edge_port_ae_members.append(
diff --git a/gso/workflows/edge_port/validate_edge_port.py b/gso/workflows/edge_port/validate_edge_port.py
index 60611acc..81714721 100644
--- a/gso/workflows/edge_port/validate_edge_port.py
+++ b/gso/workflows/edge_port/validate_edge_port.py
@@ -10,7 +10,7 @@ from orchestrator.workflows.steps import resync, store_process_subscription
 from orchestrator.workflows.utils import wrap_modify_initial_input_form
 
 from gso.products.product_types.edge_port import EdgePort
-from gso.services.lso_client import execute_playbook
+from gso.services.lso_client import anonymous_lso_interaction, execute_playbook
 from gso.services.netbox_client import NetboxClient
 
 
@@ -85,7 +85,7 @@ def validate_edge_port() -> StepList:
         >> store_process_subscription(Target.SYSTEM)
         >> prepare_state
         >> verify_netbox_entries
-        >> verify_base_config
+        >> anonymous_lso_interaction(verify_base_config)
         >> resync
         >> done
     )
diff --git a/test/conftest.py b/test/conftest.py
index ed4cb631..5b1e91f1 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -36,6 +36,7 @@ from gso.main import init_gso_app
 from gso.services.partners import PartnerSchema, create_partner
 from gso.utils.types.interfaces import LAGMember, LAGMemberList
 from test.fixtures import (  # noqa: F401
+    edge_port_subscription_factory,
     iptrunk_side_subscription_factory,
     iptrunk_subscription_factory,
     office_router_subscription_factory,
@@ -43,7 +44,6 @@ from test.fixtures import (  # noqa: F401
     router_subscription_factory,
     site_subscription_factory,
     super_pop_switch_subscription_factory,
-    test_workflow,
 )
 
 logging.getLogger("faker.factory").setLevel(logging.WARNING)
diff --git a/test/fixtures/__init__.py b/test/fixtures/__init__.py
new file mode 100644
index 00000000..2269a6b8
--- /dev/null
+++ b/test/fixtures/__init__.py
@@ -0,0 +1,19 @@
+from test.fixtures.edge_port_fixtures import edge_port_subscription_factory
+from test.fixtures.iptrunk_fixtures import iptrunk_side_subscription_factory, iptrunk_subscription_factory
+from test.fixtures.office_router_fixtures import office_router_subscription_factory
+from test.fixtures.opengear_fixtures import opengear_subscription_factory
+from test.fixtures.router_fixtures import juniper_router_subscription_factory, nokia_router_subscription_factory
+from test.fixtures.site_fixtures import site_subscription_factory
+from test.fixtures.super_pop_switch_fixtures import super_pop_switch_subscription_factory
+
+__all__ = [
+    "edge_port_subscription_factory",
+    "iptrunk_side_subscription_factory",
+    "iptrunk_subscription_factory",
+    "juniper_router_subscription_factory",
+    "nokia_router_subscription_factory",
+    "office_router_subscription_factory",
+    "opengear_subscription_factory",
+    "site_subscription_factory",
+    "super_pop_switch_subscription_factory",
+]
diff --git a/test/fixtures/common_fixtures.py b/test/fixtures/common_fixtures.py
new file mode 100644
index 00000000..aeac5a99
--- /dev/null
+++ b/test/fixtures/common_fixtures.py
@@ -0,0 +1,81 @@
+from typing import Any
+
+from orchestrator.db import (
+    ProductTable,
+    SubscriptionInstanceTable,
+    SubscriptionInstanceValueTable,
+    SubscriptionTable,
+    db,
+)
+from orchestrator.utils.datetime import nowtz
+from pydantic_forms.types import SubscriptionMapping
+
+
+def create_subscription_for_mapping(
+    product: ProductTable, mapping: SubscriptionMapping, values: dict[str, Any], **kwargs: Any
+) -> SubscriptionTable:
+    """Create a subscription in the test coredb for the given subscription_mapping and values.
+
+    This function handles optional resource types starting with a ? in the mapping not supplied in the values array.
+
+    Args:
+        product: the ProductTable to create a sub for
+        mapping: the subscription_mapping belonging to that product
+        values: a dictionary of keys from the sub_map and their corresponding test values
+        kwargs: The rest of the arguments
+
+    Returns: The conforming subscription.
+    """
+
+    def build_instance(name, value_mapping):
+        block = product.find_block_by_name(name)
+
+        def build_value(rt, value):
+            resource_type = block.find_resource_type_by_name(rt)
+            return SubscriptionInstanceValueTable(resource_type_id=resource_type.resource_type_id, value=value)
+
+        return SubscriptionInstanceTable(
+            product_block_id=block.product_block_id,
+            values=[
+                build_value(resource_type, values[value_key]) for (resource_type, value_key) in value_mapping.items()
+            ],
+        )
+
+    # recreate the mapping: leave out the ?keys if no value supplied for them
+    mapping = {
+        name: [
+            {
+                **{k: value_map[k] for k in value_map if not value_map[k].startswith("?")},
+                **{
+                    k: value_map[k][1:]
+                    for k in value_map
+                    if value_map[k].startswith("?") and value_map[k][1:] in values
+                },
+            }
+            for value_map in mapping[name]
+        ]
+        for name in mapping
+    }
+
+    instances = [
+        build_instance(name, value_mapping)
+        for (name, value_mappings) in mapping.items()
+        for value_mapping in value_mappings
+    ]
+
+    return create_subscription(instances=instances, product=product, **kwargs)
+
+
+def create_subscription(**kwargs):
+    attrs = {
+        "description": "A subscription.",
+        "customer_id": kwargs.get("customer_id", "85938c4c-0a11-e511-80d0-005056956c1a"),
+        "start_date": nowtz(),
+        "status": "active",
+        "insync": True,
+        **kwargs,
+    }
+    o = SubscriptionTable(**attrs)
+    db.session.add(o)
+    db.session.commit()
+    return o
diff --git a/test/fixtures/edge_port_fixtures.py b/test/fixtures/edge_port_fixtures.py
new file mode 100644
index 00000000..730da5b4
--- /dev/null
+++ b/test/fixtures/edge_port_fixtures.py
@@ -0,0 +1,89 @@
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName, Router
+from gso.products.product_blocks.edge_port import (
+    EdgePortAEMemberBlock,
+    EdgePortType,
+    EncapsulationType,
+)
+from gso.products.product_types.edge_port import EdgePortInactive
+from gso.services import subscriptions
+from gso.utils.types.interfaces import PhysicalPortCapacity
+
+
+@pytest.fixture()
+def edge_port_subscription_factory(faker, partner_factory, nokia_router_subscription_factory):
+    def subscription_create(
+        description=None,
+        partner: dict | None = None,
+        start_date="2023-05-24T00:00:00+00:00",
+        node=None,
+        name=None,
+        edge_port_description=None,
+        encapsulation=EncapsulationType.DOT1Q,
+        mac_address=None,
+        member_speed=PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
+        minimum_links=None,
+        edge_port_type=EdgePortType.PUBLIC,
+        geant_ga_id=None,
+        edge_port_ae_members=None,
+        edge_port_sbp_list=None,
+        status: SubscriptionLifecycle | None = None,
+        *,
+        enable_lacp=True,
+        ignore_if_down=False,
+    ) -> UUIDstr:
+        partner = partner or partner_factory(name=faker.company(), email=faker.email())
+        edge_port_node = Router.from_subscription(nokia_router_subscription_factory()).router
+        product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT)
+        edge_port_subscription = EdgePortInactive.from_product_id(
+            product_id, customer_id=partner["partner_id"], insync=True
+        )
+
+        edge_port_subscription.edge_port.edge_port_description = description or faker.text(max_nb_chars=30)
+        edge_port_subscription.edge_port.edge_port_geant_ga_id = geant_ga_id or faker.geant_sid()
+        edge_port_subscription.edge_port.edge_port_node = node or edge_port_node
+        edge_port_subscription.edge_port.edge_port_name = name or f"lag-{faker.pyint(21, 50)}"
+        edge_port_subscription.edge_port.edge_port_description = edge_port_description or faker.sentence()
+        edge_port_subscription.edge_port.edge_port_enable_lacp = enable_lacp
+        edge_port_subscription.edge_port.edge_port_encapsulation = encapsulation
+        edge_port_subscription.edge_port.edge_port_mac_address = mac_address or faker.mac_address()
+        edge_port_subscription.edge_port.edge_port_member_speed = member_speed
+        edge_port_subscription.edge_port.edge_port_minimum_links = minimum_links or faker.pyint(1, 2)
+        edge_port_subscription.edge_port.edge_port_type = edge_port_type
+        edge_port_subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
+        edge_port_subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
+        edge_port_subscription.edge_port.edge_port_ae_members = edge_port_ae_members or [
+            EdgePortAEMemberBlock.new(
+                faker.uuid4(),
+                interface_name="Interface2",
+                interface_description=faker.sentence(),
+            ),
+            EdgePortAEMemberBlock.new(
+                faker.uuid4(),
+                interface_name="Interface3",
+                interface_description=faker.sentence(),
+            ),
+        ]
+        edge_port_subscription.edge_port.edge_port_sbp_list = edge_port_sbp_list or []
+        edge_port_subscription = SubscriptionModel.from_other_lifecycle(
+            edge_port_subscription,
+            SubscriptionLifecycle.ACTIVE,
+        )
+
+        if status:
+            edge_port_subscription.status = status
+
+        edge_port_subscription.description = description or faker.text(max_nb_chars=30)
+        edge_port_subscription.start_date = start_date
+        edge_port_subscription.save()
+        db.session.commit()
+
+        return str(edge_port_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/iptrunk_fixtures.py b/test/fixtures/iptrunk_fixtures.py
new file mode 100644
index 00000000..f2251fc7
--- /dev/null
+++ b/test/fixtures/iptrunk_fixtures.py
@@ -0,0 +1,126 @@
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_blocks.iptrunk import (
+    IptrunkInterfaceBlock,
+    IptrunkSideBlock,
+    IptrunkType,
+)
+from gso.products.product_types.iptrunk import ImportedIptrunkInactive, IptrunkInactive
+from gso.products.product_types.router import Router
+from gso.services import subscriptions
+from gso.utils.types.interfaces import PhysicalPortCapacity
+
+
+@pytest.fixture()
+def iptrunk_side_subscription_factory(nokia_router_subscription_factory, faker):
+    def subscription_create(
+        iptrunk_side_node=None,
+        iptrunk_side_ae_iface=None,
+        iptrunk_side_ae_geant_a_sid=None,
+        iptrunk_side_ae_members=None,
+        iptrunk_side_ae_members_description=None,
+    ) -> IptrunkSideBlock:
+        iptrunk_side_node_id = iptrunk_side_node or nokia_router_subscription_factory()
+        iptrunk_side_node = Router.from_subscription(iptrunk_side_node_id).router
+        iptrunk_side_ae_iface = iptrunk_side_ae_iface or faker.pystr()
+        iptrunk_side_ae_geant_a_sid = iptrunk_side_ae_geant_a_sid or faker.geant_sid()
+        iptrunk_side_ae_members = iptrunk_side_ae_members or [
+            IptrunkInterfaceBlock.new(
+                faker.uuid4(),
+                interface_name=faker.network_interface(),
+                interface_description=faker.sentence(),
+            ),
+            IptrunkInterfaceBlock.new(
+                faker.uuid4(),
+                interface_name=faker.network_interface(),
+                interface_description=faker.sentence(),
+            ),
+        ]
+
+        return IptrunkSideBlock.new(
+            faker.uuid4(),
+            iptrunk_side_node=iptrunk_side_node,
+            iptrunk_side_ae_iface=iptrunk_side_ae_iface,
+            iptrunk_side_ae_geant_a_sid=iptrunk_side_ae_geant_a_sid,
+            iptrunk_side_ae_members=iptrunk_side_ae_members,
+            iptrunk_side_ae_members_description=iptrunk_side_ae_members_description,
+        )
+
+    return subscription_create
+
+
+@pytest.fixture()
+def iptrunk_subscription_factory(iptrunk_side_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        geant_s_sid=None,
+        iptrunk_description=None,
+        iptrunk_type=IptrunkType.LEASED,
+        iptrunk_speed=PhysicalPortCapacity.ONE_GIGABIT_PER_SECOND,
+        iptrunk_isis_metric=None,
+        iptrunk_ipv4_network=None,
+        iptrunk_ipv6_network=None,
+        iptrunk_sides=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IP_TRUNK)
+            iptrunk_subscription = IptrunkInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_IP_TRUNK)
+            iptrunk_subscription = ImportedIptrunkInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        description = description or faker.sentence()
+        geant_s_sid = geant_s_sid or faker.geant_sid()
+        iptrunk_description = iptrunk_description or faker.sentence()
+        iptrunk_isis_metric = iptrunk_isis_metric or faker.pyint()
+        iptrunk_ipv4_network = iptrunk_ipv4_network or faker.ipv4_network(max_subnet=31)
+        iptrunk_ipv6_network = iptrunk_ipv6_network or faker.ipv6_network(max_subnet=126)
+        iptrunk_minimum_links = 1
+        iptrunk_side_a = iptrunk_side_subscription_factory()
+        iptrunk_side_b = iptrunk_side_subscription_factory()
+        iptrunk_sides = iptrunk_sides or [iptrunk_side_a, iptrunk_side_b]
+
+        iptrunk_subscription.iptrunk.geant_s_sid = geant_s_sid
+        iptrunk_subscription.iptrunk.iptrunk_description = iptrunk_description
+        iptrunk_subscription.iptrunk.iptrunk_type = iptrunk_type
+        iptrunk_subscription.iptrunk.iptrunk_speed = iptrunk_speed
+        iptrunk_subscription.iptrunk.iptrunk_minimum_links = iptrunk_minimum_links
+        iptrunk_subscription.iptrunk.iptrunk_isis_metric = iptrunk_isis_metric
+        iptrunk_subscription.iptrunk.iptrunk_ipv4_network = iptrunk_ipv4_network
+        iptrunk_subscription.iptrunk.iptrunk_ipv6_network = iptrunk_ipv6_network
+        iptrunk_subscription.iptrunk.iptrunk_sides = iptrunk_sides
+
+        iptrunk_subscription = SubscriptionModel.from_other_lifecycle(
+            iptrunk_subscription,
+            SubscriptionLifecycle.ACTIVE,
+        )
+
+        if status:
+            iptrunk_subscription.status = status
+
+        iptrunk_subscription.description = description
+        iptrunk_subscription.start_date = start_date
+        iptrunk_subscription.save()
+        db.session.commit()
+
+        return str(iptrunk_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/office_router_fixtures.py b/test/fixtures/office_router_fixtures.py
new file mode 100644
index 00000000..2f43fa6b
--- /dev/null
+++ b/test/fixtures/office_router_fixtures.py
@@ -0,0 +1,74 @@
+import ipaddress
+
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_types.office_router import ImportedOfficeRouterInactive, OfficeRouterInactive
+from gso.products.product_types.site import Site
+from gso.services import subscriptions
+from gso.utils.shared_enums import Vendor
+
+
+@pytest.fixture()
+def office_router_subscription_factory(site_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        office_router_fqdn=None,
+        office_router_ts_port=None,
+        office_router_lo_ipv4_address=None,
+        office_router_lo_ipv6_address=None,
+        office_router_site=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or faker.text(max_nb_chars=30)
+        office_router_fqdn = office_router_fqdn or faker.domain_name(levels=4)
+        office_router_ts_port = office_router_ts_port or faker.random_int(min=1, max=49151)
+        office_router_lo_ipv4_address = office_router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
+        office_router_lo_ipv6_address = office_router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
+        office_router_site = office_router_site or site_subscription_factory()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.OFFICE_ROUTER)
+            office_router_subscription = OfficeRouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OFFICE_ROUTER)
+            office_router_subscription = ImportedOfficeRouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        office_router_subscription.office_router.office_router_fqdn = office_router_fqdn
+        office_router_subscription.office_router.office_router_ts_port = office_router_ts_port
+        office_router_subscription.office_router.office_router_lo_ipv4_address = office_router_lo_ipv4_address
+        office_router_subscription.office_router.office_router_lo_ipv6_address = office_router_lo_ipv6_address
+        office_router_subscription.office_router.office_router_site = Site.from_subscription(office_router_site).site
+        office_router_subscription.office_router.vendor = Vendor.NOKIA
+
+        office_router_subscription = SubscriptionModel.from_other_lifecycle(
+            office_router_subscription, SubscriptionLifecycle.ACTIVE
+        )
+        office_router_subscription.description = description
+        office_router_subscription.start_date = start_date
+
+        if status:
+            office_router_subscription.status = status
+
+        office_router_subscription.save()
+        db.session.commit()
+
+        return str(office_router_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/opengear_fixtures.py b/test/fixtures/opengear_fixtures.py
new file mode 100644
index 00000000..d3cd83da
--- /dev/null
+++ b/test/fixtures/opengear_fixtures.py
@@ -0,0 +1,70 @@
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_types.opengear import ImportedOpengearInactive, OpengearInactive
+from gso.products.product_types.site import Site
+from gso.services import subscriptions
+
+
+@pytest.fixture()
+def opengear_subscription_factory(site_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        opengear_site=None,
+        opengear_hostname=None,
+        opengear_wan_address=None,
+        opengear_wan_netmask=None,
+        opengear_wan_gateway=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or faker.text(max_nb_chars=30)
+        opengear_site = opengear_site or site_subscription_factory()
+        opengear_hostname = opengear_hostname or faker.domain_name(levels=4)
+        opengear_wan_address = opengear_wan_address or faker.ipv4()
+        opengear_wan_netmask = opengear_wan_netmask or faker.ipv4()
+        opengear_wan_gateway = opengear_wan_gateway or faker.ipv4()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.OPENGEAR)
+            opengear_subscription = OpengearInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_OPENGEAR)
+            opengear_subscription = ImportedOpengearInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        opengear_subscription.opengear.opengear_site = Site.from_subscription(opengear_site).site
+        opengear_subscription.opengear.opengear_hostname = opengear_hostname
+        opengear_subscription.opengear.opengear_wan_address = opengear_wan_address
+        opengear_subscription.opengear.opengear_wan_netmask = opengear_wan_netmask
+        opengear_subscription.opengear.opengear_wan_gateway = opengear_wan_gateway
+
+        opengear_subscription = SubscriptionModel.from_other_lifecycle(
+            opengear_subscription, SubscriptionLifecycle.ACTIVE
+        )
+        opengear_subscription.description = description
+        opengear_subscription.start_date = start_date
+
+        if status:
+            opengear_subscription.status = status
+
+        opengear_subscription.save()
+        db.session.commit()
+
+        return str(opengear_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/router_fixtures.py b/test/fixtures/router_fixtures.py
new file mode 100644
index 00000000..d08dde35
--- /dev/null
+++ b/test/fixtures/router_fixtures.py
@@ -0,0 +1,148 @@
+import ipaddress
+
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_blocks.router import RouterRole
+from gso.products.product_types.router import ImportedRouterInactive, RouterInactive
+from gso.products.product_types.site import Site
+from gso.services import subscriptions
+from gso.utils.shared_enums import Vendor
+
+
+@pytest.fixture()
+def nokia_router_subscription_factory(site_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        router_fqdn=None,
+        router_ts_port=None,
+        router_access_via_ts=None,
+        router_lo_ipv4_address=None,
+        router_lo_ipv6_address=None,
+        router_lo_iso_address=None,
+        router_role=RouterRole.PE,
+        router_site=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or faker.text(max_nb_chars=30)
+        router_fqdn = router_fqdn or faker.domain_name(levels=4)
+        router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
+        router_access_via_ts = router_access_via_ts or faker.boolean()
+        router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
+        router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
+        router_lo_iso_address = router_lo_iso_address or faker.word()
+        router_site = router_site or site_subscription_factory()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER)
+            router_subscription = RouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER)
+            router_subscription = ImportedRouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        router_subscription.router.router_fqdn = router_fqdn
+        router_subscription.router.router_ts_port = router_ts_port
+        router_subscription.router.router_access_via_ts = router_access_via_ts
+        router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
+        router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
+        router_subscription.router.router_lo_iso_address = router_lo_iso_address
+        router_subscription.router.router_role = router_role
+        router_subscription.router.router_site = Site.from_subscription(router_site).site
+        router_subscription.router.vendor = Vendor.NOKIA
+
+        router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
+        router_subscription.insync = True
+        router_subscription.description = description
+        router_subscription.start_date = start_date
+
+        if status:
+            router_subscription.status = status
+
+        router_subscription.save()
+        db.session.commit()
+
+        return str(router_subscription.subscription_id)
+
+    return subscription_create
+
+
+@pytest.fixture()
+def juniper_router_subscription_factory(site_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        router_fqdn=None,
+        router_ts_port=None,
+        router_access_via_ts=None,
+        router_lo_ipv4_address=None,
+        router_lo_ipv6_address=None,
+        router_lo_iso_address=None,
+        router_role=RouterRole.PE,
+        router_site=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or faker.text(max_nb_chars=30)
+        router_fqdn = router_fqdn or faker.domain_name(levels=4)
+        router_ts_port = router_ts_port or faker.random_int(min=1, max=49151)
+        router_access_via_ts = router_access_via_ts or faker.boolean()
+        router_lo_ipv4_address = router_lo_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
+        router_lo_ipv6_address = router_lo_ipv6_address or ipaddress.IPv6Address(faker.ipv6())
+        router_lo_iso_address = router_lo_iso_address or faker.word()
+        router_site = router_site or site_subscription_factory()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.ROUTER)
+            router_subscription = RouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_ROUTER)
+            router_subscription = ImportedRouterInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        router_subscription.router.router_fqdn = router_fqdn
+        router_subscription.router.router_ts_port = router_ts_port
+        router_subscription.router.router_access_via_ts = router_access_via_ts
+        router_subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
+        router_subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
+        router_subscription.router.router_lo_iso_address = router_lo_iso_address
+        router_subscription.router.router_role = router_role
+        router_subscription.router.router_site = Site.from_subscription(router_site).site
+        router_subscription.router.vendor = Vendor.JUNIPER
+
+        router_subscription = SubscriptionModel.from_other_lifecycle(router_subscription, SubscriptionLifecycle.ACTIVE)
+        router_subscription.description = description
+        router_subscription.start_date = start_date
+
+        if status:
+            router_subscription.status = status
+
+        router_subscription.save()
+        db.session.commit()
+
+        return str(router_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/site_fixtures.py b/test/fixtures/site_fixtures.py
new file mode 100644
index 00000000..b9155a29
--- /dev/null
+++ b/test/fixtures/site_fixtures.py
@@ -0,0 +1,79 @@
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_blocks.site import SiteTier
+from gso.products.product_types.site import ImportedSiteInactive, SiteInactive
+from gso.services import subscriptions
+
+
+@pytest.fixture()
+def site_subscription_factory(faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        site_name=None,
+        site_city=None,
+        site_country=None,
+        site_country_code=None,
+        site_latitude=None,
+        site_longitude=None,
+        site_bgp_community_id=None,
+        site_internal_id=None,
+        site_tier=SiteTier.TIER1,
+        site_ts_address=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or "Site Subscription"
+        site_name = site_name or faker.site_name()
+        site_city = site_city or faker.city()
+        site_country = site_country or faker.country()
+        site_country_code = site_country_code or faker.country_code()
+        site_latitude = site_latitude or str(faker.latitude())
+        site_longitude = site_longitude or str(faker.longitude())
+        site_bgp_community_id = site_bgp_community_id or faker.pyint()
+        site_internal_id = site_internal_id or faker.pyint()
+        site_ts_address = site_ts_address or faker.ipv4()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.SITE)
+            site_subscription = SiteInactive.from_product_id(product_id, customer_id=partner["partner_id"], insync=True)
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SITE)
+            site_subscription = ImportedSiteInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        site_subscription.site.site_city = site_city
+        site_subscription.site.site_name = site_name
+        site_subscription.site.site_country = site_country
+        site_subscription.site.site_country_code = site_country_code
+        site_subscription.site.site_latitude = site_latitude
+        site_subscription.site.site_longitude = site_longitude
+        site_subscription.site.site_bgp_community_id = site_bgp_community_id
+        site_subscription.site.site_internal_id = site_internal_id
+        site_subscription.site.site_tier = site_tier
+        site_subscription.site.site_ts_address = site_ts_address
+
+        site_subscription = SubscriptionModel.from_other_lifecycle(site_subscription, SubscriptionLifecycle.ACTIVE)
+        site_subscription.description = description
+        site_subscription.start_date = start_date
+        if status:
+            site_subscription.status = status
+
+        site_subscription.save()
+        db.session.commit()
+
+        return str(site_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/fixtures/super_pop_switch_fixtures.py b/test/fixtures/super_pop_switch_fixtures.py
new file mode 100644
index 00000000..33459570
--- /dev/null
+++ b/test/fixtures/super_pop_switch_fixtures.py
@@ -0,0 +1,75 @@
+import ipaddress
+
+import pytest
+from orchestrator.db import (
+    db,
+)
+from orchestrator.domain import SubscriptionModel
+from orchestrator.types import SubscriptionLifecycle, UUIDstr
+
+from gso.products import ProductName
+from gso.products.product_types.site import Site
+from gso.products.product_types.super_pop_switch import ImportedSuperPopSwitchInactive, SuperPopSwitchInactive
+from gso.services import subscriptions
+from gso.utils.shared_enums import Vendor
+
+
+@pytest.fixture()
+def super_pop_switch_subscription_factory(site_subscription_factory, faker, geant_partner):
+    def subscription_create(
+        description=None,
+        start_date="2023-05-24T00:00:00+00:00",
+        super_pop_switch_fqdn=None,
+        super_pop_switch_ts_port=None,
+        super_pop_switch_mgmt_ipv4_address=None,
+        super_pop_switch_site=None,
+        status: SubscriptionLifecycle | None = None,
+        partner: dict | None = None,
+        *,
+        is_imported: bool | None = True,
+    ) -> UUIDstr:
+        if partner is None:
+            partner = geant_partner
+
+        description = description or faker.text(max_nb_chars=30)
+        super_pop_switch_fqdn = super_pop_switch_fqdn or faker.domain_name(levels=4)
+        super_pop_switch_ts_port = super_pop_switch_ts_port or faker.random_int(min=1, max=49151)
+        super_pop_switch_mgmt_ipv4_address = super_pop_switch_mgmt_ipv4_address or ipaddress.IPv4Address(faker.ipv4())
+        super_pop_switch_site = super_pop_switch_site or site_subscription_factory()
+
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.SUPER_POP_SWITCH)
+            super_pop_switch_subscription = SuperPopSwitchInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH)
+            super_pop_switch_subscription = ImportedSuperPopSwitchInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+
+        super_pop_switch_subscription.super_pop_switch.super_pop_switch_fqdn = super_pop_switch_fqdn
+        super_pop_switch_subscription.super_pop_switch.super_pop_switch_ts_port = super_pop_switch_ts_port
+        super_pop_switch_subscription.super_pop_switch.super_pop_switch_mgmt_ipv4_address = (
+            super_pop_switch_mgmt_ipv4_address
+        )
+        super_pop_switch_subscription.super_pop_switch.super_pop_switch_site = Site.from_subscription(
+            super_pop_switch_site
+        ).site
+        super_pop_switch_subscription.super_pop_switch.vendor = Vendor.NOKIA
+
+        super_pop_switch_subscription = SubscriptionModel.from_other_lifecycle(
+            super_pop_switch_subscription, SubscriptionLifecycle.ACTIVE
+        )
+        super_pop_switch_subscription.description = description
+        super_pop_switch_subscription.start_date = start_date
+
+        if status:
+            super_pop_switch_subscription.status = status
+
+        super_pop_switch_subscription.save()
+        db.session.commit()
+
+        return str(super_pop_switch_subscription.subscription_id)
+
+    return subscription_create
diff --git a/test/services/conftest.py b/test/services/conftest.py
index 9557b1c5..3467d545 100644
--- a/test/services/conftest.py
+++ b/test/services/conftest.py
@@ -12,10 +12,22 @@ class MockedNetboxClient:
     def get_device_by_name(self):
         return self.BaseMockObject(id=1, name="test")
 
+    @staticmethod
+    def get_interface_by_name_and_device(interface_name: str, device_name: str):
+        return {
+            "name": f"{interface_name}",
+            "module": {"display": f"Module{interface_name}"},
+            "description": f"Description{interface_name}-{device_name}",
+        }
+
     @staticmethod
     def get_available_lags() -> list[str]:
         return [f"lag-{lag}" for lag in range(1, 5)]
 
+    @staticmethod
+    def get_available_services_lags() -> list[str]:
+        return [f"lag-{lag}" for lag in range(21, 50)]
+
     @staticmethod
     def get_available_interfaces():
         interfaces = []
diff --git a/test/workflows/edge_port/__init__.py b/test/workflows/edge_port/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/test/workflows/edge_port/test_create_edge_port.py b/test/workflows/edge_port/test_create_edge_port.py
new file mode 100644
index 00000000..db855e8a
--- /dev/null
+++ b/test/workflows/edge_port/test_create_edge_port.py
@@ -0,0 +1,124 @@
+from os import PathLike
+from unittest.mock import patch
+
+import pytest
+from pydantic_forms.exceptions import FormValidationError
+
+from gso.products import EdgePort, ProductName, Router
+from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
+from gso.services.subscriptions import get_product_id_by_name
+from gso.utils.types.interfaces import PhysicalPortCapacity
+from test.services.conftest import MockedNetboxClient
+from test.workflows import (
+    assert_complete,
+    assert_lso_interaction_success,
+    extract_state,
+    run_workflow,
+)
+
+
+@pytest.fixture()
+def _netbox_client_mock():
+    # Mock NetboxClient methods
+    with (
+        patch("gso.services.netbox_client.NetboxClient.get_device_by_name") as mock_get_device_by_name,
+        patch("gso.services.netbox_client.NetboxClient.get_available_interfaces") as mock_get_available_interfaces,
+        patch("gso.services.netbox_client.NetboxClient.get_available_services_lags") as mock_available_services_lags,
+        patch("gso.services.netbox_client.NetboxClient.create_interface") as mock_create_interface,
+        patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag") as mock_attach_interface_to_lag,
+        patch("gso.services.netbox_client.NetboxClient.reserve_interface") as mock_reserve_interface,
+        patch("gso.services.netbox_client.NetboxClient.allocate_interface") as mock_allocate_interface,
+    ):
+        mock_get_device_by_name.return_value = MockedNetboxClient().get_device_by_name()
+        mock_get_available_interfaces.return_value = MockedNetboxClient().get_available_interfaces()
+        mock_available_services_lags.return_value = MockedNetboxClient().get_available_services_lags()
+        mock_create_interface.return_value = MockedNetboxClient().create_interface()
+        mock_attach_interface_to_lag.return_value = MockedNetboxClient().attach_interface_to_lag()
+        mock_reserve_interface.return_value = MockedNetboxClient().reserve_interface()
+        mock_allocate_interface.return_value = MockedNetboxClient().allocate_interface()
+
+        yield
+
+
+@pytest.fixture()
+def input_form_wizard_data(request, nokia_router_subscription_factory, partner_factory, faker):
+    create_edge_port_step = {
+        "tt_number": faker.tt_number(),
+        "node": nokia_router_subscription_factory(),
+        "partner": partner_factory(name="GAAR", email=faker.email())["partner_id"],
+        "service_type": EdgePortType.PUBLIC,
+        "geant_ga_id": faker.geant_gid(),
+        "enable_lacp": True,
+        "speed": PhysicalPortCapacity.HUNDRED_GIGABIT_PER_SECOND,
+        "encapsulation": EncapsulationType.DOT1Q,
+        "number_of_members": 2,
+        "minimum_links": 1,
+    }
+    create_edge_port_interface_step = {
+        "name": "lag-21",
+        "description": faker.sentence(),
+        "ae_members": [
+            {
+                "interface_name": f"Interface{interface}",
+                "interface_description": faker.sentence(),
+            }
+            for interface in range(2)
+        ],
+    }
+
+    return [
+        create_edge_port_step,
+        create_edge_port_interface_step,
+    ]
+
+
+@pytest.mark.workflow()
+@patch("gso.workflows.edge_port.create_edge_port.lso_client.execute_playbook")
+def test_successful_edge_port_creation(
+    mock_execute_playbook,
+    responses,
+    input_form_wizard_data,
+    faker,
+    _netbox_client_mock,  # noqa: PT019
+    data_config_filename: PathLike,
+    test_client,
+):
+    product_id = get_product_id_by_name(ProductName.EDGE_PORT)
+    initial_data = [{"product": product_id}, *input_form_wizard_data]
+    result, process_stat, step_log = run_workflow("create_edge_port", initial_data)
+
+    for _ in range(2):
+        result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
+
+    assert_complete(result)
+
+    state = extract_state(result)
+    subscription_id = state["subscription_id"]
+    subscription = EdgePort.from_subscription(subscription_id)
+
+    assert subscription.status == "active"
+    ga_id = input_form_wizard_data[0]["geant_ga_id"]
+    router_fqdn = Router.from_subscription(input_form_wizard_data[0]["node"]).router.router_fqdn
+    assert subscription.description == f"Edge Port lag-21 on {router_fqdn}, GAAR, {ga_id}"
+    assert len(subscription.edge_port.edge_port_ae_members) == 2
+    assert mock_execute_playbook.call_count == 2
+
+
+def test_edge_port_creation_with_invalid_input(
+    input_form_wizard_data,
+    faker,
+    _netbox_client_mock,  # noqa: PT019
+    data_config_filename: PathLike,
+    test_client,
+):
+    product_id = get_product_id_by_name(ProductName.EDGE_PORT)
+    # If the number of members is greater than 1 then LACP must be enabled.
+    input_form_wizard_data[0]["enable_lacp"] = False
+    initial_data = [{"product": product_id}, *input_form_wizard_data]
+
+    with pytest.raises(FormValidationError) as error:
+        run_workflow("create_edge_port", initial_data)
+
+    error = error.value.errors[0]
+    assert error["msg"] == "Number of members must be 1 if LACP is disabled."
+    assert error["loc"][0] == "__root__"
diff --git a/test/workflows/edge_port/test_modify_edge_port.py b/test/workflows/edge_port/test_modify_edge_port.py
new file mode 100644
index 00000000..18d12a05
--- /dev/null
+++ b/test/workflows/edge_port/test_modify_edge_port.py
@@ -0,0 +1,168 @@
+from unittest.mock import patch
+
+import pytest
+
+from gso.products import EdgePort
+from gso.utils.types.interfaces import PhysicalPortCapacity
+from test.workflows import (
+    assert_complete,
+    assert_lso_interaction_success,
+    extract_state,
+    run_workflow,
+)
+from test.workflows.iptrunk.test_create_iptrunk import MockedNetboxClient
+
+
+@pytest.fixture()
+def input_form_wizard_data(request, faker, edge_port_subscription_factory, partner_factory):
+    subscription_id = edge_port_subscription_factory()
+
+    return [
+        {"subscription_id": subscription_id},
+        {
+            "tt_number": faker.tt_number(),
+            "geant_ga_id": faker.geant_gid(),
+            "member_speed": PhysicalPortCapacity.FOUR_HUNDRED_GIGABIT_PER_SECOND,
+            "number_of_members": 1,
+        },
+        {
+            "description": faker.sentence(),
+            "ae_members": [
+                {
+                    "interface_name": "Interface1",
+                    "interface_description": faker.sentence(),
+                }
+            ],
+        },
+    ]
+
+
+@pytest.mark.workflow()
+@patch("gso.workflows.edge_port.modify_edge_port.execute_playbook")
+@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
+@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
+@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
+@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
+@patch("gso.services.netbox_client.NetboxClient.free_interface")
+@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag")
+@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
+def test_modify_edge_port_with_changing_capacity(
+    mocked_get_interface_by_name_and_device,
+    mocked_detach_interfaces_from_lag,
+    mocked_free_interface,
+    mocked_allocate_interface,
+    mocked_reserve_interface,
+    mocked_attach_interface_to_lag,
+    mocked_get_available_interfaces,
+    mocked_execute_playbook,
+    input_form_wizard_data,
+    faker,
+    data_config_filename,
+):
+    #  Set up mock return values
+    mocked_netbox = MockedNetboxClient()
+    mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
+    mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
+    mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
+    mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
+    mocked_free_interface.return_value = mocked_netbox.free_interface()
+    mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag()
+    mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device
+
+    #  Run workflow
+    result, process_stat, step_log = run_workflow("modify_edge_port", input_form_wizard_data)
+
+    for _ in range(2):
+        result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
+
+    assert_complete(result)
+    # Validate the final state and subscription data
+    state = extract_state(result)
+    subscription_id = state["subscription_id"]
+    subscription = EdgePort.from_subscription(subscription_id)
+
+    assert subscription.status == "active"
+    assert mocked_execute_playbook.call_count == 2
+
+    # The number of members have been changed from 2 to 1
+    assert mocked_reserve_interface.call_count == 1
+    assert mocked_attach_interface_to_lag.call_count == 1
+    assert mocked_free_interface.call_count == 2
+    assert mocked_detach_interfaces_from_lag.call_count == 1
+    assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_data[1]["geant_ga_id"]
+    assert len(subscription.edge_port.edge_port_ae_members) == 1
+
+
+@pytest.fixture()
+def input_form_wizard_without_changing_capacity(request, faker, edge_port_subscription_factory, partner_factory):
+    subscription_id = edge_port_subscription_factory()
+    subscription = EdgePort.from_subscription(subscription_id)
+
+    return [
+        {"subscription_id": subscription_id},
+        {"tt_number": faker.tt_number(), "geant_ga_id": faker.geant_gid()},
+        {
+            "description": faker.sentence(),
+            "ae_members": [
+                {
+                    "interface_name": interface.interface_name,
+                    "interface_description": interface.interface_description,
+                }
+                for interface in subscription.edge_port.edge_port_ae_members
+            ],
+        },
+    ]
+
+
+@pytest.mark.workflow()
+@patch("gso.workflows.edge_port.modify_edge_port.execute_playbook")
+@patch("gso.services.netbox_client.NetboxClient.get_available_interfaces")
+@patch("gso.services.netbox_client.NetboxClient.attach_interface_to_lag")
+@patch("gso.services.netbox_client.NetboxClient.reserve_interface")
+@patch("gso.services.netbox_client.NetboxClient.allocate_interface")
+@patch("gso.services.netbox_client.NetboxClient.free_interface")
+@patch("gso.services.netbox_client.NetboxClient.detach_interfaces_from_lag")
+@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
+def test_modify_edge_port_without_changing_capacity(
+    mocked_get_interface_by_name_and_device,
+    mocked_detach_interfaces_from_lag,
+    mocked_free_interface,
+    mocked_allocate_interface,
+    mocked_reserve_interface,
+    mocked_attach_interface_to_lag,
+    mocked_get_available_interfaces,
+    mocked_execute_playbook,
+    input_form_wizard_without_changing_capacity,
+    faker,
+    data_config_filename,
+):
+    #  Set up mock return values
+    mocked_netbox = MockedNetboxClient()
+    mocked_get_available_interfaces.return_value = mocked_netbox.get_available_interfaces()
+    mocked_attach_interface_to_lag.return_value = mocked_netbox.attach_interface_to_lag()
+    mocked_reserve_interface.return_value = mocked_netbox.reserve_interface()
+    mocked_allocate_interface.return_value = mocked_netbox.allocate_interface()
+    mocked_free_interface.return_value = mocked_netbox.free_interface()
+    mocked_detach_interfaces_from_lag.return_value = mocked_netbox.detach_interfaces_from_lag()
+    mocked_get_interface_by_name_and_device.side_effect = mocked_netbox.get_interface_by_name_and_device
+
+    #  Run workflow
+    result, _, _ = run_workflow("modify_edge_port", input_form_wizard_without_changing_capacity)
+    assert_complete(result)
+
+    state = extract_state(result)
+    subscription_id = state["subscription_id"]
+    subscription = EdgePort.from_subscription(subscription_id)
+
+    assert subscription.status == "active"
+
+    # The capacity has not been changed so the following methods should not be called
+    assert mocked_execute_playbook.call_count == 0
+    assert mocked_reserve_interface.call_count == 0
+    assert mocked_attach_interface_to_lag.call_count == 0
+    assert mocked_free_interface.call_count == 0
+    assert mocked_detach_interfaces_from_lag.call_count == 0
+
+    assert subscription.edge_port.edge_port_geant_ga_id == input_form_wizard_without_changing_capacity[1]["geant_ga_id"]
+    assert len(subscription.edge_port.edge_port_ae_members) == 2
+    assert subscription.edge_port.edge_port_description == input_form_wizard_without_changing_capacity[2]["description"]
diff --git a/test/workflows/edge_port/test_terminate_edge_port.py b/test/workflows/edge_port/test_terminate_edge_port.py
new file mode 100644
index 00000000..cc7610c3
--- /dev/null
+++ b/test/workflows/edge_port/test_terminate_edge_port.py
@@ -0,0 +1,56 @@
+from unittest.mock import patch
+
+import pytest
+
+from gso.products import EdgePort
+from test.services.conftest import MockedNetboxClient
+from test.workflows import (
+    assert_complete,
+    assert_lso_interaction_success,
+    extract_state,
+    run_workflow,
+)
+
+
+@pytest.mark.workflow()
+@patch("gso.workflows.edge_port.terminate_edge_port.execute_playbook")
+@patch("gso.services.netbox_client.NetboxClient.delete_interface")
+@patch("gso.services.netbox_client.NetboxClient.free_interface")
+def test_successful_edge_port_termination(
+    mocked_free_interface,
+    mocked_delete_interface,
+    mock_execute_playbook,
+    edge_port_subscription_factory,
+    faker,
+    data_config_filename,
+):
+    #  Set up mock return values
+    subscription_id = edge_port_subscription_factory()
+    mocked_netbox = MockedNetboxClient()
+    mocked_delete_interface.return_value = mocked_netbox.delete_interface()
+    mocked_free_interface.return_value = mocked_netbox.free_interface()
+
+    #  Run workflow
+    initial_data = [
+        {"subscription_id": subscription_id},
+        {
+            "tt_number": faker.tt_number(),
+        },
+    ]
+    result, process_stat, step_log = run_workflow("terminate_edge_port", initial_data)
+
+    for _ in range(2):
+        result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
+
+    assert_complete(result)
+
+    # Check NetboxClient calls
+    assert mocked_delete_interface.call_count == 1  # Delete the lag
+    assert mocked_free_interface.call_count == 2  # Free interfaces attached to the lag which is 2
+
+    state = extract_state(result)
+    subscription_id = state["subscription_id"]
+    subscription = EdgePort.from_subscription(subscription_id)
+
+    assert subscription.status == "terminated"
+    assert mock_execute_playbook.call_count == 2
diff --git a/test/workflows/edge_port/test_validate_edge_port.py b/test/workflows/edge_port/test_validate_edge_port.py
new file mode 100644
index 00000000..261d5b24
--- /dev/null
+++ b/test/workflows/edge_port/test_validate_edge_port.py
@@ -0,0 +1,62 @@
+from unittest.mock import patch
+
+import pytest
+
+from gso.products import EdgePort
+from test.services.conftest import MockedNetboxClient
+from test.workflows import (
+    assert_complete,
+    assert_lso_success,
+    extract_state,
+    run_workflow,
+)
+
+
+@pytest.mark.workflow()
+@patch("gso.workflows.edge_port.validate_edge_port.execute_playbook")
+@patch("gso.services.netbox_client.NetboxClient.get_interface_by_name_and_device")
+def test_validate_edge_port_success(
+    mock_get_interface_by_name_and_device,
+    mock_execute_playbook,
+    edge_port_subscription_factory,
+    faker,
+    data_config_filename,
+):
+    subscription_id = edge_port_subscription_factory()
+    mock_get_interface_by_name_and_device.side_effect = [
+        MockedNetboxClient.BaseMockObject(
+            name="iFace1",
+            module=MockedNetboxClient.BaseMockObject(display="display1"),
+            description=subscription_id,
+            enabled=True,
+        ),
+        MockedNetboxClient.BaseMockObject(
+            name="iFace2",
+            module=MockedNetboxClient.BaseMockObject(display="display2"),
+            description=subscription_id,
+            enabled=True,
+        ),
+        MockedNetboxClient.BaseMockObject(
+            name="Iface3",
+            module=MockedNetboxClient.BaseMockObject(display="display3"),
+            description=subscription_id,
+            enabled=True,
+        ),
+    ]
+
+    #  Run workflow
+    initial_data = [{"subscription_id": subscription_id}]
+    result, process_stat, step_log = run_workflow("validate_edge_port", initial_data)
+
+    state = extract_state(result)
+    subscription_id = state["subscription_id"]
+
+    for _ in range(1):
+        result, step_log = assert_lso_success(result, process_stat, step_log)
+
+    assert_complete(result)
+    subscription = EdgePort.from_subscription(subscription_id)
+    assert subscription.status == "active"
+    assert mock_execute_playbook.call_count == 1
+    # One time for getting the LAG and two times for getting the interfaces
+    assert mock_get_interface_by_name_and_device.call_count == 3
diff --git a/test/workflows/tasks/test_delete_partners.py b/test/workflows/tasks/test_delete_partners.py
index b0964bdf..3fb36c84 100644
--- a/test/workflows/tasks/test_delete_partners.py
+++ b/test/workflows/tasks/test_delete_partners.py
@@ -6,7 +6,7 @@ from pydantic_forms.exceptions import FormValidationError
 from sqlalchemy import select
 
 from gso.services.partners import filter_partners_by_name
-from test.fixtures import create_subscription_for_mapping
+from test.fixtures.common_fixtures import create_subscription_for_mapping
 from test.workflows import assert_complete, run_workflow
 
 CORRECT_SUBSCRIPTION = str(uuid4())
-- 
GitLab