Skip to content
Snippets Groups Projects

Add Edge Port, GÉANT IP and IAS products

Merged Karel van Klink requested to merge feature/add-geant-ip into develop
All threads resolved!
14 files
+ 506
15
Compare changes
  • Side-by-side
  • Inline
Files
14
+ 85
1
@@ -18,6 +18,7 @@ from sqlalchemy.exc import SQLAlchemyError
from gso.db.models import PartnerTable
from gso.products import ProductType
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole
from gso.services.partners import (
@@ -161,6 +162,56 @@ class OpenGearImportModel(BaseModel):
opengear_wan_gateway: IPv4AddressType
class EdgePortImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.products.product_types.edge_port`."""
node: str
service_type: EdgePortType
speed: PhysicalPortCapacity
encapsulation: EncapsulationType
name: str
minimum_links: int
geant_ga_id: str | None
mac_address: str | None
partner: str
enable_lacp: bool
ignore_if_down: bool
ae_members: LAGMemberList[LAGMember]
description: str | None = None
@field_validator("partner")
def check_if_partner_exists(cls, value: str) -> str:
"""Validate that the partner exists."""
try:
get_partner_by_name(value)
except PartnerNotFoundError as e:
msg = f"Partner {value} not found"
raise ValueError(msg) from e
return value
@field_validator("node")
def validate_node(cls, value: str) -> str:
"""Check if the node is an active PE router in :term:`GSO`."""
pe_routers = {
str(router.subscription_id)
for router in get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE)
}
if value not in pe_routers:
msg = f"Router {value} not found"
raise ValueError(msg)
return value
@model_validator(mode="after")
def check_members(self) -> Self:
"""Amount of :term:`LAG` members has to match and meet the minimum requirement."""
if len(self.ae_members) < self.minimum_links:
msg = f"Number of members should be at least {self.minimum_links} (edge_port_minimum_links)"
raise ValueError(msg)
return self
T = TypeVar(
"T",
SiteImportModel,
@@ -169,6 +220,7 @@ T = TypeVar(
SuperPopSwitchImportModel,
OfficeRouterImportModel,
OpenGearImportModel,
EdgePortImportModel,
)
common_filepath_option = typer.Option(
@@ -219,7 +271,7 @@ def _generic_import_product(
successfully_imported_data = []
data = _read_data(file_path)
for details in data:
details["partner"] = "GEANT"
details["partner"] = details.get("partner", "GEANT")
typer.echo(f"Creating imported {name_key}: {details[name_key]}")
try:
initial_data = import_model(**details)
@@ -297,6 +349,38 @@ def import_opengear(filepath: str = common_filepath_option) -> None:
)
@app.command()
def import_edge_port(filepath: str = common_filepath_option) -> None:
"""Import Edge Port into GSO."""
successfully_imported_data = []
data = _read_data(Path(filepath))
for edge_port in data:
typer.echo(f"Importing Edge Port {edge_port["name"]} on {edge_port["node"]}. ")
try:
edge_port["node"] = _get_router_subscription_id(edge_port["node"])
initial_data = EdgePortImportModel(**edge_port)
start_process("create_imported_edge_port", [initial_data.model_dump()])
successfully_imported_data.append(edge_port["name"])
typer.echo(f"Successfully imported Edge Port {edge_port["name"]} on {edge_port["node"]}.")
except ValidationError as e:
typer.echo(f"Validation error: {e}")
typer.echo("Waiting for the dust to settle before moving on the importing new products...")
time.sleep(1)
edge_port_ids = get_subscriptions(
[ProductType.IMPORTED_EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in edge_port_ids:
typer.echo(f"Migrating Edge Port {subscription_id}")
start_process("import_edge_port", [subscription_id])
if successfully_imported_data:
typer.echo("Successfully imported Edge Ports:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
@app.command()
def import_iptrunks(filepath: str = common_filepath_option) -> None:
"""Import IP trunks into GSO."""
Loading