Skip to content
Snippets Groups Projects
activate_switch.py 1.89 KiB
"""Workflow for activating a switch, making it available to other subscriptions."""

from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, inputstep, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form

from gso.products.product_types.switch import Switch


def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
    switch = Switch.from_subscription(subscription_id)

    class ActivateSwitchForm(FormPage):
        info_label: Label = "Start approval process for switch activation."

    yield ActivateSwitchForm
    return {"subscription": switch}


@inputstep("Verify checklist completion", assignee=Assignee.SYSTEM)
def verify_complete_checklist() -> FormGenerator:
    """Ask the operator to provide a link to the completed checklist in SharePoint."""

    class VerifyCompleteForm(FormPage):
        info_label: Label = "Please enter URL to the completed checklist in SharePoint. Then continue this workflow."
        checklist_url: str = ""

    user_input = yield VerifyCompleteForm
    return {"checklist_url": user_input.model_dump()["checklist_url"]}


@workflow(
    "Activate switch", initial_input_form=(wrap_modify_initial_input_form(_initial_input_form)), target=Target.MODIFY
)
def activate_switch() -> StepList:
    """Take a switch and move it from a `PROVISIONING` to an `ACTIVE` state."""
    return (
        begin
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> verify_complete_checklist
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )