Skip to content
Snippets Groups Projects
Verified Commit 19b26d17 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add GÉANT IP migration workflow

parent aead3144
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
......@@ -66,6 +66,12 @@ new_workflows = [
"description": "Modify G\u00c9ANT IP",
"product_type": "GeantIP"
},
{
"name": "migrate_geant_ip",
"target": "MODIFY",
"description": "Migrate G\u00c9ANT IP",
"product_type": "GeantIP"
},
{
"name": "create_imported_geant_ip",
"target": "CREATE",
......
......@@ -48,6 +48,7 @@
"create_geant_ip": "Create GÉANT IP",
"deploy_twamp": "Deploy TWAMP",
"migrate_iptrunk": "Migrate IP Trunk",
"migrate_geant_ip": "Migrate GÉANT IP",
"modify_isis_metric": "Modify the ISIS metric",
"modify_site": "Modify Site",
"modify_trunk_interface": "Modify IP Trunk interface",
......
......@@ -92,3 +92,4 @@ LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip"
LazyWorkflowInstance("gso.workflows.geant_ip.modify_geant_ip", "modify_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.create_imported_geant_ip", "create_imported_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.import_geant_ip", "import_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.migrate_geant_ip", "migrate_geant_ip")
"""A modification workflow that migrates a GÉANT IP subscription to a different set of Edge Ports."""
from typing import Annotated
from annotated_types import Len
from orchestrator import workflow
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import Choice, Divider
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.geant_ip import GeantIP
from gso.services.subscriptions import get_active_edge_port_subscriptions
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Gather input from the operator on what new Edge Ports this GÉANT IP should migrate to."""
subscription = GeantIP.from_subscription(subscription_id)
partner_id = subscription.customer_id
edge_port_count = len(subscription.geant_ip.geant_ip_ap_list)
def _new_edge_port_selector(pid: UUIDstr) -> Choice:
existing_ep_name_list = [
ap.geant_ip_sbp.edge_port.owner_subscription_id for ap in subscription.geant_ip.geant_ip_ap_list
]
edge_port_subscriptions = list(
filter(
lambda ep: bool(ep["customer_id"] == pid) and ep["subscription_id"] not in existing_ep_name_list,
get_active_edge_port_subscriptions(includes=["subscription_id", "description", "customer_id"]),
)
)
edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions}
return Choice(
"Select an Edge Port",
zip(edge_ports.keys(), edge_ports.items(), strict=True), # type: ignore[arg-type]
)
class NewEdgePortSelection(BaseModel):
old_edge_port: str
new_edge_port: _new_edge_port_selector(partner_id) | str # type: ignore[valid-type]
def _validate_new_edge_ports_are_unique(edge_ports: list[NewEdgePortSelection]) -> list[NewEdgePortSelection]:
new_edge_ports = [str(port.new_edge_port) for port in edge_ports]
if len(new_edge_ports) != len(set(new_edge_ports)):
msg = "New Edge Ports must be unique"
raise ValueError(msg)
return edge_ports
class GeantIPEdgePortSelectionForm(FormPage):
model_config = ConfigDict(title="Migrating GÉANT IP to a new set of Edge Ports")
tt_number: TTNumber
divider: Divider = Field(None, exclude=True)
edge_port_selection: Annotated[
list[NewEdgePortSelection],
AfterValidator(_validate_new_edge_ports_are_unique),
Len(min_length=edge_port_count, max_length=edge_port_count),
] = [ # noqa: RUF012
NewEdgePortSelection(
old_edge_port=f"{
EdgePort.from_subscription(ap.geant_ip_sbp.edge_port.owner_subscription_id).description
} ({ap.nren_ap_type})",
new_edge_port="",
)
for ap in subscription.geant_ip.geant_ip_ap_list
]
ep_user_input = yield GeantIPEdgePortSelectionForm
return {"subscription_id": subscription_id, "subscription": subscription} | ep_user_input.model_dump()
@step("Update subscription model")
def update_subscription_model(subscription: GeantIP, edge_port_selection: list[dict]) -> State:
"""Update the subscription model with the new list of Access Ports."""
for index, selected_port in enumerate(edge_port_selection):
subscription.geant_ip.geant_ip_ap_list[index].geant_ip_sbp.edge_port = EdgePort.from_subscription(
selected_port["new_edge_port"]
).edge_port
return {"subscription": subscription}
@workflow(
"Migrate GÉANT IP",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def migrate_geant_ip() -> StepList:
"""Migrate a GÉANT IP to a new set of Edge Ports."""
return begin >> store_process_subscription(Target.MODIFY) >> unsync >> update_subscription_model >> resync >> done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment