"""A workflow for terminating a site subscription. The `terminate_site` workflow will take an existing and active site subscription from an `ACTIVE` to a `TERMINATED` state. This requires all dependant subscription instances to already be terminated. If this is not the case, the workflow will be unavailable for an operator to run, accompanied by an error message explaining this fact. """ from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr from orchestrator.workflow import StepList, begin, done, workflow from orchestrator.workflows.steps import ( resync, set_status, store_process_subscription, unsync, ) from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.site import Site def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: """Ask the user for confirmation whether to terminate the selected site.""" site = Site.from_subscription(subscription_id) class TerminateForm(FormPage): if site.status == SubscriptionLifecycle.INITIAL: info_label_2: Label = ( "This will immediately mark the subscription as terminated, preventing any other workflows from " "interacting with this product subscription." ) info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." termination_label: Label = "Are you sure you want to delete this site?" user_input = yield TerminateForm return user_input.model_dump() @workflow( "Terminate Site", initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.TERMINATE, ) def terminate_site() -> StepList: """Terminate a site subscription.""" return ( begin >> store_process_subscription(Target.TERMINATE) >> unsync >> set_status(SubscriptionLifecycle.TERMINATED) >> resync >> done )