Skip to content
Snippets Groups Projects

Draft: add IAS to R&E Interconnect and its imported models and workflows

Closed Mohammad Torkashvand requested to merge feature/NAT-732-ias-to-re-interconnect into develop
Files
15
+ 63
0
@@ -27,6 +27,7 @@ from gso.services.partners import (
get_partner_by_name,
)
from gso.services.subscriptions import (
get_active_pe_router_subscriptions,
get_active_router_subscriptions,
get_active_subscriptions_by_field_and_value,
get_subscriptions,
@@ -187,6 +188,28 @@ class OpenGearImportModel(BaseModel):
opengear_wan_gateway: IPv4AddressType
class IasToReInterconnectImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.products.product_types.ias_to_re_interconnect.`."""
partner: str
ipv4_network: ipaddress.IPv4Network
ipv6_network: ipaddress.IPv6Network
router_id: str
@classmethod
def _get_active_pe_routers(cls) -> set[str]:
return {str(router.subscription_id) for router in get_active_pe_router_subscriptions()}
@field_validator("router_id")
def check_if_router_is_available(cls, value: str) -> str:
"""Router must exist in :term:`GSO`."""
if value not in cls._get_active_pe_routers():
msg = f"PE Router {value} not found"
raise ValueError(msg)
return value
T = TypeVar(
"T",
SiteImportModel,
@@ -195,6 +218,7 @@ T = TypeVar(
SuperPopSwitchImportModel,
OfficeRouterImportModel,
OpenGearImportModel,
IasToReInterconnectImportModel,
)
common_filepath_option = typer.Option(
@@ -323,6 +347,45 @@ def import_opengear(filepath: str = common_filepath_option) -> None:
)
@app.command()
def import_ias_to_re_interconnect(filepath: str = common_filepath_option) -> None:
"""Import IasToReInterconnect into GSO."""
successfully_imported_data = []
data = _read_data(Path(filepath))
for details in data:
details["partner"] = "GEANT"
key = f"{details["router_id"]}-{details["ipv4_network"]}-{details["ipv6_network"]}"
typer.echo(f"Creating imported IAS to R&E Interconnect: {key}")
try:
initial_data = IasToReInterconnectImportModel(**details)
start_process("create_imported_ias_to_re_interconnect", [initial_data.model_dump()])
successfully_imported_data.append(key)
typer.echo(
f"Successfully created {key}",
)
except ValidationError as e:
typer.echo(f"Validation error: {e}")
typer.echo("Waiting for the dust to settle before moving on the importing new products...")
time.sleep(1)
# Migrate new products from imported to "full" counterpart.
imported_products = get_subscriptions(
[ProductType.IMPORTED_IAS_TO_RE_INTERCONNECT],
lifecycles=[SubscriptionLifecycle.ACTIVE],
includes=["subscription_id"],
)
for subscription_id in imported_products:
typer.echo(f"Importing {subscription_id}")
start_process("import_ias_to_re_interconnect", [subscription_id])
if successfully_imported_data:
typer.echo("Successfully created imported IasToReInterconnects:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
typer.echo("Please validate no more imported IasToReInterconnect products exist anymore in the database.")
@app.command()
def import_iptrunks(filepath: str = common_filepath_option) -> None:
"""Import IP trunks into GSO."""
Loading