Skip to content
Snippets Groups Projects
activate_iptrunk.py 2.08 KiB
"""Activate IP trunk takes a provisioning trunk to the active lifecycle state."""

from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, inputstep, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products.product_types.iptrunk import Iptrunk


def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
    trunk = Iptrunk.from_subscription(subscription_id)

    class ActivateTrunkForm(FormPage):
        info_label: Label = "Start approval process for IP trunk activation."

    user_input = yield ActivateTrunkForm

    return user_input.model_dump() | {"subscription": trunk}


@inputstep("Verify checklist completion", assignee=Assignee.SYSTEM)
def verify_complete_checklist() -> FormGenerator:
    """Show a form for the operator to input a link to the completed checklist."""

    class VerifyCompleteForm(FormPage):
        info_label: Label = "Verify that the checklist has been completed. Then continue this workflow."
        checklist_url: str = ""

    user_input = yield VerifyCompleteForm

    return {"checklist_url": user_input.model_dump()["checklist_url"]}


@workflow(
    "Activate an IP Trunk",
    initial_input_form=wrap_modify_initial_input_form(_initial_input_form),
    target=Target.MODIFY,
)
def activate_iptrunk() -> StepList:
    """Move an IP Trunk from a ``PROVISIONING`` state to an ``ACTIVE`` state.

    * Send email notifications to different teams.
    * Wait for approval to be given.
    * Update the subscription lifecycle state to ``ACTIVE``.
    """
    return (
        begin
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> verify_complete_checklist
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )