Skip to content
Snippets Groups Projects
Verified Commit bd21286a authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Update LSO interactions

parent 4a1f1dbd
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
......@@ -307,7 +307,7 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]:
def _generic_import_product(
file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T]
file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T]
) -> None:
"""Import subscriptions from a JSON or YAML file."""
successfully_imported_data = []
......@@ -553,4 +553,3 @@ def import_geant_ip(filepath: str = common_filepath_option) -> None:
typer.echo("Successfully created imported GEANT IPs:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
......@@ -44,7 +44,7 @@ class ServiceBindingPortProvisioning(ServiceBindingPortInactive, lifecycle=[Subs
custom_firewall_filters: bool
geant_sid: str
sbp_bgp_session_list: list[BGPSessionProvisioning] # type: ignore[assignment]
edge_port: EdgePortBlockProvisioning # type: ignore[assignment]
edge_port: EdgePortBlockProvisioning
class ServiceBindingPort(ServiceBindingPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
......@@ -67,4 +67,4 @@ class ServiceBindingPort(ServiceBindingPortProvisioning, lifecycle=[Subscription
#: The :term:`BGP` sessions associated with this service binding port.
sbp_bgp_session_list: list[BGPSession] # type: ignore[assignment]
#: The Edge Port on which this :term:`SBP` resides.
edge_port: EdgePortBlock # type: ignore[assignment]
edge_port: EdgePortBlock
......@@ -19,8 +19,7 @@ from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning
from gso.products.product_types.router import Router
from gso.services import lso_client
from gso.services.lso_client import lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
......@@ -185,9 +184,7 @@ def allocate_interfaces_in_netbox(subscription: EdgePortProvisioning) -> None:
@step("[DRY RUN] Create edge port")
def create_edge_port_dry(
subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> None:
def create_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Create a new edge port in the network as a dry run."""
extra_vars = {
"dry_run": True,
......@@ -196,18 +193,15 @@ def create_edge_port_dry(
"verb": "create",
}
lso_client.execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Create edge port")
def create_edge_port_real(
subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> None:
def create_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Create a new edge port in the network for real."""
extra_vars = {
"dry_run": False,
......@@ -216,12 +210,11 @@ def create_edge_port_real(
"verb": "create",
}
lso_client.execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@workflow(
......
......@@ -11,12 +11,12 @@ from orchestrator.workflow import StepList, begin, conditional, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.types import FormGenerator, UUIDstr
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock, EncapsulationType
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
......@@ -122,7 +122,7 @@ def modify_edge_port_subscription(
ae_members: list[dict[str, str]],
ignore_if_down: bool, # noqa: FBT001
description: str | None = None,
) -> dict[str, Any]:
) -> State:
"""Modify the edge port subscription with the given parameters."""
previous_ae_members = [
{
......@@ -158,10 +158,8 @@ def modify_edge_port_subscription(
@step("Update interfaces in NetBox")
def update_interfaces_in_netbox(
subscription: EdgePort,
removed_ae_members: list[dict],
previous_ae_members: list[dict],
) -> dict[str, Any]:
subscription: EdgePort, removed_ae_members: list[dict], previous_ae_members: list[dict]
) -> State:
"""Update the interfaces in NetBox."""
nbclient = NetboxClient()
# Free removed interfaces
......@@ -186,12 +184,8 @@ def update_interfaces_in_netbox(
@step("[DRY RUN] Update edge port configuration.")
def update_edge_port_dry(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
callback_route: str,
removed_ae_members: list[dict],
) -> dict[str, Any]:
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[dict]
) -> LSOState:
"""Perform a dry run of updating the edge port configuration."""
extra_vars = {
"subscription": subscription,
......@@ -203,24 +197,18 @@ def update_edge_port_dry(
"removed_ae_members": removed_ae_members,
}
execute_playbook(
playbook_name="edge_ports.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
return {
"playbook_name": "edge_ports.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}}},
"extra_vars": extra_vars,
"subscription": subscription,
}
@step("[FOR REAL] Update edge port configuration.")
def update_edge_port_real(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
callback_route: str,
removed_ae_members: list[str],
) -> dict[str, Any]:
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[str]
) -> LSOState:
"""Update the edge port configuration."""
extra_vars = {
"subscription": subscription,
......@@ -232,14 +220,12 @@ def update_edge_port_real(
"removed_ae_members": removed_ae_members,
}
execute_playbook(
playbook_name="edge_ports.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
return {
"subscription": subscription,
"playbook_name": "edge_ports.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("Allocate/Deallocate interfaces in NetBox")
......
......@@ -12,7 +12,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.types.tt_number import TTNumber
......@@ -28,9 +28,7 @@ def initial_input_form_generator() -> FormGenerator:
@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str
) -> dict[str, Any]:
def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
......@@ -39,19 +37,16 @@ def remove_edge_port_dry(
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
return {
"subscription": subscription,
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str
) -> None:
def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
......@@ -60,12 +55,12 @@ def remove_edge_port_real(
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {
"subscription": subscription,
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("Netbox Clean Up")
......
......@@ -10,7 +10,7 @@ from orchestrator.workflows.steps import resync, store_process_subscription
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import anonymous_lso_interaction, execute_playbook
from gso.services.lso_client import LSOState, anonymous_lso_interaction
from gso.services.netbox_client import NetboxClient
......@@ -56,19 +56,18 @@ def verify_netbox_entries(subscription: EdgePort) -> None:
@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any], callback_route: str) -> None:
def verify_base_config(subscription: dict[str, Any]) -> LSOState:
"""Workflow step for running a playbook that checks whether base config has drifted."""
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars={
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": {
"dry_run": True,
"subscription": subscription,
"verb": "create",
"is_verification_workflow": "true",
},
)
}
@workflow(
......
......@@ -18,7 +18,7 @@ from gso.products.product_blocks.geant_ip import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.geant_ip import GeantIPInactive
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.helpers import (
active_edge_port_selector,
partner_choice,
......@@ -173,12 +173,8 @@ def initialize_subscription(
@step("[DRY RUN] Deploy service binding port")
def provision_sbp_dry(
subscription: dict[str, Any],
callback_route: str,
process_id: UUIDstr,
tt_number: str,
edge_port_fqdn_list: list[str],
) -> None:
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
"""Perform a dry run of deploying Service Binding Ports."""
extra_vars = {
"subscription": subscription,
......@@ -188,22 +184,17 @@ def provision_sbp_dry(
f"Deploy config for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Deploy service binding port")
def provision_sbp_real(
subscription: dict[str, Any],
callback_route: str,
process_id: UUIDstr,
tt_number: str,
edge_port_fqdn_list: list[str],
) -> None:
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
"""Deploy Service Binding Ports."""
extra_vars = {
"subscription": subscription,
......@@ -213,35 +204,29 @@ def provision_sbp_real(
f"Deploy config for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Check service binding port functionality")
def check_sbp_functionality(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None:
def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
"""Check functionality of deployed Service Binding Ports."""
extra_vars = {"subscription": subscription, "verb": "check"}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[DRY RUN] Deploy BGP peers")
def deploy_bgp_peers_dry(
subscription: dict[str, Any],
callback_route: str,
edge_port_fqdn_list: list[str],
tt_number: str,
process_id: UUIDstr,
) -> None:
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
"""Perform a dry run of deploying :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
......@@ -251,22 +236,17 @@ def deploy_bgp_peers_dry(
f"Deploying BGP peers for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Deploy BGP peers")
def deploy_bgp_peers_real(
subscription: dict[str, Any],
callback_route: str,
edge_port_fqdn_list: list[str],
tt_number: str,
process_id: UUIDstr,
) -> None:
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
"""Deploy :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
......@@ -276,25 +256,23 @@ def deploy_bgp_peers_real(
f"Deploying BGP peers for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Check BGP peers")
def check_bgp_peers(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None:
def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
"""Check correct deployment of :term:`BGP` peers."""
extra_vars = {"subscription": subscription, "verb": "check"}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Update Infoblox")
......
"""A creation workflow for adding an existing GEANT IP to the service database."""
from uuid import uuid4
from orchestrator import workflow
......@@ -10,10 +11,11 @@ from orchestrator.workflows.steps import resync, set_status, store_process_subsc
from pydantic import BaseModel
from pydantic_forms.types import UUIDstr
from gso.products import EdgePort, ProductName
from gso.products.product_blocks.bgp_session import IPFamily, BGPSession
from gso.products import ProductName
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.geant_ip import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.geant_ip import ImportedGeantIPInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
......@@ -79,8 +81,7 @@ def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input
for service_binding_port in initial_input["service_binding_ports"]:
edge_port_subscription = EdgePort.from_subscription(service_binding_port["edge_port"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session)
for session in service_binding_port["bgp_peers"]
BGPSession.new(subscription_id=uuid4(), **session) for session in service_binding_port["bgp_peers"]
]
ServiceBindingPort.new(
subscription_id=uuid4(),
......@@ -109,11 +110,11 @@ def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input
def create_imported_geant_ip() -> StepList:
"""Import a GÉANT IP without provisioning it."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
......@@ -7,9 +7,11 @@ from orchestrator import begin, conditional, done, step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflows.steps import State, resync, store_process_subscription, unsync
from orchestrator.workflow import StepList
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.types import State
from pydantic_forms.validators import Divider, Label
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
......@@ -106,7 +108,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) in input_ep_list
]
added_ap_list = [
(ep, next((ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep), None))
(ep, next(ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep))
for ep in input_ep_list
if ep not in existing_ep_list
]
......@@ -116,8 +118,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
for access_port_index, ap_entry in enumerate(modified_ap_list):
access_port, new_ap_type = ap_entry
current_sbp = access_port.geant_ip_sbp
v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None)
v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None)
v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)
class BindingPortModificationForm(FormPage):
model_config = ConfigDict(
......@@ -131,9 +133,11 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
geant_sid: str = current_sbp.geant_sid
is_tagged: bool = current_sbp.is_tagged
vlan_id: VLAN_ID = current_sbp.vlan_id
ipv4_address: IPv4AddressType = current_sbp.ipv4_address
ipv6_address: IPv6AddressType = current_sbp.ipv6_address
# The SBP model doesn't require these three fields, but in the case of GÉANT IP this will never occur since
# it's a layer 3 service. The ignore statements are there to put our type checker at ease.
vlan_id: VLAN_ID = current_sbp.vlan_id # type: ignore[assignment]
ipv4_address: IPv4AddressType = current_sbp.ipv4_address # type: ignore[assignment]
ipv6_address: IPv6AddressType = current_sbp.ipv6_address # type: ignore[assignment]
custom_firewall_filters: bool = current_sbp.custom_firewall_filters
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer = IPv4BGPPeer(
......@@ -156,8 +160,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
# Second, newly added Edge Ports are configured
binding_port_inputs = []
for ap_index, access_port in enumerate(added_ap_list):
edge_port_id, ap_type = access_port
for ap_index, access_port_tuple in enumerate(added_ap_list):
edge_port_id, ap_type = access_port_tuple
class BindingPortInputForm(FormPage):
model_config = ConfigDict(
......@@ -204,7 +208,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
@step("Clean up removed Edge Ports")
def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]):
def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]) -> State:
"""Remove old :term:`SBP` product blocks from the GÉANT IP subscription."""
subscription.geant_ip.geant_ip_ap_list = [
ap
......@@ -221,15 +225,14 @@ def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[di
for access_port in subscription.geant_ip.geant_ip_ap_list:
current_sbp = access_port.geant_ip_sbp
modified_sbp_data = next(
(sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)),
None,
sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)
)
v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None)
v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
for attribute in modified_sbp_data["v4_bgp_peer"]:
setattr(v4_peer, attribute, modified_sbp_data["v4_bgp_peer"][attribute])
v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None)
v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)
for attribute in modified_sbp_data["v6_bgp_peer"]:
setattr(v6_peer, attribute, modified_sbp_data["v6_bgp_peer"][attribute])
......@@ -246,7 +249,7 @@ def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[di
@step("Instantiate new Service Binding Ports")
def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]):
def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]) -> State:
"""Add new :term:`SBP`s to the GÉANT IP subscription."""
for sbp_input in added_service_binding_ports:
edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"])
......@@ -277,7 +280,7 @@ def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: li
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_geant_ip():
def modify_geant_ip() -> StepList:
"""Modify a GÉANT IP subscription."""
access_ports_are_removed = conditional(lambda state: bool(len(state["removed_access_ports"]) > 0))
access_ports_are_modified = conditional(lambda state: bool(len(state["modified_sbp_list"]) > 0))
......
......@@ -49,4 +49,5 @@ def test_create_geant_ip_success(
]
result, process_stat, step_log = run_workflow("create_geant_ip", form_input_data)
assert process_stat, step_log
assert_complete(result)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment