Skip to content
Snippets Groups Projects
Verified Commit aa0a800f authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add unit tests for the GÉANT IP modification workflow

parent 1e940120
Branches
Tags
1 merge request!286Add Edge Port, GÉANT IP and IAS products
This commit is part of merge request !286. Comments created here will be created in the context of that merge request.
......@@ -29,12 +29,12 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
subscription = GeantIP.from_subscription(subscription_id)
class AccessPortSelection(BaseModel):
geant_ip_ep: active_edge_port_selector(partner_id=subscription.customer_id) # type: ignore[valid-type]
geant_ip_ep: active_edge_port_selector(partner_id=subscription.customer_id) | str # type: ignore[valid-type]
nren_ap_type: APType
def validate_edge_ports_are_unique(access_ports: list[AccessPortSelection]) -> list[AccessPortSelection]:
"""Verify if interfaces are unique."""
edge_ports = [port.geant_ip_ep.name for port in access_ports]
edge_ports = [str(port.geant_ip_ep) for port in access_ports]
if len(edge_ports) != len(set(edge_ports)):
msg = "Edge Ports must be unique."
raise ValueError(msg)
......
......@@ -209,7 +209,7 @@ def edge_port_data(temp_file, faker, nokia_router_subscription_factory, partner_
"minimum_links": 2,
"geant_ga_id": faker.geant_gid(),
"mac_address": faker.mac_address(),
"partner": partner_factory(name="GAAR", email=faker.email())["name"],
"partner": partner_factory()["name"],
"enable_lacp": True,
"ignore_if_down": False,
"ae_members": [
......
......@@ -120,7 +120,7 @@ class FakerProvider(BaseProvider):
]
def vlan_id(self) -> int:
return self.generator.random_int(min=1, max=4096)
return self.generator.random_int(min=1, max=4095)
@pytest.fixture(scope="session")
......@@ -272,15 +272,15 @@ def test_client(fastapi_app):
@pytest.fixture(scope="session")
def partner_factory():
def partner_factory(faker):
def _create_partner(
name: str,
email: str,
name: str | None = None,
email: str | None = None,
) -> dict:
return create_partner(
PartnerSchema(
name=name,
email=email,
name=name or faker.company(),
email=email or faker.email(),
)
)
......
......@@ -38,7 +38,7 @@ def edge_port_subscription_factory(faker, partner_factory, nokia_router_subscrip
ignore_if_down=False,
is_imported=True,
) -> UUIDstr:
partner = partner or partner_factory(name=faker.company(), email=faker.email())
partner = partner or partner_factory()
edge_port_node = Router.from_subscription(nokia_router_subscription_factory()).router
if is_imported:
product_id = subscriptions.get_product_id_by_name(ProductName.EDGE_PORT)
......
......@@ -30,6 +30,8 @@ def bgp_session_subscription_factory(faker):
bfd_enabled: bool = True,
multipath_enabled: bool | None = True,
send_default_route: bool | None = True,
is_passive: bool | None = False,
rtbh_enabled: bool | None = False,
):
return BGPSession.new(
subscription_id=uuid4(),
......@@ -43,6 +45,8 @@ def bgp_session_subscription_factory(faker):
is_multi_hop=is_multi_hop,
bfd_interval=bfd_interval,
bfd_multiplier=bfd_multiplier,
rtbh_enabled=rtbh_enabled,
is_passive=is_passive,
)
return create_bgp_session
......@@ -71,19 +75,27 @@ def service_binding_port_factory(faker, bgp_session_subscription_factory, edge_p
ipv6_address=ipv6_address or faker.ipv6(),
custom_firewall_filters=custom_firewall_filters,
geant_sid=geant_sid or faker.geant_sid(),
sbp_bgp_session_list=sbp_bgp_session_list or [bgp_session_subscription_factory() for _ in range(2)],
edge_port=edge_port or edge_port_subscription_factory(),
sbp_bgp_session_list=sbp_bgp_session_list
or [
bgp_session_subscription_factory(families=[IPFamily.V4UNICAST]),
bgp_session_subscription_factory(families=[IPFamily.V6UNICAST], peer_address=faker.ipv6()),
],
edge_port=edge_port or EdgePort.from_subscription(edge_port_subscription_factory()).edge_port,
)
return create_service_binding_port
@pytest.fixture()
def nren_access_port_factory(faker):
def create_nren_access_port(nren_ap_type: APType | None = None):
def nren_access_port_factory(faker, service_binding_port_factory):
def create_nren_access_port(
nren_ap_type: APType | None = None,
service_binding_port: ServiceBindingPort | None = None,
):
return NRENAccessPort.new(
subscription_id=uuid4(),
nren_ap_type=nren_ap_type or random.choice(list(APType)), # noqa: S311
geant_ip_sbp=service_binding_port or service_binding_port_factory(),
)
return create_nren_access_port
......@@ -93,9 +105,6 @@ def nren_access_port_factory(faker):
def geant_ip_subscription_factory(
faker,
partner_factory,
edge_port_subscription_factory,
bgp_session_subscription_factory,
service_binding_port_factory,
nren_access_port_factory,
):
def create_geant_ip_subscription(
......@@ -106,7 +115,7 @@ def geant_ip_subscription_factory(
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
product_id = subscriptions.get_product_id_by_name(ProductName.GEANT_IP)
partner = partner or partner_factory(name=faker.company(), email=faker.email())
partner = partner or partner_factory()
# Create GEANT IP subscription with product_id and partner details
geant_ip_subscription = GeantIPInactive.from_product_id(
......@@ -114,32 +123,17 @@ def geant_ip_subscription_factory(
)
# Default nren_ap_list creation with primary and backup access ports
nren_ap_list = nren_ap_list or [
nren_access_port_factory(
nren_ap_type=APType.PRIMARY, edge_port=edge_port_subscription_factory(partner=partner)
),
nren_access_port_factory(
nren_ap_type=APType.BACKUP, edge_port=edge_port_subscription_factory(partner=partner)
),
geant_ip_subscription.geant_ip.geant_ip_ap_list = nren_ap_list or [
nren_access_port_factory(nren_ap_type=APType.PRIMARY),
nren_access_port_factory(nren_ap_type=APType.BACKUP),
]
# Assign and save edge port and service binding ports
for nren_ap in nren_ap_list:
edge_port = nren_ap.geant_ip_ep
edge_port.edge_port_sbp_list = [
service_binding_port_factory(),
service_binding_port_factory(),
]
# Update subscription with description, start date, and status
geant_ip_subscription = SubscriptionModel.from_other_lifecycle(
geant_ip_subscription,
SubscriptionLifecycle.ACTIVE,
)
if status:
geant_ip_subscription.status = status
geant_ip_subscription.description = description or faker.text(max_nb_chars=30)
geant_ip_subscription.description = description or faker.sentence()
geant_ip_subscription.start_date = start_date
geant_ip_subscription.status = status or SubscriptionLifecycle.ACTIVE
geant_ip_subscription.save()
......
......@@ -18,7 +18,7 @@ def imported_edge_port_creation_input_form_data(nokia_router_subscription_factor
"minimum_links": 2,
"geant_ga_id": faker.geant_gid(),
"mac_address": faker.mac_address(),
"partner": partner_factory(name="GAAR", email=faker.email())["name"],
"partner": partner_factory()["name"],
"enable_lacp": True,
"ignore_if_down": False,
"ae_members": [
......
......@@ -39,7 +39,7 @@ def test_create_geant_ip_success(
base_bgp_peer_input,
data_config_filename,
):
partner = partner_factory(name=faker.company(), email=faker.email())
partner = partner_factory()
product_id = get_product_id_by_name(ProductName.GEANT_IP)
edge_port_a = edge_port_subscription_factory(partner=partner)
......
import pytest
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_types.geant_ip import GeantIP
from gso.utils.shared_enums import APType
from test.workflows import extract_state, run_workflow
@pytest.mark.workflow()
def test_modify_geant_ip_remove_edge_port_success(geant_ip_subscription_factory):
subscription_id = geant_ip_subscription_factory()
subscription = GeantIP.from_subscription(subscription_id)
access_port = subscription.geant_ip.geant_ip_ap_list[0]
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(access_port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": APType.LOAD_BALANCED,
}
] # The factory generates a subscription with two Access Ports, this will remove the second one.
},
{},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 1
assert subscription.geant_ip.geant_ip_ap_list[0].nren_ap_type == APType.LOAD_BALANCED
@pytest.mark.workflow()
def test_modify_geant_ip_add_new_edge_port_success(
geant_ip_subscription_factory, edge_port_subscription_factory, partner_factory, faker
):
partner = partner_factory()
new_edge_port = edge_port_subscription_factory(partner=partner)
subscription_id = geant_ip_subscription_factory(partner=partner)
subscription = GeantIP.from_subscription(subscription_id)
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": port.nren_ap_type,
}
for port in subscription.geant_ip.geant_ip_ap_list
]
+ [
{
"geant_ip_ep": str(new_edge_port),
"nren_ap_type": APType.BACKUP,
}
]
},
{}, # The existing SBPs are unchanged
{},
{ # Adding configuration for the new SBP
"geant_sid": faker.geant_sid(),
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"v4_bgp_peer": {
"authentication_key": faker.password(),
"peer_address": faker.ipv4(),
},
"v6_bgp_peer": {
"authentication_key": faker.password(),
"peer_address": faker.ipv6(),
},
},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 3
@pytest.mark.workflow()
def test_modify_geant_ip_modify_edge_port_success(faker, geant_ip_subscription_factory):
subscription_id = geant_ip_subscription_factory()
subscription = GeantIP.from_subscription(subscription_id)
new_sbp_data = [
{
"geant_sid": faker.geant_sid(),
"is_tagged": True,
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"custom_firewall_filters": True,
"v4_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv4(),
"add_v4_multicast": True,
},
"v6_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv6(),
"add_v6_multicast": True,
},
},
{
"geant_sid": faker.geant_sid(),
"is_tagged": True,
"vlan_id": faker.vlan_id(),
"ipv4_address": faker.ipv4(),
"ipv6_address": faker.ipv6(),
"custom_firewall_filters": True,
"v4_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv4(),
"add_v4_multicast": True,
},
"v6_bgp_peer": {
"bfd_enabled": True,
"bfd_interval": faker.pyint(),
"bfd_multiplier": faker.pyint(),
"has_custom_policies": True,
"authentication_key": faker.password(),
"multipath_enabled": True,
"send_default_route": True,
"is_passive": True,
"peer_address": faker.ipv6(),
"add_v6_multicast": True,
},
},
]
input_form_data = [
{"subscription_id": subscription_id},
{
"access_ports": [
{
"geant_ip_ep": str(port.geant_ip_sbp.edge_port.owner_subscription_id),
"nren_ap_type": port.nren_ap_type,
}
for port in subscription.geant_ip.geant_ip_ap_list
]
},
{**new_sbp_data[0]},
{**new_sbp_data[1]},
]
result, _, _ = run_workflow("modify_geant_ip", input_form_data)
state = extract_state(result)
subscription = GeantIP.from_subscription(state["subscription_id"])
assert len(subscription.geant_ip.geant_ip_ap_list) == 2
for i in range(2):
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.geant_sid == new_sbp_data[i]["geant_sid"]
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.is_tagged == new_sbp_data[i]["is_tagged"]
assert subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.vlan_id == new_sbp_data[i]["vlan_id"]
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.ipv4_address) == new_sbp_data[i]["ipv4_address"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.ipv6_address) == new_sbp_data[i]["ipv6_address"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.custom_firewall_filters
== new_sbp_data[i]["custom_firewall_filters"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_enabled
== new_sbp_data[i]["v4_bgp_peer"]["bfd_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_interval
== new_sbp_data[i]["v4_bgp_peer"]["bfd_interval"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].bfd_multiplier
== new_sbp_data[i]["v4_bgp_peer"]["bfd_multiplier"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].has_custom_policies
== new_sbp_data[i]["v4_bgp_peer"]["has_custom_policies"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].authentication_key
== new_sbp_data[i]["v4_bgp_peer"]["authentication_key"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].multipath_enabled
== new_sbp_data[i]["v4_bgp_peer"]["multipath_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].send_default_route
== new_sbp_data[i]["v4_bgp_peer"]["send_default_route"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].is_passive
== new_sbp_data[i]["v4_bgp_peer"]["is_passive"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].peer_address)
== new_sbp_data[i]["v4_bgp_peer"]["peer_address"]
)
assert (
bool(
IPFamily.V4MULTICAST
in subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[0].families
)
== new_sbp_data[i]["v4_bgp_peer"]["add_v4_multicast"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_enabled
== new_sbp_data[i]["v6_bgp_peer"]["bfd_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_interval
== new_sbp_data[i]["v6_bgp_peer"]["bfd_interval"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].bfd_multiplier
== new_sbp_data[i]["v6_bgp_peer"]["bfd_multiplier"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].has_custom_policies
== new_sbp_data[i]["v6_bgp_peer"]["has_custom_policies"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].authentication_key
== new_sbp_data[i]["v6_bgp_peer"]["authentication_key"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].multipath_enabled
== new_sbp_data[i]["v6_bgp_peer"]["multipath_enabled"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].send_default_route
== new_sbp_data[i]["v6_bgp_peer"]["send_default_route"]
)
assert (
subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].is_passive
== new_sbp_data[i]["v6_bgp_peer"]["is_passive"]
)
assert (
str(subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].peer_address)
== new_sbp_data[i]["v6_bgp_peer"]["peer_address"]
)
assert (
bool(
IPFamily.V6MULTICAST
in subscription.geant_ip.geant_ip_ap_list[i].geant_ip_sbp.sbp_bgp_session_list[1].families
)
== new_sbp_data[i]["v6_bgp_peer"]["add_v6_multicast"]
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment