From 89f65bf63445680f9f10388c51ad7b829b8adf58 Mon Sep 17 00:00:00 2001
From: Neda Moeini <neda.moeini@geant.org>
Date: Mon, 23 Sep 2024 16:20:38 +0200
Subject: [PATCH] Import Edge Port workflows.

---
 gso/cli/imports.py                            |  86 ++++++++++++-
 ...a9c88aba_add_imported_edge_port_product.py |  56 +++++++++
 ...cbaa47d7_add_import_edge_port_workflows.py |  45 +++++++
 gso/products/__init__.py                      |   5 +-
 gso/products/product_types/edge_port.py       |  14 +++
 gso/utils/helpers.py                          |  10 ++
 gso/workflows/__init__.py                     |   2 +
 gso/workflows/edge_port/create_edge_port.py   |  11 +-
 .../edge_port/create_imported_edge_port.py    | 118 ++++++++++++++++++
 gso/workflows/edge_port/import_edge_port.py   |  28 +++++
 test/cli/test_imports.py                      |  69 ++++++++++
 test/fixtures/edge_port_fixtures.py           |  17 ++-
 .../test_create_imported_edge_port.py         |  43 +++++++
 .../edge_port/test_import_edge_port.py        |  17 +++
 14 files changed, 506 insertions(+), 15 deletions(-)
 create mode 100644 gso/migrations/versions/2024-09-23_3b65a9c88aba_add_imported_edge_port_product.py
 create mode 100644 gso/migrations/versions/2024-09-23_f900cbaa47d7_add_import_edge_port_workflows.py
 create mode 100644 gso/workflows/edge_port/create_imported_edge_port.py
 create mode 100644 gso/workflows/edge_port/import_edge_port.py
 create mode 100644 test/workflows/edge_port/test_create_imported_edge_port.py
 create mode 100644 test/workflows/edge_port/test_import_edge_port.py

diff --git a/gso/cli/imports.py b/gso/cli/imports.py
index 33a4636a..4e0d56f1 100644
--- a/gso/cli/imports.py
+++ b/gso/cli/imports.py
@@ -18,6 +18,7 @@ from sqlalchemy.exc import SQLAlchemyError
 
 from gso.db.models import PartnerTable
 from gso.products import ProductType
