"""Workflow for adding TWAMP to an existing IP trunk.""" from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.iptrunk import Iptrunk from gso.services.provisioning_proxy import execute_playbook, pp_interaction def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: trunk = Iptrunk.from_subscription(subscription_id) class DeployTWAMPForm(FormPage): info_label: Label = ( "Please confirm deployment of TWAMP on IP trunk from " f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" # type: ignore[assignment] ) tt_number: str user_input = yield DeployTWAMPForm return user_input.dict() @step("[DRY RUN] Deploy TWAMP on both sides") def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Perform a dry run of deploying the TWAMP session.""" extra_vars = { "subscription": subscription, "process_id": process_id, "dry_run": True, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @step("[FOR REAL] Deploy TWAMP on both sides") def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Deploy the TWAMP session.""" extra_vars = { "subscription": subscription, "process_id": process_id, "dry_run": False, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @workflow( "Deploy TWAMP", initial_input_form=wrap_modify_initial_input_form(_initial_input_form_generator), target=Target.MODIFY, ) def deploy_twamp() -> StepList: """Deploy a TWAMP session on an IP trunk. * Run the TWAMP playbook, including an initial dry run """ return ( init >> store_process_subscription(Target.MODIFY) >> unsync >> pp_interaction(deploy_twamp_dry) >> pp_interaction(deploy_twamp_real) >> resync >> done )