Skip to content
Snippets Groups Projects
Verified Commit e074c22e authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add terminate_switch workflow

parent 07ef5f2e
No related branches found
No related tags found
1 merge request!300Feature/lan switch interconnect
......@@ -27,7 +27,7 @@ from gso.utils.types import TTNumber
from gso.utils.workflow_steps import prompt_sharepoint_checklist_url
def initial_input_form_generator(product_name: str) -> FormGenerator:
def _initial_input_form_generator(product_name: str) -> FormGenerator:
"""Input form for creating a new Switch."""
class CreateSwitchForm(FormPage):
......@@ -192,7 +192,7 @@ def create_new_sharepoint_checklist(subscription: SwitchInactive, tt_number: str
@workflow(
"Create Switch",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
initial_input_form=wrap_create_initial_input_form(_initial_input_form_generator),
target=Target.CREATE,
)
def create_switch() -> StepList:
......
"""Workflow for terminating a switch."""
from orchestrator import begin, done, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from pydantic_forms.validators import Label
from gso.products.product_types.switch import Switch
from gso.utils.types import TTNumber
def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Input form to confirm that this switch indeed must be terminated."""
switch = Switch.from_subscription(subscription_id)
class TerminateForm(FormPage):
if switch.status == SubscriptionLifecycle.INITIAL:
info_label: Label = (
"This will immediately mark the subscription as terminated, preventing any other workflows from "
"interacting with this product subscription."
)
info_label_2: Label = "ONLY EXECUTE THIS WORKFLOW WHEN YOU ARE ABSOLUTELY SURE WHAT YOU ARE DOING."
tt_number: TTNumber
yield TerminateForm
return {"subscription": switch}
@step("Remove switch from Netbox")
def remove_device_from_netbox(subscription: dict) -> None:
"""Remove the switch from Netbox."""
if subscription["switch"]:
msg = "Removal from Netbox is not implemented."
raise ProcessFailureError(msg)
@workflow(
"Terminate switch",
initial_input_form=wrap_modify_initial_input_form(_input_form_generator),
target=Target.TERMINATE,
)
def terminate_switch() -> StepList:
"""Terminate a switch subscription.
* Remove the switch from Netbox
* Mark the service as terminated in the service database
"""
return (
begin
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> remove_device_from_netbox
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment