Skip to content
Snippets Groups Projects
Verified Commit 3f439722 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add unit test for GÉANT IP creation

parent bd21286a
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
......@@ -88,7 +88,7 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
ProductName.IMPORTED_OPENGEAR.value: ImportedOpengear,
ProductName.EDGE_PORT.value: EdgePort,
ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort,
ProductType.GEANT_IP.value: GeantIP,
ProductType.IMPORTED_GEANT_IP.value: ImportedGeantIP,
ProductName.GEANT_IP.value: GeantIP,
ProductName.IMPORTED_GEANT_IP.value: ImportedGeantIP,
},
)
from unittest.mock import patch
import pytest
from orchestrator.types import SubscriptionLifecycle
from gso.products import ProductName
from gso.products.product_types.geant_ip import GeantIP
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.shared_enums import APType
from test.workflows import assert_complete, run_workflow
from test.workflows import assert_complete, assert_lso_interaction_success, extract_state, run_workflow
@pytest.fixture()
......@@ -25,8 +29,15 @@ def base_bgp_peer_input(faker):
@pytest.mark.workflow()
@patch("gso.services.lso_client._send_request")
def test_create_geant_ip_success(
responses, faker, partner_factory, edge_port_subscription_factory, base_bgp_peer_input
mock_lso_client,
responses,
faker,
partner_factory,
edge_port_subscription_factory,
base_bgp_peer_input,
data_config_filename,
):
partner = partner_factory(name=faker.company(), email=faker.email())
product_id = get_product_id_by_name(ProductName.GEANT_IP)
......@@ -47,7 +58,20 @@ def test_create_geant_ip_success(
"v6_bgp_peer": base_bgp_peer_input() | {"add_v6_multicast": faker.boolean(), "peer_address": faker.ipv6()},
},
]
lso_interaction_count = 6
result, process_stat, step_log = run_workflow("create_geant_ip", form_input_data)
assert process_stat, step_log
for _ in range(lso_interaction_count):
result, step_log = assert_lso_interaction_success(result, process_stat, step_log)
assert_complete(result)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert mock_lso_client.call_count == lso_interaction_count
assert subscription.status == SubscriptionLifecycle.ACTIVE
assert len(subscription.geant_ip.geant_ip_ap_list) == 1
assert (
str(subscription.geant_ip.geant_ip_ap_list[0].geant_ip_sbp.edge_port.owner_subscription_id)
== form_input_data[2]["edge_ports"][0]["edge_port"]
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment