# Skip to content
# Snippets Groups Projects
# deploy_twamp.py 4.20 KiB
"""Workflow for adding :term:`TWAMP` to an existing IP trunk."""

import json

from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.utils.json import json_dumps
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products.product_types.iptrunk import Iptrunk
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.types.tt_number import TTNumber


def _initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Present a confirmation form for deploying :term:`TWAMP` on an existing IP trunk.

    The trunk is loaded from ``subscription_id`` so that the confirmation label can name the router
    :term:`FQDN` found on each of its two sides. The form also asks for a ticket number. Once the form
    is submitted, the result of ``model_dump()`` on the submitted form is returned.
    """
    trunk_sides = Iptrunk.from_subscription(subscription_id).iptrunk.iptrunk_sides
    first_router_fqdn = trunk_sides[0].iptrunk_side_node.router_fqdn
    second_router_fqdn = trunk_sides[1].iptrunk_side_node.router_fqdn
    confirmation_text = (
        f"Please confirm deployment of TWAMP on IP trunk from {first_router_fqdn} to {second_router_fqdn}"
    )

    class DeployTWAMPForm(FormPage):
        info_label: Label = confirmation_text
        tt_number: TTNumber

    submitted_form = yield DeployTWAMPForm

    return submitted_form.model_dump()


@step("[DRY RUN] Deploy TWAMP on both sides")
def deploy_twamp_dry(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
    """Build the playbook request for a dry run of deploying the :term:`TWAMP` session.

    The returned state names the playbook to run, an inventory holding the router on each side of the
    trunk, and the extra variables handed to the playbook, with ``dry_run`` set to ``True``.
    """
    # Round-trip through the orchestrator JSON encoder so the playbook receives plain JSON data.
    serialised_subscription = json.loads(json_dumps(subscription))

    trunk_sides = subscription.iptrunk.iptrunk_sides
    # Hosts carry no per-host variables, hence every FQDN maps to ``None``.
    target_hosts = dict.fromkeys(
        (trunk_sides[0].iptrunk_side_node.router_fqdn, trunk_sides[1].iptrunk_side_node.router_fqdn)
    )

    return {
        "playbook_name": "gap_ansible/playbooks/deploy_twamp.yaml",
        "inventory": {"all": {"hosts": target_hosts}},
        "extra_vars": {
            "subscription": serialised_subscription,
            "process_id": process_id,
            "dry_run": True,
            "verb": "deploy",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP",
        },
    }


@step("[FOR REAL] Deploy TWAMP on both sides")
def deploy_twamp_real(subscription: Iptrunk, process_id: UUIDstr, tt_number: str) -> LSOState:
    """Build the playbook request that deploys the :term:`TWAMP` session for real.

    The returned state names the playbook to run, an inventory holding the router on each side of the
    trunk, and the extra variables handed to the playbook, with ``dry_run`` set to ``False``.
    """
    # Round-trip through the orchestrator JSON encoder so the playbook receives plain JSON data.
    serialised_subscription = json.loads(json_dumps(subscription))

    trunk_sides = subscription.iptrunk.iptrunk_sides
    # Hosts carry no per-host variables, hence every FQDN maps to ``None``.
    target_hosts = dict.fromkeys(
        (trunk_sides[0].iptrunk_side_node.router_fqdn, trunk_sides[1].iptrunk_side_node.router_fqdn)
    )

    return {
        "playbook_name": "gap_ansible/playbooks/deploy_twamp.yaml",
        "inventory": {"all": {"hosts": target_hosts}},
        "extra_vars": {
            "subscription": serialised_subscription,
            "process_id": process_id,
            "dry_run": False,
            "verb": "deploy",
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy TWAMP",
        },
    }


@step("Check TWAMP status on both sides")
def check_twamp_status(subscription: Iptrunk) -> LSOState:
    """Build the playbook request that checks the :term:`TWAMP` session.

    The same playbook as for deployment is used, this time with the ``check_twamp`` verb. The inventory
    holds the router on each side of the trunk.
    """
    # Round-trip through the orchestrator JSON encoder so the playbook receives plain JSON data.
    serialised_subscription = json.loads(json_dumps(subscription))

    trunk_sides = subscription.iptrunk.iptrunk_sides
    # Hosts carry no per-host variables, hence every FQDN maps to ``None``.
    target_hosts = dict.fromkeys(
        (trunk_sides[0].iptrunk_side_node.router_fqdn, trunk_sides[1].iptrunk_side_node.router_fqdn)
    )

    return {
        "playbook_name": "gap_ansible/playbooks/deploy_twamp.yaml",
        "inventory": {"all": {"hosts": target_hosts}},
        "extra_vars": {
            "subscription": serialised_subscription,
            "verb": "check_twamp",
        },
    }


@workflow(
    "Deploy TWAMP",
    initial_input_form=wrap_modify_initial_input_form(_initial_input_form_generator),
    target=Target.MODIFY,
)
def deploy_twamp() -> StepList:
    """Deploy a :term:`TWAMP` session on an IP trunk.

    * Store the process subscription and unsync
    * Run the :term:`TWAMP` playbook, first as a dry run and then for real
    * Run the playbook again to check the :term:`TWAMP` status
    * Resync
    """
    # Each stage extends the previous one, so the steps are still chained strictly left to right.
    prepared = begin >> store_process_subscription(Target.MODIFY) >> unsync
    deployed = prepared >> lso_interaction(deploy_twamp_dry) >> lso_interaction(deploy_twamp_real)
    verified = deployed >> lso_interaction(check_twamp_status)

    return verified >> resync >> done