Skip to content
Snippets Groups Projects
Verified Commit fbb32d65 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Allow skipping Ansible steps in the Edge Port termination workflow

parent d30d62be
No related branches found
No related tags found
No related merge requests found
Pipeline #94064 passed
......@@ -9,7 +9,9 @@ from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import Field
from pydantic_forms.types import FormGenerator, UUIDstr
from pydantic_forms.validators import Label
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, lso_interaction
......@@ -24,6 +26,9 @@ def initial_input_form_generator() -> FormGenerator:
class TerminateForm(SubmitFormPage):
tt_number: TTNumber
label: Label = Field("Should this workflow run Ansible playbooks to remove configuration from the router?")
run_ansible_steps: bool = True
user_input = yield TerminateForm
return user_input.model_dump()
......@@ -83,13 +88,14 @@ def netbox_clean_up(subscription: EdgePort) -> None:
def terminate_edge_port() -> StepList:
"""Terminate a new edge port in the network."""
router_is_nokia = conditional(lambda state: state["subscription"]["edge_port"]["node"]["vendor"] == Vendor.NOKIA)
run_ansible_steps = conditional(lambda state: state["run_ansible_steps"])
return (
begin
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> lso_interaction(remove_edge_port_dry)
>> lso_interaction(remove_edge_port_real)
>> run_ansible_steps(lso_interaction(remove_edge_port_dry))
>> run_ansible_steps(lso_interaction(remove_edge_port_real))
>> router_is_nokia(netbox_clean_up)
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
......
......@@ -13,12 +13,14 @@ from test.workflows import (
@pytest.mark.workflow()
@pytest.mark.parametrize("include_lso_steps", [True, False])
@patch("gso.services.netbox_client.NetboxClient.delete_interface")
@patch("gso.services.netbox_client.NetboxClient.free_interface")
def test_successful_edge_port_termination(
mocked_free_interface,
mocked_delete_interface,
edge_port_subscription_factory,
include_lso_steps,
faker,
):
# Set up mock return values
......@@ -32,12 +34,14 @@ def test_successful_edge_port_termination(
{"subscription_id": subscription_id},
{
"tt_number": faker.tt_number(),
"run_ansible_steps": include_lso_steps,
},
]
result, process_stat, step_log = run_workflow("terminate_edge_port", initial_data)
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
if include_lso_steps:
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment