"""Workflow for adding :term:`TWAMP` to an existing IP trunk.""" import json from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, State, UUIDstr from orchestrator.utils.json import json_dumps from orchestrator.workflow import StepList, begin, done, step, workflow from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.iptrunk import Iptrunk from gso.services.lso_client import execute_playbook, lso_interaction from gso.utils.types import TTNumber def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: trunk = Iptrunk.from_subscription(subscription_id) class DeployTWAMPForm(FormPage): info_label: Label = ( "Please confirm deployment of TWAMP on IP trunk from " f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to " f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) tt_number: TTNumber user_input = yield DeployTWAMPForm return user_input.model_dump() @step("[DRY RUN] Deploy TWAMP on both sides") def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Perform a dry run of deploying the :term:`TWAMP` session.""" extra_vars = { "subscription": json.loads(json_dumps(subscription)), "process_id": process_id, "dry_run": True, "verb": "deploy", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @step("[FOR REAL] Deploy TWAMP on both sides") def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State: """Deploy the :term:`TWAMP` session.""" extra_vars = { "subscription": json.loads(json_dumps(subscription)), "process_id": process_id, "dry_run": False, "verb": "deploy", "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @step("Check TWAMP status on both sides") def check_twamp_status(subscription: Iptrunk, callback_route: str) -> State: """Check TWAMP session.""" extra_vars = { "subscription": json.loads(json_dumps(subscription)), "verb": "check_twamp", } inventory = ( f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}" f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" ) execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars) return {"subscription": subscription} @workflow( "Deploy TWAMP", initial_input_form=wrap_modify_initial_input_form(_initial_input_form_generator), target=Target.MODIFY, ) def deploy_twamp() -> StepList: """Deploy a :term:`TWAMP` session on an IP trunk. * Run the :term:`TWAMP` playbook, including an initial dry run """ return ( begin >> store_process_subscription(Target.MODIFY) >> unsync >> lso_interaction(deploy_twamp_dry) >> lso_interaction(deploy_twamp_real) >> lso_interaction(check_twamp_status) >> resync >> done )