-
Karel van Klink authored
activate_switch.py 1.89 KiB
"""Workflow for activating a switch, making it available to other subscriptions."""
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, inputstep, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.switch import Switch
def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator:
    """Present a confirmation page before the switch activation workflow starts.

    The page carries only an informational label; the value sent back by the form is not used.

    :param subscription_id: Identifier of the switch subscription that is loaded via ``Switch.from_subscription``.
    :return: A dict holding the loaded switch under the ``"subscription"`` key.
    """

    # Page with a single informational label and no other fields.
    class ActivateSwitchForm(FormPage):
        info_label: Label = "Start approval process for switch activation."

    # Load the subscription before the page is yielded, so a failing lookup surfaces up front.
    switch_subscription = Switch.from_subscription(subscription_id)

    # The result of the yield is deliberately discarded: nothing from the form is needed.
    yield ActivateSwitchForm

    return {"subscription": switch_subscription}
@inputstep("Verify checklist completion", assignee=Assignee.SYSTEM)
def verify_complete_checklist() -> FormGenerator:
    """Ask for a link to the completed checklist in SharePoint.

    A form with one text field, ``checklist_url`` (empty string by default), is presented. The submitted value is
    returned unchanged.

    :return: A dict holding the submitted value under the ``"checklist_url"`` key.
    """

    # Field order is kept as-is: the instruction label first, then the URL input.
    class VerifyCompleteForm(FormPage):
        info_label: Label = "Please enter URL to the completed checklist in SharePoint. Then continue this workflow."
        checklist_url: str = ""

    submitted_form = yield VerifyCompleteForm
    form_values = submitted_form.model_dump()

    return {"checklist_url": form_values["checklist_url"]}
@workflow(
    "Activate switch",
    initial_input_form=wrap_modify_initial_input_form(_initial_input_form),
    target=Target.MODIFY,
)
def activate_switch() -> StepList:
    """Take a switch and move it from a `PROVISIONING` to an `ACTIVE` state.

    The steps run in this order: ``store_process_subscription`` (for ``Target.MODIFY``), ``unsync``,
    ``verify_complete_checklist``, ``set_status`` to ``SubscriptionLifecycle.ACTIVE``, ``resync``, and ``done``.

    :return: The step list that makes up the workflow.
    """
    # Register the subscription with the process and unsync it before waiting for input.
    preparation = begin >> store_process_subscription(Target.MODIFY) >> unsync

    # The lifecycle status is only set after the checklist input step has been passed.
    activation = preparation >> verify_complete_checklist >> set_status(SubscriptionLifecycle.ACTIVE)

    return activation >> resync >> done