Skip to content
Snippets Groups Projects
Verified Commit 54a9a0d1 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add unit tests for switch workflows

parent 82268d29
No related branches found
No related tags found
1 merge request!300Feature/lan switch interconnect
Showing
with 236 additions and 27 deletions
......@@ -17,6 +17,7 @@ WF_USABLE_MAP.update({
"deploy_twamp": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
"modify_trunk_interface": [SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
"activate_iptrunk": [SubscriptionLifecycle.PROVISIONING],
"activate_switch": [SubscriptionLifecycle.PROVISIONING],
"terminate_site": ALL_ALIVE_STATES,
"terminate_router": ALL_ALIVE_STATES,
"terminate_iptrunk": ALL_ALIVE_STATES,
......
......@@ -6,7 +6,6 @@ from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, inputstep, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
......@@ -15,7 +14,7 @@ from pydantic_forms.validators import Label, ReadOnlyField
from gso.products.product_blocks.switch import SwitchModel
from gso.products.product_types.site import Site
from gso.products.product_types.switch import SwitchInactive
from gso.products.product_types.switch import SwitchInactive, SwitchProvisioning
from gso.services import infoblox
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.partners import get_partner_by_name
......@@ -36,7 +35,7 @@ def _initial_input_form_generator(product_name: str) -> FormGenerator:
tt_number: TTNumber
partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type]
switch_site: active_site_selector() # type: ignore[valid-type]
switch_site: active_site_selector() or str # type: ignore[valid-type]
hostname: str
ts_port: PortNumber
vendor: ReadOnlyField(Vendor.JUNIPER, default_type=Vendor) # type: ignore[valid-type]
......@@ -168,12 +167,8 @@ def run_post_deploy_checks(subscription: dict) -> LSOState:
@step("Create a new SharePoint checklist")
def create_new_sharepoint_checklist(subscription: SwitchInactive, tt_number: str, process_id: UUIDstr) -> State:
def create_new_sharepoint_checklist(subscription: SwitchProvisioning, tt_number: str, process_id: UUIDstr) -> State:
"""Create a new checklist in SharePoint for approving this router."""
if not subscription.switch.fqdn:
msg = "Switch is missing an FQDN."
raise ProcessFailureError(msg, details=subscription.subscription_id)
new_list_item_url = SharePointClient().add_list_item(
list_name="switch",
fields={
......
......@@ -3,15 +3,16 @@
from orchestrator import begin, done, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.types import SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from pydantic_forms.types import FormGenerator
from pydantic_forms.validators import Label
from gso.products.product_types.switch import Switch
from gso.services.infoblox import delete_host_by_fqdn
from gso.services.netbox_client import NetboxClient
from gso.utils.types.tt_number import TTNumber
......@@ -34,11 +35,15 @@ def _input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("Remove switch from Netbox")
def remove_device_from_netbox(subscription: dict) -> None:
def remove_device_from_netbox(subscription: Switch) -> None:
"""Remove the switch from Netbox."""
if subscription["switch"]:
msg = "Removal from Netbox is not implemented."
raise ProcessFailureError(msg)
NetboxClient().delete_device(subscription.switch.fqdn)
@step("Remove switch from IPAM")
def remove_device_from_ipam(subscription: Switch) -> None:
"""Remove the switch from :term:`IPAM`."""
delete_host_by_fqdn(subscription.switch.fqdn)
@workflow(
......@@ -57,6 +62,7 @@ def terminate_switch() -> StepList:
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> remove_device_from_netbox
>> remove_device_from_ipam
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
......
......@@ -14,7 +14,7 @@ from gso.services.netbox_client import NetboxClient
@step("Validate switch in Netbox")
def check_netbox_device(subscription: Switch) -> None:
"""Fetch the device in Netbox. Will raise an exception when it is not found."""
"""Fetch the device in Netbox. Will raise an exception if it is not found."""
NetboxClient().get_device_by_name(subscription.switch.fqdn)
......
......@@ -81,7 +81,6 @@ def input_form_wizard_data(request, router_subscription_factory, partner_factory
@patch("gso.services.lso_client._send_request")
def test_successful_edge_port_creation(
mock_execute_playbook,
responses,
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
......
......@@ -119,7 +119,6 @@ def test_successful_iptrunk_creation_with_standard_lso_result(
mock_allocate_v4_network,
mock_allocate_v6_network,
mock_execute_playbook,
responses,
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
......@@ -176,7 +175,6 @@ def test_iptrunk_creation_fails_when_lso_return_code_is_one(
mock_allocate_v4_network,
mock_allocate_v6_network,
mock_execute_playbook,
responses,
input_form_wizard_data,
faker,
_netbox_client_mock, # noqa: PT019
......@@ -217,7 +215,6 @@ def test_successful_iptrunk_creation_with_juniper_interface_names(
mock_allocate_v4_network,
mock_allocate_v6_network,
mock_execute_playbook,
responses,
input_form_wizard_data,
faker,
data_config_filename: PathLike,
......
......@@ -34,7 +34,6 @@ def base_bgp_peer_input(faker):
def test_create_nren_l3_core_service_success(
mock_lso_client,
l3_core_type,
responses,
faker,
partner_factory,
edge_port_subscription_factory,
......
......@@ -6,7 +6,7 @@ from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_modify_connection_strategy(responses, router_subscription_factory):
def test_modify_connection_strategy(router_subscription_factory):
subscription_id = router_subscription_factory(router_access_via_ts=True)
subscription = Router.from_subscription(subscription_id)
assert subscription.router.router_access_via_ts is True
......
......@@ -9,7 +9,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_create_site(responses, faker):
def test_create_site(faker):
product_id = get_product_id_by_name(ProductName.SITE)
initial_site_data = [
{"product": product_id},
......@@ -41,7 +41,7 @@ def test_create_site(responses, faker):
@pytest.mark.workflow()
def test_site_name_is_incorrect(responses, faker):
def test_site_name_is_incorrect(faker):
"""Test validate site name on site creation.
The site name is a string with 3 upper case letter and one digit.
......
......@@ -6,7 +6,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_modify_site(responses, site_subscription_factory, faker):
def test_modify_site(site_subscription_factory, faker):
subscription_id = site_subscription_factory()
initial_site_data = [
{"subscription_id": subscription_id},
......
......@@ -5,7 +5,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_terminate_site(responses, site_subscription_factory):
def test_terminate_site(site_subscription_factory):
subscription_id = site_subscription_factory()
initial_site_data = [{"subscription_id": subscription_id}, {}]
result, _, _ = run_workflow("terminate_site", initial_site_data)
......
import pytest
from gso.products.product_types.switch import Switch
from test.workflows import (
assert_complete,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
)
@pytest.mark.workflow()
def test_activate_switch_success(
switch_subscription_factory,
faker,
):
# Set up mock return values
product_id = switch_subscription_factory(status="provisioning")
# Sanity check
assert Switch.from_subscription(product_id).status == "provisioning"
# Run workflow
initial_input_data = [{"subscription_id": product_id}, {}]
result, process_stat, step_log = run_workflow("activate_switch", initial_input_data)
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=[{"checklist_url": "http://localhost"}])
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Switch.from_subscription(subscription_id)
assert subscription.status == "active"
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductName
from gso.products.product_blocks.switch import SwitchModel
from gso.products.product_types.switch import ImportedSwitch
from gso.utils.shared_enums import Vendor
from test.workflows import (
assert_complete,
extract_state,
run_workflow,
)
@pytest.fixture()
def workflow_input_data(faker, site_subscription_factory):
return {
"fqdn": faker.domain_name(levels=4),
"ts_port": faker.port_number(is_user=True),
"site": site_subscription_factory(),
"switch_vendor": Vendor.JUNIPER,
"switch_model": SwitchModel.EX3400,
}
@pytest.mark.workflow()
def test_create_imported_switch_success(workflow_input_data):
result, _, _ = run_workflow("create_imported_switch", [workflow_input_data])
state = extract_state(result)
subscription = ImportedSwitch.from_subscription(state["subscription_id"])
assert_complete(result)
assert subscription.product.name == ProductName.IMPORTED_SWITCH
assert subscription.status == SubscriptionLifecycle.ACTIVE
from unittest.mock import patch
import pytest
from gso.products import ProductName
from gso.products.product_types.switch import Switch
from gso.services.subscriptions import get_product_id_by_name
from test import USER_CONFIRM_EMPTY_FORM
from test.services.conftest import MockedSharePointClient
from test.workflows import (
assert_complete,
assert_lso_interaction_success,
assert_suspended,
extract_state,
resume_workflow,
run_workflow,
)
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.infoblox.hostname_available")
@patch("gso.services.sharepoint.SharePointClient")
def test_create_switch_success(
mock_sharepoint_client, mock_hostname_available, mock_execute_playbook, faker, site_subscription_factory
):
product_id = get_product_id_by_name(ProductName.SWITCH)
initial_form_input = [
{"product": product_id},
{
"tt_number": faker.tt_number(),
"switch_site": site_subscription_factory(),
"hostname": faker.domain_word(),
"ts_port": faker.port_number(is_user=True),
},
]
mock_hostname_available.return_value = True
mock_sharepoint_client.return_value = MockedSharePointClient
result, process_stat, step_log = run_workflow("create_switch", initial_form_input)
# Two LSO interactions
for _ in range(2):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# Two user prompts
for _ in range(2):
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
# One LSO interaction
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
# Sharepoint list created
assert_suspended(result)
result, step_log = resume_workflow(process_stat, step_log, input_data=USER_CONFIRM_EMPTY_FORM)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Switch.from_subscription(subscription_id)
assert subscription.status == "provisioning"
assert mock_execute_playbook.call_count == 3
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductName
from gso.products.product_types.switch import Switch
from test.workflows import assert_complete, run_workflow
@pytest.mark.workflow()
def test_import_switch_success(switch_subscription_factory):
imported_switch = switch_subscription_factory(is_imported=False)
result, _, _ = run_workflow("import_switch", [{"subscription_id": imported_switch}])
subscription = Switch.from_subscription(imported_switch)
assert_complete(result)
assert subscription.product.name == ProductName.SWITCH
assert subscription.status == SubscriptionLifecycle.ACTIVE
assert subscription.insync is True
from unittest.mock import patch
import pytest
from gso.products.product_types.switch import Switch
from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
@patch("gso.services.netbox_client.NetboxClient.delete_device")
@patch("gso.services.infoblox.delete_host_by_fqdn")
def test_terminate_switch(mock_delete_host_by_fqdn, mock_delete_device, switch_subscription_factory, faker):
subscription_id = switch_subscription_factory()
initial_switch_data = [{"subscription_id": subscription_id}, {"tt_number": faker.tt_number()}]
result, _, _ = run_workflow("terminate_switch", initial_switch_data)
assert_complete(result)
state = extract_state(result)
subscription_id = state["subscription_id"]
subscription = Switch.from_subscription(subscription_id)
assert subscription.status == "terminated"
assert mock_delete_device.call_count == 1
assert mock_delete_host_by_fqdn.call_count == 1
from unittest.mock import patch
import pytest
from gso.products.product_types.switch import Switch
from test.workflows import (
assert_complete,
assert_lso_success,
extract_state,
run_workflow,
)
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
@patch("gso.services.netbox_client.NetboxClient.get_device_by_name")
def test_validate_switch_success(
mock_get_device_by_name,
mock_execute_playbook,
switch_subscription_factory,
faker,
data_config_filename,
geant_partner,
):
# Run workflow
subscription_id = switch_subscription_factory()
initial_switch_data = [{"subscription_id": subscription_id}]
result, process_stat, step_log = run_workflow("validate_switch", initial_switch_data)
result, step_log = assert_lso_success(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription = Switch.from_subscription(state["subscription_id"])
assert subscription.status == "active"
assert mock_execute_playbook.call_count == 1
assert mock_get_device_by_name.call_count == 1
......@@ -4,7 +4,7 @@ from test.workflows import assert_complete, extract_state, run_workflow
@pytest.mark.workflow()
def test_task_validate_geant_products(responses):
def test_task_validate_geant_products():
result, _, _ = run_workflow("task_validate_geant_products", [{}])
assert_complete(result)
state = extract_state(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment