"""Workflow for terminating a switch.""" from orchestrator import begin, done, workflow from orchestrator.forms import SubmitFormPage from orchestrator.targets import Target from orchestrator.types import SubscriptionLifecycle from orchestrator.workflow import StepList, step from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic_forms.types import FormGenerator, UUIDstr from pydantic_forms.validators import Label from gso.products.product_types.switch import Switch from gso.services.infoblox import delete_host_by_fqdn from gso.services.netbox_client import NetboxClient from gso.utils.types.tt_number import TTNumber def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator: """Input form to confirm that this switch indeed must be terminated.""" switch = Switch.from_subscription(subscription_id) class TerminateForm(SubmitFormPage): if switch.status == SubscriptionLifecycle.INITIAL: info_label: Label = ( "This will immediately mark the subscription as terminated, preventing any other workflows from " "interacting with this product subscription." ) info_label_2: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING." tt_number: TTNumber user_input = yield TerminateForm return {"subscription": switch} | user_input.model_dump() @step("Remove switch from Netbox") def remove_device_from_netbox(subscription: Switch) -> None: """Remove the switch from Netbox.""" NetboxClient().delete_device(subscription.switch.fqdn) @step("Remove switch from IPAM") def remove_device_from_ipam(subscription: Switch) -> None: """Remove the switch from IPAM.""" delete_host_by_fqdn(subscription.switch.fqdn) @workflow( "Terminate switch", initial_input_form=wrap_modify_initial_input_form(_input_form_generator), target=Target.TERMINATE, ) def terminate_switch() -> StepList: """Terminate a switch subscription. * Remove the switch from Netbox * Mark the service as terminated in the service database """ return ( begin >> store_process_subscription(Target.TERMINATE) >> unsync >> remove_device_from_netbox >> remove_device_from_ipam >> set_status(SubscriptionLifecycle.TERMINATED) >> resync >> done )