Newer
Older
"""Workflow for adding TWAMP to an existing IP trunk."""
import json
from orchestrator.utils.json import json_dumps
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.utils.helpers import validate_tt_number
def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
trunk = Iptrunk.from_subscription(subscription_id)
class DeployTWAMPForm(FormPage):
info_label: Label = (
"Please confirm deployment of TWAMP on IP trunk from "
f"{trunk.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn} to "
f"{trunk.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}" # type: ignore[assignment]
)
@validator("tt_number", allow_reuse=True)
def validate_tt_number(cls, tt_number: str) -> str:
return validate_tt_number(tt_number)
user_input = yield DeployTWAMPForm
return user_input.dict()
@step("[DRY RUN] Deploy TWAMP on both sides")
def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
"""Perform a dry run of deploying the TWAMP session."""
extra_vars = {
"subscription": json.loads(json_dumps(subscription)),
"process_id": process_id,
"dry_run": True,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP",
}
inventory = (
f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}"
f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
)
execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars)
return {"subscription": subscription}
@step("[FOR REAL] Deploy TWAMP on both sides")
def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, callback_route: str, tt_number: str) -> State:
"""Deploy the TWAMP session."""
extra_vars = {
"subscription": json.loads(json_dumps(subscription)),
"process_id": process_id,
"dry_run": False,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP",
}
inventory = (
f"{subscription.iptrunk.iptrunk_sides[0].iptrunk_side_node.router_fqdn}"
f"\n{subscription.iptrunk.iptrunk_sides[1].iptrunk_side_node.router_fqdn}"
)
execute_playbook("deploy_twamp.yaml", callback_route, inventory, extra_vars)
return {"subscription": subscription}
@workflow(
"Deploy TWAMP",
initial_input_form=wrap_modify_initial_input_form(
_initial_input_form_generator),
target=Target.MODIFY,
)
def deploy_twamp() -> StepList:
"""Deploy a TWAMP session on an IP trunk.
* Run the TWAMP playbook, including an initial dry run
"""
return (
init
>> store_process_subscription(Target.MODIFY)
>> unsync
>> lso_interaction(deploy_twamp_dry)
>> lso_interaction(deploy_twamp_real)