"""Workflow for adding TWAMP to an existing IP trunk.""" import json from orchestrator.utils.json import json_dumps from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.workflow import StepList, done, init, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic import validator from gso.products.product_types.iptrunk import Iptrunk from gso.services.lso_client import execute_playbook, lso_interaction from gso.utils.helpers import validate_tt_number def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: trunk = Iptrunk.from_subscription(subscription_id) class DeployTWAMPForm(FormPage): info_label: Label = ( "Please confirm deployment of TWAMP on IP trunk from " f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" # type: ignore[assignment] ) tt_number: str @validator("tt_number", allow_reuse=True) def validate_tt_number(cls, tt_number: str) -> str: return validate_tt_number(tt_number) user_input = yield DeployTWAMPForm return user_input.dict() @step("[DRY RUN] Deploy TWAMP on both sides") def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Perform a dry run of deploying the TWAMP session.""" extra_vars = { "subscription": json.loads(json_dumps(subscription)), "process_id": process_id, "dry_run": True, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @step("[FOR REAL] Deploy TWAMP on both sides") def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Deploy the TWAMP session.""" extra_vars = { "subscription": json.loads(json_dumps(subscription)), "process_id": process_id, "dry_run": False, "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @workflow( "Deploy TWAMP", initial_input_form=wrap_modify_initial_input_form( _initial_input_form_generator), target=Target.MODIFY, ) def deploy_twamp() -> StepList: """Deploy a TWAMP session on an IP trunk. * Run the TWAMP playbook, including an initial dry run """ return ( init >> store_process_subscription(Target.MODIFY) >> unsync >> lso_interaction(deploy_twamp_dry) >> lso_interaction(deploy_twamp_real) >> resync >> done )