"""Cancel a subscription that is in the initial lifecycle state.""" from orchestrator.forms import FormPage from orchestrator.forms.validators import Label from orchestrator.targets import Target from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr from orchestrator.workflow import StepList, done, init, workflow from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form def _initial_input_form(subscription_id: UUIDstr) -> FormGenerator: class CancelSubscriptionForm(FormPage): info_label: Label = f"Canceling subscription with ID {subscription_id}" # type:ignore[assignment] info_label_2: Label = ( "This will immediately mark the subscription as terminated, preventing any other workflows from interacting" # type:ignore[assignment] " with this product subscription." ) info_label_3: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." # type:ignore[assignment] info_label_4: Label = "THIS WORKFLOW IS IRREVERSIBLE AND MAY HAVE UNFORESEEN CONSEQUENCES." # type:ignore[assignment] yield CancelSubscriptionForm return {"subscription_id": subscription_id} @workflow( "Cancel an initial subscription", initial_input_form=wrap_modify_initial_input_form(_initial_input_form), target=Target.TERMINATE, ) def cancel_subscription() -> StepList: """Cancel an initial subscription, taking it from the ``INITIAL`` state straight to ``TERMINATED``. This workflow can be used when a creation workflow has failed, and the process needs to be restarted. This workflow will prevent a stray subscription, forever stuck in the initial state, to stick around. * Update the subscription lifecycle state to ``TERMINATED``. """ return ( init >> store_process_subscription(Target.TERMINATE) >> unsync >> set_status(SubscriptionLifecycle.TERMINATED) >> resync >> done )