+from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
 from gso.products.product_blocks.iptrunk import IptrunkType
 from gso.products.product_blocks.router import RouterRole
 from gso.services.partners import (
@@ -161,6 +162,56 @@ class OpenGearImportModel(BaseModel):
     opengear_wan_gateway: IPv4AddressType
 
 
+class EdgePortImportModel(BaseModel):
+    """Required fields for importing an existing :class:`gso.products.product_types.edge_port`."""
+
+    node: str
+    service_type: EdgePortType
+    speed: PhysicalPortCapacity
+    encapsulation: EncapsulationType
+    name: str
+    minimum_links: int
+    geant_ga_id: str | None
+    mac_address: str | None
+    partner: str
+    enable_lacp: bool
+    ignore_if_down: bool
+    ae_members: LAGMemberList[LAGMember]
+    description: str | None = None
+
+    @field_validator("partner")
+    def check_if_partner_exists(cls, value: str) -> str:
+        """Validate that the partner exists."""
+        try:
+            get_partner_by_name(value)
+        except PartnerNotFoundError as e:
+            msg = f"Partner {value} not found"
+            raise ValueError(msg) from e
+
+        return value
+
+    @field_validator("node")
+    def validate_node(cls, value: str) -> str:
+        """Check if the node is an active PE router in :term:`GSO`."""
+        pe_routers = {
+            str(router.subscription_id)
+            for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
+        }
+        if value not in pe_routers:
+            msg = f"Router {value} not found"
+            raise ValueError(msg)
+
+        return value
+
+    @model_validator(mode="after")
+    def check_members(self) -> Self:
+        """Amount of :term:`LAG` members has to match and meet the minimum requirement."""
+        if len(self.ae_members) < self.minimum_links:
+            msg = f"Number of members should be at least {self.minimum_links} (edge_port_minimum_links)"
+            raise ValueError(msg)
+        return self
+
+
 T = TypeVar(
     "T",
     SiteImportModel,
@@ -169,6 +220,7 @@ T = TypeVar(
     SuperPopSwitchImportModel,
     OfficeRouterImportModel,
     OpenGearImportModel,
+    EdgePortImportModel,
 )
 
 common_filepath_option = typer.Option(
@@ -219,7 +271,7 @@ def _generic_import_product(
     successfully_imported_data = []
     data = _read_data(file_path)
     for details in data:
-        details["partner"] = "GEANT"
+        details["partner"] = details.get("partner", "GEANT")
         typer.echo(f"Creating imported {name_key}: {details[name_key]}")
         try:
             initial_data = import_model(**details)
@@ -297,6 +349,38 @@ def import_opengear(filepath: str = common_filepath_option) -> None:
     )
 
 
+@app.command()
+def import_edge_port(filepath: str = common_filepath_option) -> None:
+    """Import Edge Port into GSO."""
+    successfully_imported_data = []
+    data = _read_data(Path(filepath))
+    for edge_port in data:
+        typer.echo(f"Importing Edge Port {edge_port["name"]} on {edge_port["node"]}. ")
+        try:
+            edge_port["node"] = _get_router_subscription_id(edge_port["node"])
+            initial_data = EdgePortImportModel(**edge_port)
+            start_process("create_imported_edge_port", [initial_data.model_dump()])
+            successfully_imported_data.append(edge_port["name"])
+            typer.echo(f"Successfully imported Edge Port {edge_port["name"]} on {edge_port["node"]}.")
+        except ValidationError as e:
+            typer.echo(f"Validation error: {e}")
+
+    typer.echo("Waiting for the dust to settle before moving on the importing new products...")
+    time.sleep(1)
+
+    edge_port_ids = get_subscriptions(
+        [ProductType.IMPORTED_EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
+    )
+    for subscription_id in edge_port_ids:
+        typer.echo(f"Migrating Edge Port {subscription_id}")
+        start_process("import_edge_port", [subscription_id])
+
+    if successfully_imported_data:
+        typer.echo("Successfully imported Edge Ports:")
+        for item in successfully_imported_data:
+            typer.echo(f"- {item}")
+
+
 @app.command()
 def import_iptrunks(filepath: str = common_filepath_option) -> None:
     """Import IP trunks into GSO."""
diff --git a/gso/migrations/versions/2024-09-23_3b65a9c88aba_add_imported_edge_port_product.py b/gso/migrations/versions/2024-09-23_3b65a9c88aba_add_imported_edge_port_product.py
new file mode 100644
index 00000000..880665f8
--- /dev/null
+++ b/gso/migrations/versions/2024-09-23_3b65a9c88aba_add_imported_edge_port_product.py
@@ -0,0 +1,56 @@
+"""Add Imported Edge Port product..
+
+Revision ID: 3b65a9c88aba
+Revises: 734dc86f5dd3
+Create Date: 2024-09-23 10:55:40.258629
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = '3b65a9c88aba'
+down_revision = '734dc86f5dd3'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    conn.execute(sa.text("""
+DELETE FROM product_block_relations WHERE product_block_relations.in_use_by_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')) AND product_block_relations.depends_on_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortInterfaceBlock'))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM subscription_instances WHERE subscription_instances.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortInterfaceBlock'))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM product_blocks WHERE product_blocks.name IN ('EdgePortInterfaceBlock')
+    """))
+    conn.execute(sa.text("""
+INSERT INTO products (name, description, product_type, tag, status) VALUES ('Imported Edge port', 'Imported edge ports', 'ImportedEdgePort', 'IMPORTED_EDGE_PORT', 'active') RETURNING products.product_id
+    """))
+    conn.execute(sa.text("""
+INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')))
+    """))
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    conn.execute(sa.text("""
+DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock'))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM processes WHERE processes.pid IN (SELECT processes_subscriptions.pid FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port'))))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port')))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM subscription_instances WHERE subscription_instances.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port')))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Imported Edge port'))
+    """))
+    conn.execute(sa.text("""
+DELETE FROM products WHERE products.name IN ('Imported Edge port')
+    """))
diff --git a/gso/migrations/versions/2024-09-23_f900cbaa47d7_add_import_edge_port_workflows.py b/gso/migrations/versions/2024-09-23_f900cbaa47d7_add_import_edge_port_workflows.py
new file mode 100644
index 00000000..2e6175d7
--- /dev/null
+++ b/gso/migrations/versions/2024-09-23_f900cbaa47d7_add_import_edge_port_workflows.py
@@ -0,0 +1,45 @@
+"""Add Import Edge Port workflows..
+
+Revision ID: f900cbaa47d7
+Revises: 3b65a9c88aba
+Create Date: 2024-09-23 10:58:47.196446
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'f900cbaa47d7'
+down_revision = '3b65a9c88aba'
+branch_labels = None
+depends_on = None
+
+
+from orchestrator.migrations.helpers import create_workflow, delete_workflow
+
+new_workflows = [
+    {
+        "name": "create_imported_edge_port",
+        "target": "CREATE",
+        "description": "Import Edge Port",
+        "product_type": "ImportedEdgePort"
+    },
+    {
+        "name": "import_edge_port",
+        "target": "MODIFY",
+        "description": "Import Edge Port",
+        "product_type": "ImportedEdgePort"
+    }
+]
+
+
+def upgrade() -> None:
+    conn = op.get_bind()
+    for workflow in new_workflows:
+        create_workflow(conn, workflow)
+
+
+def downgrade() -> None:
+    conn = op.get_bind()
+    for workflow in new_workflows:
+        delete_workflow(conn, workflow["name"])
diff --git a/gso/products/__init__.py b/gso/products/__init__.py
index df2f7ff1..0e943f6a 100644
--- a/gso/products/__init__.py
+++ b/gso/products/__init__.py
@@ -8,7 +8,7 @@
 from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
 from pydantic_forms.types import strEnum
 
-from gso.products.product_types.edge_port import EdgePort
+from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort
 from gso.products.product_types.geant_ip import GeantIP
 from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk
 from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect
@@ -40,6 +40,7 @@ class ProductName(strEnum):
     OPENGEAR = "Opengear"
     IMPORTED_OPENGEAR = "Imported Opengear"
     EDGE_PORT = "Edge port"
+    IMPORTED_EDGE_PORT = "Imported Edge port"
     GEANT_IP = "GEANT IP"
 
 
@@ -62,6 +63,7 @@ class ProductType(strEnum):
     OPENGEAR = Opengear.__name__
     IMPORTED_OPENGEAR = Opengear.__name__
     EDGE_PORT = EdgePort.__name__
+    IMPORTED_EDGE_PORT = ImportedEdgePort.__name__
     GEANT_IP = GeantIP.__name__
 
 
@@ -83,6 +85,7 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
         ProductName.OPENGEAR.value: Opengear,
         ProductName.IMPORTED_OPENGEAR.value: ImportedOpengear,
         ProductName.EDGE_PORT.value: EdgePort,
+        ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort,
         ProductType.GEANT_IP.value: GeantIP,
     },
 )
diff --git a/gso/products/product_types/edge_port.py b/gso/products/product_types/edge_port.py
index 021aa026..417b6047 100644
--- a/gso/products/product_types/edge_port.py
+++ b/gso/products/product_types/edge_port.py
@@ -26,3 +26,17 @@ class EdgePort(EdgePortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
     """An Edge Port that is active."""
 
     edge_port: EdgePortBlock
+
+
+class ImportedEdgePortInactive(SubscriptionModel, is_base=True):
+    """An imported, inactive Edge Port."""
+
+    edge_port: EdgePortBlockInactive
+
+
+class ImportedEdgePort(
+    ImportedEdgePortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE]
+):
+    """An imported Edge Port that is currently active."""
+
+    edge_port: EdgePortBlock
diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py
index 1e1470c1..6a1a49f9 100644
--- a/gso/utils/helpers.py
+++ b/gso/utils/helpers.py
@@ -190,6 +190,16 @@ def active_router_selector() -> Choice:
     return Choice("Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True))  # type: ignore[arg-type]
 
 
+def active_pe_router_selector() -> Choice:
+    """Generate a dropdown selector for choosing an active PE Router in an input form."""
+    routers = {
+        str(router.subscription_id): router.description
+        for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
+    }
+
+    return Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
+
+
 def active_switch_selector() -> Choice:
     """Generate a dropdown selector for choosing an active Switch in an input form."""
     switch_subscriptions = {
diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py
index 9d7ff083..29d180c1 100644
--- a/gso/workflows/__init__.py
+++ b/gso/workflows/__init__.py
@@ -84,6 +84,8 @@ LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_po
 LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port")
 LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port")
 LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
+LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port")
+LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port")
 
 #  GÉANT IP workflows
 LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip")
diff --git a/gso/workflows/edge_port/create_edge_port.py b/gso/workflows/edge_port/create_edge_port.py
index 1e57b232..399013cd 100644
--- a/gso/workflows/edge_port/create_edge_port.py
+++ b/gso/workflows/edge_port/create_edge_port.py
@@ -6,7 +6,6 @@ from uuid import uuid4
 from annotated_types import Len
 from orchestrator import step, workflow
 from orchestrator.forms import FormPage
-from orchestrator.forms.validators import Choice
 from orchestrator.targets import Target
 from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
 from orchestrator.utils.errors import ProcessFailureError
@@ -18,14 +17,14 @@ from pydantic_forms.validators import validate_unique_list
 from pynetbox.models.dcim import Interfaces
 
 from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
-from gso.products.product_blocks.router import RouterRole
 from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning
 from gso.products.product_types.router import Router
-from gso.services import lso_client, subscriptions
+from gso.services import lso_client
 from gso.services.lso_client import lso_interaction
 from gso.services.netbox_client import NetboxClient
 from gso.services.partners import get_partner_by_id
 from gso.utils.helpers import (
+    active_pe_router_selector,
     available_interfaces_choices,
     available_service_lags_choices,
     partner_choice,
@@ -37,16 +36,12 @@ from gso.utils.types.tt_number import TTNumber
 
 def initial_input_form_generator(product_name: str) -> FormGenerator:
     """Gather information to create a new Edge Port."""
-    routers = {}
-    for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE):
-        routers[str(router.subscription_id)] = router.description
-    router_enum = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True))  # type: ignore[arg-type]
 
     class CreateEdgePortForm(FormPage):
         model_config = ConfigDict(title=product_name)
 
         tt_number: TTNumber
-        node: router_enum  # type: ignore[valid-type]
+        node: active_pe_router_selector()  # type: ignore[valid-type]
         partner: partner_choice()  # type: ignore[valid-type]
         service_type: EdgePortType
         enable_lacp: bool = False
diff --git a/gso/workflows/edge_port/create_imported_edge_port.py b/gso/workflows/edge_port/create_imported_edge_port.py
new file mode 100644
index 00000000..f6fb84fd
--- /dev/null
+++ b/gso/workflows/edge_port/create_imported_edge_port.py
@@ -0,0 +1,118 @@
+"""A creation workflow that adds an existing Edge Port to the DB."""
+
+from typing import Annotated, Any
+from uuid import uuid4
+
+from orchestrator import workflow
+from orchestrator.forms import FormPage
+from orchestrator.targets import Target
+from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
+from orchestrator.workflow import StepList, begin, done, step
+from orchestrator.workflows.steps import resync, set_status, store_process_subscription
+from pydantic import AfterValidator, ConfigDict
+from pydantic_forms.types import UUIDstr
+from pydantic_forms.validators import validate_unique_list
+
+from gso.products import ProductName
+from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
+from gso.products.product_types.edge_port import EdgePortInactive, ImportedEdgePortInactive
+from gso.products.product_types.router import Router
+from gso.services.partners import get_partner_by_name
+from gso.services.subscriptions import get_product_id_by_name
+from gso.utils.helpers import active_pe_router_selector
+from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
+
+
+@step("Create subscription")
+def create_subscription(partner: str) -> State:
+    """Create a new subscription object."""
+    partner_id = get_partner_by_name(partner)["partner_id"]
+    product_id = get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT)
+    subscription = ImportedEdgePortInactive.from_product_id(product_id, partner_id)
+
+    return {
+        "subscription": subscription,
+        "subscription_id": subscription.subscription_id,
+    }
+
+
+def initial_input_form_generator() -> FormGenerator:
+    """Generate a form that is filled in using information passed through the :term:`API` endpoint."""
+
+    class ImportEdgePort(FormPage):
+        model_config = ConfigDict(title="Import Router")
+
+        node: active_pe_router_selector()  # type: ignore[valid-type]
+        partner: str
+        service_type: EdgePortType
+        enable_lacp: bool
+        speed: PhysicalPortCapacity
+        encapsulation: EncapsulationType = EncapsulationType.DOT1Q
+        minimum_links: int
+        mac_address: str | None = None
+        ignore_if_down: bool = False
+        geant_ga_id: str | None = None
+        description: str | None = None
+        name: str
+        ae_members: Annotated[list[LAGMember], AfterValidator(validate_unique_list)]
+
+    user_input = yield ImportEdgePort
+
+    return user_input.model_dump()
+
+
+@step("Initialize subscription")
+def initialize_subscription(
+    subscription: EdgePortInactive,
+    node: UUIDstr,
+    service_type: EdgePortType,
+    speed: PhysicalPortCapacity,
+    encapsulation: EncapsulationType,
+    name: str,
+    minimum_links: int,
+    geant_ga_id: str | None,
+    mac_address: str | None,
+    partner: str,
+    enable_lacp: bool,  # noqa: FBT001
+    ignore_if_down: bool,  # noqa: FBT001
+    ae_members: list[dict[str, Any]],
+    description: str | None = None,
+) -> State:
+    """Initialise the subscription object in the service database."""
+    router = Router.from_subscription(node).router
+    subscription.edge_port.edge_port_node = router
+    subscription.edge_port.edge_port_type = service_type
+    subscription.edge_port.edge_port_enable_lacp = enable_lacp
+    subscription.edge_port.edge_port_member_speed = speed
+    subscription.edge_port.edge_port_encapsulation = encapsulation
+    subscription.edge_port.edge_port_name = name
+    subscription.edge_port.edge_port_minimum_links = minimum_links
+    subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
+    subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
+    subscription.edge_port.edge_port_mac_address = mac_address
+    subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}"
+    subscription.edge_port.edge_port_description = description
+    for member in ae_members:
+        subscription.edge_port.edge_port_ae_members.append(
+            EdgePortAEMemberBlockInactive.new(subscription_id=uuid4(), **member)
+        )
+
+    return {"subscription": subscription}
+
+
+@workflow(
+    "Import Edge Port",
+    initial_input_form=initial_input_form_generator,
+    target=Target.CREATE,
+)
+def create_imported_edge_port() -> StepList:
+    """Import a Edge Port without provisioning it."""
+    return (
+        begin
+        >> create_subscription
+        >> store_process_subscription(Target.CREATE)
+        >> initialize_subscription
+        >> set_status(SubscriptionLifecycle.ACTIVE)
+        >> resync
+        >> done
+    )
diff --git a/gso/workflows/edge_port/import_edge_port.py b/gso/workflows/edge_port/import_edge_port.py
new file mode 100644
index 00000000..abbf49e2
--- /dev/null
+++ b/gso/workflows/edge_port/import_edge_port.py
@@ -0,0 +1,28 @@
+"""A modification workflow for migrating an ImportedEdgePort to an EdgePort subscription."""
+
+from orchestrator.targets import Target
+from orchestrator.types import State, UUIDstr
+from orchestrator.workflow import StepList, done, init, step, workflow
+from orchestrator.workflows.steps import resync, store_process_subscription, unsync
+from orchestrator.workflows.utils import wrap_modify_initial_input_form
+
+from gso.products import EdgePort, ImportedEdgePort, ProductName
+from gso.services.subscriptions import get_product_id_by_name
+
+
+@step("Create new Edge Port subscription")
+def import_edge_port_subscription(subscription_id: UUIDstr) -> State:
+    """Take an ImportedEdgePort subscription, and turn it into an EdgePort subscription."""
+    old_edge_port = ImportedEdgePort.from_subscription(subscription_id)
+    new_subscription_id = get_product_id_by_name(ProductName.EDGE_PORT)
+    new_subscription = EdgePort.from_other_product(old_edge_port, new_subscription_id)  # type: ignore[arg-type]
+
+    return {"subscription": new_subscription}
+
+
+@workflow("Import Edge Port", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
+def import_edge_port() -> StepList:
+    """Modify an ImportedEdgePort subscription into an EdgePort subscription to complete the import."""
+    return (
+        init >> store_process_subscription(Target.MODIFY) >> unsync >> import_edge_port_subscription >> resync >> done
+    )
diff --git a/test/cli/test_imports.py b/test/cli/test_imports.py
index 98332386..58352ea7 100644
--- a/test/cli/test_imports.py
+++ b/test/cli/test_imports.py
@@ -5,6 +5,7 @@ from unittest.mock import patch
 import pytest
 
 from gso.cli.imports import (
+    import_edge_port,
     import_iptrunks,
     import_office_routers,
     import_opengear,
@@ -13,6 +14,7 @@ from gso.cli.imports import (
     import_super_pop_switches,
 )
 from gso.products import Router, Site
+from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
 from gso.products.product_blocks.iptrunk import IptrunkType
 from gso.products.product_blocks.router import RouterRole
 from gso.products.product_blocks.site import SiteTier
@@ -195,6 +197,41 @@ def opengear_data(temp_file, faker, site_subscription_factory):
     return _opengear_data
 
 
+@pytest.fixture()
+def edge_port_data(temp_file, faker, nokia_router_subscription_factory, partner_factory):
+    def _edge_port_data(**kwargs):
+        edge_port_data = {
+            "node": Router.from_subscription(nokia_router_subscription_factory()).router.router_fqdn,
+            "service_type": EdgePortType.CUSTOMER,
+            "speed": PhysicalPortCapacity.TEN_GIGABIT_PER_SECOND,
+            "encapsulation": EncapsulationType.DOT1Q,
+            "name": "lag34",
+            "minimum_links": 2,
+            "geant_ga_id": faker.geant_gid(),
+            "mac_address": faker.mac_address(),
+            "partner": partner_factory(name="GAAR", email=faker.email())["name"],
+            "enable_lacp": True,
+            "ignore_if_down": False,
+            "ae_members": [
+                {
+                    "interface_name": faker.network_interface(),
+                    "interface_description": faker.sentence(),
+                },
+                {
+                    "interface_name": faker.network_interface(),
+                    "interface_description": faker.sentence(),
+                },
+            ],
+            "description": faker.sentence(),
+        }
+        edge_port_data.update(**kwargs)
+
+        temp_file.write_text(json.dumps([edge_port_data]))
+        return {"path": str(temp_file), "data": edge_port_data}
+
+    return _edge_port_data
+
+
 ###########
 #  TESTS  #
 ###########
@@ -377,3 +414,35 @@ def test_import_super_pop_switch_success(mock_start_process, mock_sleep, super_p
 def test_import_opengear_success(mock_start_process, opengear_data):
     import_opengear(opengear_data()["path"])
     assert mock_start_process.call_count == 1
+
+
+@patch("gso.cli.imports.time.sleep")
+@patch("gso.cli.imports.start_process")
+def test_import_edge_port_successful(mock_start_process, mock_sleep, edge_port_data):
+    import_edge_port(edge_port_data()["path"])
+    assert mock_start_process.call_count == 1
+
+
+@patch("gso.cli.imports.time.sleep")
+@patch("gso.cli.imports.start_process")
+def test_import_edge_port_with_invalid_router(
+    mock_start_process, mock_sleep, edge_port_data, capfd, nokia_router_subscription_factory
+):
+    p_router = nokia_router_subscription_factory(router_role=RouterRole.P)
+    broken_data = edge_port_data(node=Router.from_subscription(p_router).router.router_fqdn)
+    import_edge_port(broken_data["path"])
+
+    captured_output, _ = capfd.readouterr()
+    assert f"Router {p_router} not found" in captured_output
+    assert mock_start_process.call_count == 0
+
+
+@patch("gso.cli.imports.time.sleep")
+@patch("gso.cli.imports.start_process")
+def test_import_edge_port_with_invalid_partner(mock_start_process, mock_sleep, edge_port_data, capfd):
+    broken_data = edge_port_data(partner="INVALID")
+    import_edge_port(broken_data["path"])
+
+    captured_output, _ = capfd.readouterr()
+    assert "Partner INVALID not found" in captured_output
+    assert mock_start_process.call_count == 0
diff --git a/test/fixtures/edge_port_fixtures.py b/test/fixtures/edge_port_fixtures.py
index 730da5b4..cfed2090 100644
--- a/test/fixtures/edge_port_fixtures.py
+++ b/test/fixtures/edge_port_fixtures.py
@@ -11,7 +11,7 @@ from gso.products.product_blocks.edge_port import (
     EdgePortType,
     EncapsulationType,
 )
-from gso.products.product_types.edge_port import EdgePortInactive
+from gso.products.product_types.edge_port import EdgePortInactive, ImportedEdgePortInactive
 from gso.services import subscriptions
 from gso.utils.types.interfaces import PhysicalPortCapacity
 
@@ -37,13 +37,20 @@ def edge_port_subscription_factory(faker, partner_factory, nokia_router_subscrip
         *,
         enable_lacp=True,
         ignore_if_down=False,
+        is_imported=True,
     ) -> UUIDstr:
         partner = partner or partner_factory(name=faker.company(), email=faker.email())
         edge_port_node = Router.from_subscription(nokia_router_subscription_factory()).router
-        product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT)
-        edge_port_subscription = EdgePortInactive.from_product_id(
-            product_id, customer_id=partner["partner_id"], insync=True
-        )
+        if is_imported:
+            product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT)
+            edge_port_subscription = EdgePortInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
+        else:
+            product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT)
+            edge_port_subscription = ImportedEdgePortInactive.from_product_id(
+                product_id, customer_id=partner["partner_id"], insync=True
+            )
 
         edge_port_subscription.edge_port.edge_port_description = description or faker.text(max_nb_chars=30)
         edge_port_subscription.edge_port.edge_port_geant_ga_id = geant_ga_id or faker.geant_sid()
diff --git a/test/workflows/edge_port/test_create_imported_edge_port.py b/test/workflows/edge_port/test_create_imported_edge_port.py
new file mode 100644
index 00000000..21d9f829
--- /dev/null
+++ b/test/workflows/edge_port/test_create_imported_edge_port.py
@@ -0,0 +1,43 @@
+import pytest
+from orchestrator.types import SubscriptionLifecycle
+
+from gso.products import ImportedEdgePort
+from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
+from gso.utils.types.interfaces import PhysicalPortCapacity
+from test.workflows import assert_complete, extract_state, run_workflow
+
+
+@pytest.fixture()
+def imported_edge_port_creation_input_form_data(nokia_router_subscription_factory, partner_factory, faker):
+    return {
+        "node": nokia_router_subscription_factory(),
+        "service_type": EdgePortType.CUSTOMER,
+        "speed": PhysicalPortCapacity.TEN_GIGABIT_PER_SECOND,
+        "encapsulation": EncapsulationType.DOT1Q,
+        "name": "lag34",
+        "minimum_links": 2,
+        "geant_ga_id": faker.geant_gid(),
+        "mac_address": faker.mac_address(),
+        "partner": partner_factory(name="GAAR", email=faker.email())["name"],
+        "enable_lacp": True,
+        "ignore_if_down": False,
+        "ae_members": [
+            {
+                "interface_name": faker.network_interface(),
+                "interface_description": faker.sentence(),
+            },
+            {
+                "interface_name": faker.network_interface(),
+                "interface_description": faker.sentence(),
+            },
+        ],
+        "description": faker.sentence(),
+    }
+
+
+def test_create_imported_edge_port_success(faker, imported_edge_port_creation_input_form_data):
+    result, _, _ = run_workflow("create_imported_edge_port", [imported_edge_port_creation_input_form_data])
+    state = extract_state(result)
+    subscription = ImportedEdgePort.from_subscription(state["subscription_id"])
+    assert_complete(result)
+    assert subscription.status == SubscriptionLifecycle.ACTIVE
diff --git a/test/workflows/edge_port/test_import_edge_port.py b/test/workflows/edge_port/test_import_edge_port.py
new file mode 100644
index 00000000..46405d50
--- /dev/null
+++ b/test/workflows/edge_port/test_import_edge_port.py
@@ -0,0 +1,17 @@
+import pytest
+from orchestrator.types import SubscriptionLifecycle
+
+from gso.products import EdgePort, ProductName
+from test.workflows import assert_complete, run_workflow
+
+
+@pytest.mark.workflow()
+def test_import_edge_port_success(edge_port_subscription_factory):
+    imported_edge_port = edge_port_subscription_factory(is_imported=False)
+    result, _, _ = run_workflow("import_edge_port", [{"subscription_id": imported_edge_port}])
+    subscription = EdgePort.from_subscription(imported_edge_port)
+
+    assert_complete(result)
+    assert subscription.product.name == ProductName.EDGE_PORT
+    assert subscription.status == SubscriptionLifecycle.ACTIVE
+    assert subscription.insync
-- 
GitLab