"""Activate router takes a provisioning router to the active lifecycle state.""" from orchestrator.config.assignee import Assignee from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr from orchestrator.workflow import StepList, done, init, inputstep, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.router import Router def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: router = Router.from_subscription(subscription_id) class ActivateRouterForm(FormPage): info_label: Label = "Start approval process for router activation." # type:ignore[assignment] user_input = yield ActivateRouterForm return user_input.dict() | {"subscription": router} @inputstep("Verify checklist completion", assignee=Assignee.SYSTEM) def verify_complete_checklist() -> FormGenerator: """Show a form for the operator to input a link to the completed checklist.""" class VerifyCompleteForm(FormPage): info_label: Label = "Verify that the checklist has been completed. Then continue this workflow." # type: ignore[assignment] checklist_url: str = "" user_input = yield VerifyCompleteForm return {"checklist_url": user_input.dict()["checklist_url"]} @workflow( "Activate a router", initial_input_form=wrap_modify_initial_input_form(_initial_input_form), target=Target.MODIFY, ) def activate_router() -> StepList: """Move a router from a ``PROVISIONING`` state to an ``ACTIVE`` state. * Send email notifications to different teams. * Wait for approval to be given. * Update the subscription lifecycle state to ``ACTIVE``. """ return ( init >> store_process_subscription(Target.MODIFY) >> unsync >> verify_complete_checklist >> set_status(SubscriptionLifecycle.ACTIVE) >> resync >> done )