diff --git a/gso/workflows/iptrunk/migrate_iptrunk.py b/gso/workflows/iptrunk/migrate_iptrunk.py index 46945c724962c5fff742009b25e8ebf355fbe5e4..d2abb74e9d2af26c07d0fd69c586d8821eeb3bde 100644 --- a/gso/workflows/iptrunk/migrate_iptrunk.py +++ b/gso/workflows/iptrunk/migrate_iptrunk.py @@ -2,17 +2,16 @@ import re from typing import NoReturn from orchestrator import step, workflow -from orchestrator.db import SubscriptionTable, ProductTable +from orchestrator.db import ProductTable, SubscriptionTable from orchestrator.forms import FormPage from orchestrator.forms.validators import Choice, UniqueConstrainedList from orchestrator.targets import Target -from orchestrator.types import UUIDstr, FormGenerator -from orchestrator.workflow import StepList, init, done -from orchestrator.workflows.steps import store_process_subscription, unsync, resync +from orchestrator.types import FormGenerator, State, UUIDstr +from orchestrator.workflow import StepList, done, init +from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form -from pydantic import validator - from products import Iptrunk +from pydantic import validator def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: @@ -20,15 +19,18 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: routers = {} for router_id, router_description in ( - SubscriptionTable.query.join(ProductTable) - .filter( - ProductTable.product_type == "Router", - SubscriptionTable.status == "active", - ) - .with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description) - .all() + SubscriptionTable.query.join(ProductTable) + .filter( + ProductTable.product_type == "Router", + SubscriptionTable.status == "active", + ) + .with_entities(SubscriptionTable.subscription_id, SubscriptionTable.description) + .all() ): - if router_id not in [subscription.iptrunk.iptrunk_sideA_node, subscription.iptrunk.iptrunk_sideB_node]: + if router_id not in [ + subscription.iptrunk.iptrunk_sideA_node.subscription.subscription_id, + subscription.iptrunk.iptrunk_sideB_node.subscription.subscription_id, + ]: routers[str(router_id)] = router_description NewRouterEnum = Choice("Select a new router", zip(routers.keys(), routers.items())) # type: ignore @@ -39,17 +41,24 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: class ModifyIptrunkForm(FormPage): class Config: - title = f"Subscription {subscription.iptrunk.geant_s_sid} from " \ - f"{subscription.iptrunk.iptrunk_sideA_node.router_fqdn} to " \ - f"{subscription.iptrunk.iptrunk_sideB_node.router_fqdn}" + title = ( + f"Subscription {subscription.iptrunk.geant_s_sid} from " + f"{subscription.iptrunk.iptrunk_sideA_node.router_fqdn} to " + f"{subscription.iptrunk.iptrunk_sideB_node.router_fqdn}" + ) - replace_side = Choice("Select the side of the IP trunk to be replaced", - [subscription.iptrunk.iptrunk_sideA_node, subscription.iptrunk.iptrunk_sideB_node]) + replace_side = Choice( + "Select the side of the IP trunk to be replaced", + [ # type: ignore + (subscription.iptrunk.iptrunk_sideA_node.router_fqdn, subscription.iptrunk.iptrunk_sideA_node), + (subscription.iptrunk.iptrunk_sideB_node.router_fqdn, subscription.iptrunk.iptrunk_sideB_node), + ], + ) new_node: NewRouterEnum # type: ignore new_lag_interface: str new_lag_member_interfaces = LagMemberList - @validator("new_lag_interface", pre=True, always=True) + @validator("new_lag_interface", allow_reuse=True, pre=True, always=True) def lag_interface_proper_name(cls, new_lag_name: str) -> str | NoReturn: nokia_lag_re = re.compile("^lag-\\d+$") juniper_lag_re = re.compile("^ae\\d{1,2}$") @@ -59,12 +68,14 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: raise ValueError("Invalid LAG name, please try again.") - yield ModifyIptrunkForm + user_input = yield ModifyIptrunkForm + + return user_input.dict() @step("Hilversum Stinks") -def temp_test_step(subscription: Iptrunk) -> Iptrunk: - return subscription +def temp_test_step(subscription: Iptrunk) -> State: + return {"test": subscription} @workflow( @@ -73,11 +84,4 @@ def temp_test_step(subscription: Iptrunk) -> Iptrunk: target=Target.MODIFY, ) def migrate_iptrunk() -> StepList: - return ( - init - >> store_process_subscription(Target.MODIFY) - >> unsync - >> temp_test_step - >> resync - >> done - ) + return init >> store_process_subscription(Target.MODIFY) >> unsync >> temp_test_step >> resync >> done