Skip to content
Snippets Groups Projects
imports.py 27.05 KiB
"""CLI commands for importing data to coreDB."""

import csv
import ipaddress
import json
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Self, TypeVar

import typer
import yaml
from orchestrator.db import db
from orchestrator.services.processes import start_process
from orchestrator.types import SubscriptionLifecycle
from pydantic import BaseModel, NonNegativeInt, ValidationError, field_validator, model_validator
from pydantic_forms.types import UUIDstr
from sqlalchemy.exc import SQLAlchemyError

from gso.db.models import PartnerTable
from gso.products import ProductName, ProductType
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.ias import IASFlavor
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.layer_2_circuit import Layer2CircuitType
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.switch import SwitchModel
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.layer_2_circuit import Layer2CircuitServiceType
from gso.services.partners import (
    PartnerEmail,
    PartnerName,
    PartnerNotFoundError,
    get_partner_by_name,
)
from gso.services.subscriptions import (
    get_active_edge_port_subscriptions,
    get_active_router_subscriptions,
    get_active_subscriptions_by_field_and_value,
    get_subscriptions,
)
from gso.utils.shared_enums import SBPType, Vendor
from gso.utils.types.base_site import BaseSiteValidatorModel
from gso.utils.types.geant_ids import IMPORTED_GA_ID, IMPORTED_GS_ID
from gso.utils.types.interfaces import BandwidthString, LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.types.ip_address import (
    IPAddress,
    IPv4AddressType,
    IPv4Netmask,
    IPv6AddressType,
    IPv6Netmask,
    PortNumber,
)
from gso.utils.types.virtual_identifiers import VC_ID, VLAN_ID
from gso.workflows.l3_core_service.shared import L3_CREAT_IMPORTED_WF_MAP, L3ProductNameType

app: typer.Typer = typer.Typer()
IMPORT_WAIT_MESSAGE = "Waiting for the dust to settle before importing new products..."


class CreatePartner(BaseModel):
    """Required inputs for creating a partner."""

    name: PartnerName
    email: PartnerEmail


class SiteImportModel(BaseSiteValidatorModel):
    """The required input for importing an existing `gso.products.product_types.site`."""


class RouterImportModel(BaseModel):
    """Required fields for importing an existing `gso.product.product_types.router`."""

    partner: str
    router_site: str
    hostname: str
    ts_port: int
    router_vendor: Vendor
    router_role: RouterRole
    router_lo_ipv4_address: IPv4AddressType
    router_lo_ipv6_address: IPv6AddressType
    router_lo_iso_address: str


class SwitchImportModel(BaseModel):
    """Required fields for importing an existing `gso.product.product_types.switch`."""

    fqdn: str
    ts_port: PortNumber
    site: UUIDstr
    switch_vendor: Vendor
    switch_model: SwitchModel


class SuperPopSwitchImportModel(BaseModel):
    """Required fields for importing an existing `gso.product.product_types.super_pop_switch`."""

    partner: str
    super_pop_switch_site: str
    hostname: str
    super_pop_switch_ts_port: PortNumber
    super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address


class OfficeRouterImportModel(BaseModel):
    """Required fields for importing an existing `gso.product.product_types.office_router`."""

    partner: str
    office_router_site: str
    office_router_fqdn: str
    office_router_ts_port: PortNumber
    office_router_lo_ipv4_address: ipaddress.IPv4Address
    office_router_lo_ipv6_address: ipaddress.IPv6Address


class IptrunkImportModel(BaseModel):
    """Required fields for importing an existing `gso.products.product_types.iptrunk`."""

    partner: str
    gs_id: IMPORTED_GS_ID | None
    iptrunk_type: IptrunkType
    iptrunk_description: str | None = None
    iptrunk_speed: PhysicalPortCapacity
    iptrunk_minimum_links: int
    iptrunk_isis_metric: int
    side_a_node_id: str
    side_a_ae_iface: str
    side_a_ga_id: IMPORTED_GA_ID | None
    side_a_ae_members: LAGMemberList[LAGMember]
    side_b_node_id: str
    side_b_ae_iface: str
    side_b_ga_id: IMPORTED_GA_ID | None
    side_b_ae_members: LAGMemberList[LAGMember]

    iptrunk_ipv4_network: ipaddress.IPv4Network
    iptrunk_ipv6_network: ipaddress.IPv6Network

    @classmethod
    def _get_active_routers(cls) -> set[str]:
        return {
            str(router["subscription_id"]) for router in get_active_router_subscriptions(includes=["subscription_id"])
        }

    @field_validator("partner")
    def check_if_partner_exists(cls, value: str) -> str:
        """Validate that the partner exists."""
        try:
            get_partner_by_name(value)
        except PartnerNotFoundError as e:
            msg = f"partner {value} not found"
            raise ValueError(msg) from e

        return value

    @field_validator("side_a_node_id", "side_b_node_id")
    def check_if_router_side_is_available(cls, value: str) -> str:
        """Both sides of the trunk must exist in GSO."""
        if value not in cls._get_active_routers():
            msg = f"Router {value} not found"
            raise ValueError(msg)

        return value

    @model_validator(mode="after")
    def check_members(self) -> Self:
        """Amount of LAG members has to match on side A and B, and meet the minimum requirement."""
        len_a = len(self.side_a_ae_members)
        len_b = len(self.side_b_ae_members)

        if len_a < self.iptrunk_minimum_links:
            msg = f"Side A members should be at least {self.iptrunk_minimum_links} (iptrunk_minimum_links)"
            raise ValueError(msg)

        if len_a != len_b:
            msg = "Mismatch between Side A and B members"
            raise ValueError(msg)

        return self


class OpenGearImportModel(BaseModel):
    """Required fields for importing an existing `gso.products.product_types.opengear`."""

    partner: str
    opengear_site: str
    opengear_hostname: str
    opengear_wan_address: IPv4AddressType
    opengear_wan_netmask: IPv4AddressType
    opengear_wan_gateway: IPv4AddressType


class EdgePortImportModel(BaseModel):
    """Required fields for importing an existing `gso.products.product_types.edge_port`."""

    node: str
    service_type: EdgePortType
    speed: PhysicalPortCapacity
    encapsulation: EncapsulationType
    name: str
    minimum_links: int
    ga_id: IMPORTED_GA_ID | None
    mac_address: str | None
    partner: str
    enable_lacp: bool
    ignore_if_down: bool
    ae_members: LAGMemberList[LAGMember]
    description: str | None = None
    custom_service_name: str | None = None

    @field_validator("partner")
    def check_if_partner_exists(cls, value: str) -> str:
        """Validate that the partner exists."""
        try:
            get_partner_by_name(value)
        except PartnerNotFoundError as e:
            msg = f"Partner {value} not found"
            raise ValueError(msg) from e

        return value

    @field_validator("node")
    def validate_node(cls, value: str) -> str:
        """Check if the node is an active PE router in GSO."""
        pe_routers = {
            str(router.subscription_id)
            for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
        }
        if value not in pe_routers:
            msg = f"Router {value} not found"
            raise ValueError(msg)

        return value

    @model_validator(mode="after")
    def check_members(self) -> Self:
        """Amount of LAG members has to match and meet the minimum requirement."""
        if len(self.ae_members) < self.minimum_links:
            msg = f"Number of members should be at least {self.minimum_links} (edge_port_minimum_links)"
            raise ValueError(msg)
        return self


class L3CoreServiceImportModel(BaseModel):
    """Import L3 Core Service model."""

    class BaseBGPPeer(BaseModel):
        """Base BGP Peer model."""

        bfd_enabled: bool = False
        has_custom_policies: bool = False
        authentication_key: str | None
        multipath_enabled: bool = False
        send_default_route: bool = False
        is_passive: bool = False
        peer_address: IPAddress
        families: list[IPFamily]
        is_multi_hop: bool
        rtbh_enabled: bool  # whether Remote Triggered Blackhole is enabled
        prefix_limit: NonNegativeInt | None = None
        ttl_security: NonNegativeInt | None = None

    class BFDSettingsModel(BaseModel):
        """BFD Settings model."""

        bfd_enabled: bool = False
        bfd_interval_rx: int | None = None
        bfd_interval_tx: int | None = None
        bfd_multiplier: int | None = None

    class ServiceBindingPort(BaseModel):
        """Service Binding model."""

        edge_port: str
        ap_type: str
        custom_service_name: str | None = None
        gs_id: IMPORTED_GS_ID
        sbp_type: SBPType = SBPType.L3
        is_tagged: bool = False
        vlan_id: VLAN_ID
        custom_firewall_filters: bool = False
        ipv4_address: IPv4AddressType
        ipv4_mask: IPv4Netmask
        ipv6_address: IPv6AddressType
        ipv6_mask: IPv6Netmask
        is_multi_hop: bool = True
        bgp_peers: list["L3CoreServiceImportModel.BaseBGPPeer"]
        v4_bfd_settings: "L3CoreServiceImportModel.BFDSettingsModel"
        v6_bfd_settings: "L3CoreServiceImportModel.BFDSettingsModel"

    partner: str
    service_binding_ports: list[ServiceBindingPort]
    product_name: L3ProductNameType

    @field_validator("partner")
    def check_if_partner_exists(cls, value: str) -> str:
        """Validate that the partner exists."""
        try:
            get_partner_by_name(value)
        except PartnerNotFoundError as e:
            msg = f"Partner {value} not found"
            raise ValueError(msg) from e

        return value

    @field_validator("service_binding_ports")
    def validate_node(cls, value: list[ServiceBindingPort]) -> list[ServiceBindingPort]:
        """Check if the Service Binding Ports are valid."""
        edge_ports = [str(subscription.subscription_id) for subscription in get_active_edge_port_subscriptions()]
        for sbp in value:
            if sbp.edge_port not in edge_ports:
                msg = f"Edge Port {sbp.edge_port} not found"
                raise ValueError(msg)

        return value


class IASImportModel(L3CoreServiceImportModel):
    """Import IAS model."""

    ias_flavor: IASFlavor = IASFlavor.IAS_PS_OPT_OUT


class LanSwitchInterconnectRouterSideImportModel(BaseModel):
    """Import LAN Switch Interconnect Router side model."""

    node: UUIDstr
    ae_iface: str
    ae_members: LAGMemberList[LAGMember]


class LanSwitchInterconnectSwitchSideImportModel(BaseModel):
    """Import LAN Switch Interconnect Switch side model."""

    switch: UUIDstr
    ae_iface: str
    ae_members: LAGMemberList[LAGMember]


class LanSwitchInterconnectImportModel(BaseModel):
    """Import LAN Switch Interconnect model."""

    lan_switch_interconnect_description: str
    minimum_links: int
    router_side: LanSwitchInterconnectRouterSideImportModel
    switch_side: LanSwitchInterconnectSwitchSideImportModel


class Layer2CircuitServiceImportModel(BaseModel):
    """Import Layer 2 Circuit Service model."""

    class ServiceBindingPortInput(BaseModel):
        """Service Binding Port model."""

        edge_port: UUIDstr
        vlan_id: VLAN_ID

    service_type: Layer2CircuitServiceType
    partner: str
    gs_id: IMPORTED_GS_ID
    vc_id: VC_ID
    layer_2_circuit_side_a: ServiceBindingPortInput
    layer_2_circuit_side_b: ServiceBindingPortInput
    layer_2_circuit_type: Layer2CircuitType
    vlan_range_lower_bound: VLAN_ID | None = None
    vlan_range_upper_bound: VLAN_ID | None = None
    policer_enabled: bool = False
    policer_bandwidth: BandwidthString | None = None
    policer_burst_rate: BandwidthString | None = None
    custom_service_name: str | None = None

    @field_validator("partner")
    def check_if_partner_exists(cls, value: str) -> str:
        """Validate that the partner exists."""
        try:
            get_partner_by_name(value)
        except PartnerNotFoundError as e:
            msg = f"Partner {value} not found"
            raise ValueError(msg) from e

        return value

    @model_validator(mode="after")
    def check_if_edge_ports_exist(self) -> Self:
        """Check if the edge ports exist."""
        for side in [self.layer_2_circuit_side_a, self.layer_2_circuit_side_b]:
            if not EdgePort.from_subscription(side.edge_port):
                msg = f"Edge Port {side.edge_port} not found"
                raise ValueError(msg)
        return self


T = TypeVar(
    "T",
    SiteImportModel,
    RouterImportModel,
    SwitchImportModel,
    IptrunkImportModel,
    SuperPopSwitchImportModel,
    OfficeRouterImportModel,
    OpenGearImportModel,
    EdgePortImportModel,
    L3CoreServiceImportModel,
    LanSwitchInterconnectImportModel,
    Layer2CircuitServiceImportModel,
)

common_filepath_option = typer.Option(
    default="data.json",
    help="Path to the file",
)


def _read_data(file_path: Path) -> dict:
    """Read data from a JSON or YAML file."""
    typer.echo(f"Starting import from {file_path!s}")
    file_extension = file_path.suffix.lower()

    with file_path.open("r") as f:
        supported_extensions = {".json", ".yaml", ".yml"}

        if file_extension == ".json":
            return json.load(f)
        if file_extension in supported_extensions:
            return yaml.safe_load(f)

        typer.echo(f"Unsupported file format: {file_extension}")
        raise typer.Exit(code=1)


def _get_router_subscription_id(node_name: str) -> str | None:
    """Get the subscription id for a router by its node name."""
    subscriptions = get_active_subscriptions_by_field_and_value(
        "router_fqdn",
        node_name,
    )
    if subscriptions:
        return str(subscriptions[0].subscription_id)
    return None


def _import_partners_from_csv(file_path: Path) -> list[dict]:
    """Read partners from a CSV file."""
    with Path.open(file_path, encoding="utf-8") as csv_file:
        csv_reader = csv.DictReader(csv_file)
        return list(csv_reader)


def _generic_import_product(
    file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T]
) -> None:
    """Import subscriptions from a JSON or YAML file."""
    successfully_imported_data = []
    data = _read_data(file_path)
    for details in data:
        details["partner"] = details.get("partner", "GEANT")
        typer.echo(f"Creating imported {name_key}: {details[name_key]}")
        try:
            initial_data = import_model(**details)
            start_process(f"create_imported_{workflow_suffix}", [initial_data.model_dump()])
            successfully_imported_data.append(getattr(initial_data, name_key))
            typer.echo(
                f"Successfully created {name_key}: {getattr(initial_data, name_key)}",
            )
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    #  Migrate new products from imported to "full" counterpart.
    imported_products = get_subscriptions(
        [imported_product_type], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
    )
    for subscription_id in imported_products:
        typer.echo(f"Importing {subscription_id}")
        start_process(f"import_{workflow_suffix}", [subscription_id])

    if successfully_imported_data:
        typer.echo(f"Successfully created imported {name_key}s:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")
        typer.echo(f"Please validate no more imported {workflow_suffix} products exist anymore in the database.")


@app.command()
def import_sites(filepath: str = common_filepath_option) -> None:
    """Import sites into GSO."""
    _generic_import_product(Path(filepath), ProductType.IMPORTED_SITE, "site", "site_name", SiteImportModel)


@app.command()
def import_routers(filepath: str = common_filepath_option) -> None:
    """Import routers into GSO."""
    _generic_import_product(Path(filepath), ProductType.IMPORTED_ROUTER, "router", "hostname", RouterImportModel)


@app.command()
def import_switches(filepath: str = common_filepath_option) -> None:
    """Import switches into GSO."""
    _generic_import_product(Path(filepath), ProductType.IMPORTED_SWITCH, "switch", "fqdn", SwitchImportModel)


@app.command()
def import_super_pop_switches(filepath: str = common_filepath_option) -> None:
    """Import Super PoP Switches into GSO."""
    _generic_import_product(
        Path(filepath),
        ProductType.IMPORTED_SUPER_POP_SWITCH,
        "super_pop_switch",
        "hostname",
        SuperPopSwitchImportModel,
    )


@app.command()
def import_office_routers(filepath: str = common_filepath_option) -> None:
    """Import office routers into GSO."""
    _generic_import_product(
        Path(filepath),
        ProductType.IMPORTED_OFFICE_ROUTER,
        "office_router",
        "office_router_fqdn",
        OfficeRouterImportModel,
    )


@app.command()
def import_opengear(filepath: str = common_filepath_option) -> None:
    """Import Opengear into GSO."""
    _generic_import_product(
        Path(filepath),
        ProductType.IMPORTED_OPENGEAR,
        "opengear",
        "opengear_hostname",
        OpenGearImportModel,
    )


@app.command()
def import_edge_port(filepath: str = common_filepath_option) -> None:
    """Import Edge Port into GSO."""
    successfully_imported_data = []
    data = _read_data(Path(filepath))
    for edge_port in data:
        typer.echo(f"Importing Edge Port {edge_port["name"]} on {edge_port["node"]}. ")
        try:
            edge_port["node"] = _get_router_subscription_id(edge_port["node"])
            initial_data = EdgePortImportModel(**edge_port)
            start_process("create_imported_edge_port", [initial_data.model_dump()])
            successfully_imported_data.append(edge_port["name"])
            typer.echo(f"Successfully imported Edge Port {edge_port["name"]} on {edge_port["node"]}.")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    edge_port_ids = get_subscriptions(
        [ProductType.IMPORTED_EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
    )
    for subscription_id in edge_port_ids:
        typer.echo(f"Migrating Edge Port {subscription_id}")
        start_process("import_edge_port", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully imported Edge Ports:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")


@app.command()
def import_iptrunks(filepath: str = common_filepath_option) -> None:
    """Import IP trunks into GSO."""
    successfully_imported_data = []
    data = _read_data(Path(filepath))
    for trunk in data:
        ipv4_network_a = ipaddress.IPv4Network(trunk["config"]["nodeA"]["ipv4_address"], strict=False)
        ipv4_network_b = ipaddress.IPv4Network(trunk["config"]["nodeB"]["ipv4_address"], strict=False)
        ipv6_network_a = ipaddress.IPv6Network(trunk["config"]["nodeA"]["ipv6_address"], strict=False)
        ipv6_network_b = ipaddress.IPv6Network(trunk["config"]["nodeB"]["ipv6_address"], strict=False)
        # Check if IPv4 networks are equal
        if ipv4_network_a == ipv4_network_b:
            iptrunk_ipv4_network = ipv4_network_a
        else:
            # Handle the case where IPv4 networks are different
            typer.echo(f"Error: IPv4 networks are different for trunk {trunk["id"]}.")
            continue
        # Check if IPv6 networks are the same
        if ipv6_network_a == ipv6_network_b:
            iptrunk_ipv6_network = ipv6_network_a
        else:
            # Handle the case where IPv6 networks are different
            typer.echo(f"Error: IPv6 networks are different for trunk {trunk["id"]}.")
            continue

        typer.echo(
            f"Importing IP Trunk: "
            f'{get_active_subscriptions_by_field_and_value("router_fqdn", trunk["config"]["nodeA"]["name"])}',
        )
        try:
            initial_data = IptrunkImportModel(
                partner="GEANT",
                gs_id=trunk["id"],
                iptrunk_type=trunk["config"]["common"]["type"],
                iptrunk_description=trunk["config"]["common"].get("description", ""),
                iptrunk_speed=trunk["config"]["common"]["link_speed"],
                iptrunk_minimum_links=trunk["config"]["common"]["minimum_links"],
                iptrunk_isis_metric=trunk["config"]["common"]["isis_metric"],
                side_a_node_id=_get_router_subscription_id(trunk["config"]["nodeA"]["name"]) or "",
                side_a_ae_iface=trunk["config"]["nodeA"]["ae_name"],
                side_a_ga_id=trunk["config"]["nodeA"]["port_ga_id"],
                side_a_ae_members=trunk["config"]["nodeA"]["members"],
                side_b_node_id=_get_router_subscription_id(trunk["config"]["nodeB"]["name"]) or "",
                side_b_ae_iface=trunk["config"]["nodeB"]["ae_name"],
                side_b_ga_id=trunk["config"]["nodeB"]["port_ga_id"],
                side_b_ae_members=trunk["config"]["nodeB"]["members"],
                iptrunk_ipv4_network=iptrunk_ipv4_network,
                iptrunk_ipv6_network=iptrunk_ipv6_network,
            )
            start_process("create_imported_iptrunk", [initial_data.model_dump()])
            successfully_imported_data.append(trunk["id"])
            typer.echo(f"Successfully imported IP Trunk: {trunk["id"]}")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    trunk_ids = get_subscriptions(
        [ProductType.IMPORTED_IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
    )
    for subscription_id in trunk_ids:
        typer.echo(f"Migrating iptrunk {subscription_id}")
        start_process("import_iptrunk", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully imported IP Trunks:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")


@app.command()
def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV file containing partners")) -> None:
    """Import partners from a CSV file into the database."""
    typer.echo(f"Importing partners from {file_path} ...")

    partners = _import_partners_from_csv(Path(file_path))

    try:
        for partner in partners:
            if partner.get("created_at"):
                partner["created_at"] = datetime.strptime(partner["created_at"], "%Y-%m-%d").replace(tzinfo=UTC)

            new_partner = PartnerTable(**CreatePartner(**partner).model_dump())
            db.session.add(new_partner)

        db.session.commit()
        typer.echo(f"Successfully imported {len(partners)} partners.")
    except SQLAlchemyError as e:
        db.session.rollback()
        typer.echo(f"Failed to import partners: {e}")
    finally:
        db.session.close()


@app.command()
def import_l3_core_service(filepath: str = common_filepath_option) -> None:
    """Import L3 Core Services into GSO."""
    successfully_imported_data = []
    l3_core_service_list = _read_data(Path(filepath))

    for l3_core_service in l3_core_service_list:
        partner = l3_core_service[0]["partner"]
        product_name = l3_core_service[0]["product_name"]
        typer.echo(f"Creating imported {product_name} for {partner}")

        try:
            if product_name == ProductName.IAS.value:
                initial_data = IASImportModel(**l3_core_service[0], **l3_core_service[1]).model_dump()
            else:
                initial_data = L3CoreServiceImportModel(**l3_core_service[0], **l3_core_service[1]).model_dump()

            start_process(L3_CREAT_IMPORTED_WF_MAP[product_name], [initial_data])
            edge_ports = [sbp["edge_port"] for sbp in l3_core_service[0]["service_binding_ports"]]
            successfully_imported_data.append(edge_ports)
            typer.echo(f"Successfully created imported {product_name} for {partner}")
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")

    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    # Migrate new products from imported to "full" counterpart.
    imported_products = get_subscriptions(
        product_types=[
            ProductType.IMPORTED_GEANT_IP,
            ProductType.IMPORTED_IAS,
            ProductType.IMPORTED_LHCONE,
            ProductType.IMPORTED_COPERNICUS,
        ],
        lifecycles=[SubscriptionLifecycle.ACTIVE],
        includes=["subscription_id"],
    )

    for subscription_id in imported_products:
        typer.echo(f"Importing {subscription_id}")
        start_process("import_l3_core_service", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully created imported L3 Core Services:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")


@app.command()
def import_lan_switch_interconnect(filepath: str = common_filepath_option) -> None:
    """Import LAN Switch Interconnect services into GSO."""
    _generic_import_product(
        Path(filepath),
        ProductType.IMPORTED_LAN_SWITCH_INTERCONNECT,
        "lan_switch_interconnect",
        "lan_switch_interconnect_description",
        LanSwitchInterconnectImportModel,
    )


@app.command()
def import_layer_2_circuit_service(filepath: str = common_filepath_option) -> None:
    """Import Layer 2 Circuit services into GSO."""
    successfully_imported_data = []
    layer_2_circuit_service_list = _read_data(Path(filepath))

    for layer_2_circuit_service in layer_2_circuit_service_list:
        partner = layer_2_circuit_service["partner"]
        service_type = Layer2CircuitServiceType(layer_2_circuit_service["service_type"])
        typer.echo(f"Creating imported {service_type} for {partner}")

        try:
            initial_data = Layer2CircuitServiceImportModel(**layer_2_circuit_service)
            start_process("create_imported_layer_2_circuit", [initial_data.model_dump()])
            successfully_imported_data.append(initial_data.vc_id)
            typer.echo(
                f"Successfully created imported {service_type} with virtual circuit ID {initial_data.vc_id}"
                f" for {partner}"
            )
        except ValidationError as e:
            typer.echo(f"Validation error: {e}")
    typer.echo(IMPORT_WAIT_MESSAGE)
    time.sleep(1)

    # Migrate new products from imported to "full" counterpart.
    imported_products = get_subscriptions(
        product_types=[ProductType.IMPORTED_EXPRESSROUTE, ProductType.IMPORTED_GEANT_PLUS],
        lifecycles=[SubscriptionLifecycle.ACTIVE],
        includes=["subscription_id"],
    )

    for subscription_id in imported_products:
        typer.echo(f"Importing {subscription_id}")
        start_process("import_layer_2_circuit", [subscription_id])

    if successfully_imported_data:
        typer.echo("Successfully created imported Layer 2 Circuit services:")
        for item in successfully_imported_data:
            typer.echo(f"- {item}")