Skip to content
Snippets Groups Projects
Commit b49654c8 authored by Neda Moeini's avatar Neda Moeini
Browse files

Add terminate VRF WF.

parent 1ca8bec9
No related branches found
Tags 2.27
1 merge request!314Feature/vrf
......@@ -29,6 +29,12 @@ new_workflows = [
"target": "MODIFY",
"description": "Modify VRF router list",
"product_type": "VRF"
},
{
"name": "terminate_vrf",
"target": "TERMINATE",
"description": "Terminate a VRF subscription",
"product_type": "VRF"
}
]
......
......@@ -133,3 +133,4 @@ LazyWorkflowInstance("gso.workflows.l2_circuit.import_layer_2_circuit", "import_
# VRF workflows
LazyWorkflowInstance("gso.workflows.vrf.create_vrf", "create_vrf")
LazyWorkflowInstance("gso.workflows.vrf.modify_vrf_router_list", "modify_vrf_router_list")
LazyWorkflowInstance("gso.workflows.vrf.terminate_vrf", "terminate_vrf")
"""A workflow for terminating a VRF subscription."""
from typing import Any
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, workflow
from orchestrator.workflows.steps import (
resync,
set_status,
store_process_subscription,
unsync,
)
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import model_validator
from gso.products.product_types.vrf import VRF
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Ask the user for confirmation whether to terminate the selected VRF."""
vrf_subscription = VRF.from_subscription(subscription_id)
class TerminateForm(FormPage):
termination_label: Label = f"Are you sure you want to delete {vrf_subscription.vrf.vrf_name} VRF?"
tt_number: TTNumber
@model_validator(mode="before")
def vrf_must_have_empty_router_list(cls, data: Any) -> Any:
if vrf_subscription.vrf.vrf_router_list:
msg = ("VRF must not have any routers assigned to it before it can be deleted. Please remove all"
" routers from the VRF first.")
raise ValueError(msg)
return data
user_input = yield TerminateForm
return user_input.model_dump()
@workflow(
"Terminate VRF",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_vrf() -> StepList:
"""Terminate a VRF subscription."""
return (
begin
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment