Skip to content
Snippets Groups Projects

update intake form of device termination workflow

Merged Karel van Klink requested to merge feature/NAT-210-termination-checkboxes into develop
1 file
+ 24
13
Compare changes
  • Side-by-side
  • Inline
# noinspection PyProtectedMember
import ipaddress
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
@@ -15,17 +14,21 @@ from gso.services.ipam import V4ServiceNetwork, V6ServiceNetwork
from gso.services.provisioning_proxy import pp_interaction
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
subscription = Iptrunk.from_subscription(subscription_id)
def initial_input_form_generator() -> FormGenerator:
class TerminateForm(FormPage):
are_you_sure: Label = f"Are you sure you want to remove {subscription.description}?" # type: ignore
termination_label: Label = (
"Please confirm whether configuration should get removed from the A and B sides of the trunk, and whether "
"IPAM resources should be released." # type: ignore
)
remove_configuration: bool = True
clean_up_ipam: bool = True
return TerminateForm # type: ignore
user_input = yield TerminateForm
return user_input.dict()
@step("Set iptrunk ISIS metric to 9000")
def modify_iptrunk_subscription(subscription: Iptrunk) -> State:
def update_isis_metric(subscription: Iptrunk) -> State:
subscription.iptrunk.iptrunk_isis_metric = 9000
return {"subscription": subscription}
@@ -87,15 +90,23 @@ def deprovision_ip_trunk_ipv6(subscription: Iptrunk) -> dict[str, V4ServiceNetwo
target=Target.TERMINATE,
)
def terminate_iptrunk() -> StepList:
run_config_steps = conditional(lambda state: state.get("remove_configuration", True))
run_ipam_steps = conditional(lambda state: state.get("clean_up_ipam", True))
config_steps = (
StepList([update_isis_metric])
>> pp_interaction(drain_traffic_from_ip_trunk, 3)
>> pp_interaction(deprovision_ip_trunk_dry, 3)
>> pp_interaction(deprovision_ip_trunk_real, 3)
)
ipam_steps = StepList([deprovision_ip_trunk_ipv4, deprovision_ip_trunk_ipv6])
return (
init
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> modify_iptrunk_subscription
>> pp_interaction(drain_traffic_from_ip_trunk, 3)
>> pp_interaction(deprovision_ip_trunk_dry, 3)
>> deprovision_ip_trunk_ipv4
>> deprovision_ip_trunk_ipv6
>> run_config_steps(config_steps)
>> run_ipam_steps(ipam_steps)
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
Loading