diff --git a/gso/cli/imports.py b/gso/cli/imports.py index 71c3c03a1703894df6ce0f454d36acbc2061ce37..584cbce839e0af13b0ca785e2b9592ee035e5771 100644 --- a/gso/cli/imports.py +++ b/gso/cli/imports.py @@ -307,7 +307,7 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]: def _generic_import_product( - file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] + file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] ) -> None: """Import subscriptions from a JSON or YAML file.""" successfully_imported_data = [] @@ -553,4 +553,3 @@ def import_geant_ip(filepath: str = common_filepath_option) -> None: typer.echo("Successfully created imported GEANT IPs:") for item in successfully_imported_data: typer.echo(f"- {item}") - diff --git a/gso/products/product_blocks/service_binding_port.py b/gso/products/product_blocks/service_binding_port.py index 44423f04480459e6ddc752d15da18b124f5f671a..1aaf0862ec8b9a8b636aa0f3a3da196d850498c7 100644 --- a/gso/products/product_blocks/service_binding_port.py +++ b/gso/products/product_blocks/service_binding_port.py @@ -44,7 +44,7 @@ class ServiceBindingPortProvisioning(ServiceBindingPortInactive, lifecycle=[Subs custom_firewall_filters: bool geant_sid: str sbp_bgp_session_list: list[BGPSessionProvisioning] # type: ignore[assignment] - edge_port: EdgePortBlockProvisioning # type: ignore[assignment] + edge_port: EdgePortBlockProvisioning class ServiceBindingPort(ServiceBindingPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -67,4 +67,4 @@ class ServiceBindingPort(ServiceBindingPortProvisioning, lifecycle=[Subscription #: The :term:`BGP` sessions associated with this service binding port. sbp_bgp_session_list: list[BGPSession] # type: ignore[assignment] #: The Edge Port on which this :term:`SBP` resides. - edge_port: EdgePortBlock # type: ignore[assignment] + edge_port: EdgePortBlock diff --git a/gso/workflows/edge_port/create_edge_port.py b/gso/workflows/edge_port/create_edge_port.py index 399013cd7f2186136073e5556e030b1985376078..c6b3eaf8850a27db098eb104ee23c2a5303242c7 100644 --- a/gso/workflows/edge_port/create_edge_port.py +++ b/gso/workflows/edge_port/create_edge_port.py @@ -19,8 +19,7 @@ from pynetbox.models.dcim import Interfaces from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning from gso.products.product_types.router import Router -from gso.services import lso_client -from gso.services.lso_client import lso_interaction +from gso.services.lso_client import LSOState, lso_interaction from gso.services.netbox_client import NetboxClient from gso.services.partners import get_partner_by_id from gso.utils.helpers import ( @@ -185,9 +184,7 @@ def allocate_interfaces_in_netbox(subscription: EdgePortProvisioning) -> None: @step("[DRY RUN] Create edge port") -def create_edge_port_dry( - subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr -) -> None: +def create_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState: """Create a new edge port in the network as a dry run.""" extra_vars = { "dry_run": True, @@ -196,18 +193,15 @@ def create_edge_port_dry( "verb": "create", } - lso_client.execute_playbook( - playbook_name="edge_port.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) + return { + "playbook_name": "edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } @step("[FOR REAL] Create edge port") -def create_edge_port_real( - subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr -) -> None: +def create_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState: """Create a new edge port in the network for real.""" extra_vars = { "dry_run": False, @@ -216,12 +210,11 @@ def create_edge_port_real( "verb": "create", } - lso_client.execute_playbook( - playbook_name="edge_port.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) + return { + "playbook_name": "edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } @workflow( diff --git a/gso/workflows/edge_port/modify_edge_port.py b/gso/workflows/edge_port/modify_edge_port.py index 32251212800e90f8b5dd00d41e90610a6f3bf478..4775b64f3f00c3d0bf1d144dbfce49fb3959a6c2 100644 --- a/gso/workflows/edge_port/modify_edge_port.py +++ b/gso/workflows/edge_port/modify_edge_port.py @@ -11,12 +11,12 @@ from orchestrator.workflow import StepList, begin, conditional, done, step from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic import AfterValidator, ConfigDict, model_validator -from pydantic_forms.types import FormGenerator, UUIDstr +from pydantic_forms.types import FormGenerator, State, UUIDstr from pydantic_forms.validators import ReadOnlyField, validate_unique_list from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock, EncapsulationType from gso.products.product_types.edge_port import EdgePort -from gso.services.lso_client import execute_playbook, lso_interaction +from gso.services.lso_client import LSOState, lso_interaction from gso.services.netbox_client import NetboxClient from gso.services.partners import get_partner_by_id from gso.utils.helpers import ( @@ -122,7 +122,7 @@ def modify_edge_port_subscription( ae_members: list[dict[str, str]], ignore_if_down: bool, # noqa: FBT001 description: str | None = None, -) -> dict[str, Any]: +) -> State: """Modify the edge port subscription with the given parameters.""" previous_ae_members = [ { @@ -158,10 +158,8 @@ def modify_edge_port_subscription( @step("Update interfaces in NetBox") def update_interfaces_in_netbox( - subscription: EdgePort, - removed_ae_members: list[dict], - previous_ae_members: list[dict], -) -> dict[str, Any]: + subscription: EdgePort, removed_ae_members: list[dict], previous_ae_members: list[dict] +) -> State: """Update the interfaces in NetBox.""" nbclient = NetboxClient() # Free removed interfaces @@ -186,12 +184,8 @@ def update_interfaces_in_netbox( @step("[DRY RUN] Update edge port configuration.") def update_edge_port_dry( - subscription: dict[str, Any], - process_id: UUIDstr, - tt_number: str, - callback_route: str, - removed_ae_members: list[dict], -) -> dict[str, Any]: + subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[dict] +) -> LSOState: """Perform a dry run of updating the edge port configuration.""" extra_vars = { "subscription": subscription, @@ -203,24 +197,18 @@ def update_edge_port_dry( "removed_ae_members": removed_ae_members, } - execute_playbook( - playbook_name="edge_ports.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) - - return {"subscription": subscription} + return { + "playbook_name": "edge_ports.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}}}, + "extra_vars": extra_vars, + "subscription": subscription, + } @step("[FOR REAL] Update edge port configuration.") def update_edge_port_real( - subscription: dict[str, Any], - process_id: UUIDstr, - tt_number: str, - callback_route: str, - removed_ae_members: list[str], -) -> dict[str, Any]: + subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[str] +) -> LSOState: """Update the edge port configuration.""" extra_vars = { "subscription": subscription, @@ -232,14 +220,12 @@ def update_edge_port_real( "removed_ae_members": removed_ae_members, } - execute_playbook( - playbook_name="edge_ports.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) - - return {"subscription": subscription} + return { + "subscription": subscription, + "playbook_name": "edge_ports.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } @step("Allocate/Deallocate interfaces in NetBox") diff --git a/gso/workflows/edge_port/terminate_edge_port.py b/gso/workflows/edge_port/terminate_edge_port.py index 893a8c1ebdf53130694a8eca604632e015d0884c..9aecae2045a07285b53c5c6eed3bef1cf0b6b78f 100644 --- a/gso/workflows/edge_port/terminate_edge_port.py +++ b/gso/workflows/edge_port/terminate_edge_port.py @@ -12,7 +12,7 @@ from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic_forms.types import FormGenerator, UUIDstr from gso.products.product_types.edge_port import EdgePort -from gso.services.lso_client import execute_playbook, lso_interaction +from gso.services.lso_client import LSOState, lso_interaction from gso.services.netbox_client import NetboxClient from gso.utils.types.tt_number import TTNumber @@ -28,9 +28,7 @@ def initial_input_form_generator() -> FormGenerator: @step("[DRY RUN] Remove Edge Port") -def remove_edge_port_dry( - subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str -) -> dict[str, Any]: +def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]: """Remove an edge port from the network.""" extra_vars = { "subscription": subscription, @@ -39,19 +37,16 @@ def remove_edge_port_dry( "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port", } - execute_playbook( - playbook_name="edge_port.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) - return {"subscription": subscription} + return { + "subscription": subscription, + "playbook_name": "edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } @step("[FOR REAL] Remove Edge Port") -def remove_edge_port_real( - subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str -) -> None: +def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState: """Remove an edge port from the network.""" extra_vars = { "subscription": subscription, @@ -60,12 +55,12 @@ def remove_edge_port_real( "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port", } - execute_playbook( - playbook_name="edge_port.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars=extra_vars, - ) + return { + "subscription": subscription, + "playbook_name": "edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": extra_vars, + } @step("Netbox Clean Up") diff --git a/gso/workflows/edge_port/validate_edge_port.py b/gso/workflows/edge_port/validate_edge_port.py index 81714721ee2cb411da22270a854b5b47edee32ac..b9561d4ca2f3af6e57db8051124a28bdf654e636 100644 --- a/gso/workflows/edge_port/validate_edge_port.py +++ b/gso/workflows/edge_port/validate_edge_port.py @@ -10,7 +10,7 @@ from orchestrator.workflows.steps import resync, store_process_subscription from orchestrator.workflows.utils import wrap_modify_initial_input_form from gso.products.product_types.edge_port import EdgePort -from gso.services.lso_client import anonymous_lso_interaction, execute_playbook +from gso.services.lso_client import LSOState, anonymous_lso_interaction from gso.services.netbox_client import NetboxClient @@ -56,19 +56,18 @@ def verify_netbox_entries(subscription: EdgePort) -> None: @step("Check base config for drift") -def verify_base_config(subscription: dict[str, Any], callback_route: str) -> None: +def verify_base_config(subscription: dict[str, Any]) -> LSOState: """Workflow step for running a playbook that checks whether base config has drifted.""" - execute_playbook( - playbook_name="edge_port.yaml", - callback_route=callback_route, - inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"], - extra_vars={ + return { + "playbook_name": "edge_port.yaml", + "inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}}, + "extra_vars": { "dry_run": True, "subscription": subscription, "verb": "create", "is_verification_workflow": "true", }, - ) + } @workflow( diff --git a/gso/workflows/geant_ip/create_geant_ip.py b/gso/workflows/geant_ip/create_geant_ip.py index 45ceb1b97c5b06a5977b7995ae503d4f32080d7a..9e62accbf34dddfdb87e0b0e194d5aed6713b120 100644 --- a/gso/workflows/geant_ip/create_geant_ip.py +++ b/gso/workflows/geant_ip/create_geant_ip.py @@ -18,7 +18,7 @@ from gso.products.product_blocks.geant_ip import NRENAccessPortInactive from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort from gso.products.product_types.edge_port import EdgePort from gso.products.product_types.geant_ip import GeantIPInactive -from gso.services.lso_client import execute_playbook, lso_interaction +from gso.services.lso_client import LSOState, lso_interaction from gso.utils.helpers import ( active_edge_port_selector, partner_choice, @@ -173,12 +173,8 @@ def initialize_subscription( @step("[DRY RUN] Deploy service binding port") def provision_sbp_dry( - subscription: dict[str, Any], - callback_route: str, - process_id: UUIDstr, - tt_number: str, - edge_port_fqdn_list: list[str], -) -> None: + subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str] +) -> LSOState: """Perform a dry run of deploying Service Binding Ports.""" extra_vars = { "subscription": subscription, @@ -188,22 +184,17 @@ def provision_sbp_dry( f"Deploy config for {subscription["description"]}", } - execute_playbook( - playbook_name="manage_sbp.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("[FOR REAL] Deploy service binding port") def provision_sbp_real( - subscription: dict[str, Any], - callback_route: str, - process_id: UUIDstr, - tt_number: str, - edge_port_fqdn_list: list[str], -) -> None: + subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str] +) -> LSOState: """Deploy Service Binding Ports.""" extra_vars = { "subscription": subscription, @@ -213,35 +204,29 @@ def provision_sbp_real( f"Deploy config for {subscription["description"]}", } - execute_playbook( - playbook_name="manage_sbp.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("Check service binding port functionality") -def check_sbp_functionality(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None: +def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState: """Check functionality of deployed Service Binding Ports.""" extra_vars = {"subscription": subscription, "verb": "check"} - execute_playbook( - playbook_name="manage_sbp.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("[DRY RUN] Deploy BGP peers") def deploy_bgp_peers_dry( - subscription: dict[str, Any], - callback_route: str, - edge_port_fqdn_list: list[str], - tt_number: str, - process_id: UUIDstr, -) -> None: + subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr +) -> LSOState: """Perform a dry run of deploying :term:`BGP` peers.""" extra_vars = { "subscription": subscription, @@ -251,22 +236,17 @@ def deploy_bgp_peers_dry( f"Deploying BGP peers for {subscription["description"]}", } - execute_playbook( - playbook_name="manage_bgp_peers.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("[FOR REAL] Deploy BGP peers") def deploy_bgp_peers_real( - subscription: dict[str, Any], - callback_route: str, - edge_port_fqdn_list: list[str], - tt_number: str, - process_id: UUIDstr, -) -> None: + subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr +) -> LSOState: """Deploy :term:`BGP` peers.""" extra_vars = { "subscription": subscription, @@ -276,25 +256,23 @@ def deploy_bgp_peers_real( f"Deploying BGP peers for {subscription["description"]}", } - execute_playbook( - playbook_name="manage_bgp_peers.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("Check BGP peers") -def check_bgp_peers(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None: +def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState: """Check correct deployment of :term:`BGP` peers.""" extra_vars = {"subscription": subscription, "verb": "check"} - execute_playbook( - playbook_name="manage_bgp_peers.yaml", - callback_route=callback_route, - inventory="\n".join(edge_port_fqdn_list), - extra_vars=extra_vars, - ) + return { + "playbook_name": "manage_sbp.yaml", + "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}}, + "extra_vars": extra_vars, + } @step("Update Infoblox") diff --git a/gso/workflows/geant_ip/create_imported_geant_ip.py b/gso/workflows/geant_ip/create_imported_geant_ip.py index 7b57a24b6b0fea4746814424e0feda7f8d3ffd73..dda9c3d93011c60c6c901e090c65e62ab408d7db 100644 --- a/gso/workflows/geant_ip/create_imported_geant_ip.py +++ b/gso/workflows/geant_ip/create_imported_geant_ip.py @@ -1,4 +1,5 @@ """A creation workflow for adding an existing GEANT IP to the service database.""" + from uuid import uuid4 from orchestrator import workflow @@ -10,10 +11,11 @@ from orchestrator.workflows.steps import resync, set_status, store_process_subsc from pydantic import BaseModel from pydantic_forms.types import UUIDstr -from gso.products import EdgePort, ProductName -from gso.products.product_blocks.bgp_session import IPFamily, BGPSession +from gso.products import ProductName +from gso.products.product_blocks.bgp_session import BGPSession, IPFamily from gso.products.product_blocks.geant_ip import NRENAccessPortInactive from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort +from gso.products.product_types.edge_port import EdgePort from gso.products.product_types.geant_ip import ImportedGeantIPInactive from gso.services.partners import get_partner_by_name from gso.services.subscriptions import get_product_id_by_name @@ -79,8 +81,7 @@ def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input for service_binding_port in initial_input["service_binding_ports"]: edge_port_subscription = EdgePort.from_subscription(service_binding_port["edge_port"]) sbp_bgp_session_list = [ - BGPSession.new(subscription_id=uuid4(), **session) - for session in service_binding_port["bgp_peers"] + BGPSession.new(subscription_id=uuid4(), **session) for session in service_binding_port["bgp_peers"] ] ServiceBindingPort.new( subscription_id=uuid4(), @@ -109,11 +110,11 @@ def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input def create_imported_geant_ip() -> StepList: """Import a GÉANT IP without provisioning it.""" return ( - begin - >> create_subscription - >> store_process_subscription(Target.CREATE) - >> initialize_subscription - >> set_status(SubscriptionLifecycle.ACTIVE) - >> resync - >> done + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done ) diff --git a/gso/workflows/geant_ip/modify_geant_ip.py b/gso/workflows/geant_ip/modify_geant_ip.py index a1773d97b5dc1ca40d658d99de351299f1ccc733..db56802f1199ff114f62522f051657e0d8ec20b1 100644 --- a/gso/workflows/geant_ip/modify_geant_ip.py +++ b/gso/workflows/geant_ip/modify_geant_ip.py @@ -7,9 +7,11 @@ from orchestrator import begin, conditional, done, step, workflow from orchestrator.forms import FormPage from orchestrator.targets import Target from orchestrator.types import FormGenerator, UUIDstr -from orchestrator.workflows.steps import State, resync, store_process_subscription, unsync +from orchestrator.workflow import StepList +from orchestrator.workflows.steps import resync, store_process_subscription, unsync from orchestrator.workflows.utils import wrap_modify_initial_input_form from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field +from pydantic_forms.types import State from pydantic_forms.validators import Divider, Label from gso.products.product_blocks.bgp_session import BGPSession, IPFamily @@ -106,7 +108,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) in input_ep_list ] added_ap_list = [ - (ep, next((ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep), None)) + (ep, next(ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep)) for ep in input_ep_list if ep not in existing_ep_list ] @@ -116,8 +118,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: for access_port_index, ap_entry in enumerate(modified_ap_list): access_port, new_ap_type = ap_entry current_sbp = access_port.geant_ip_sbp - v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None) - v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None) + v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families) + v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families) class BindingPortModificationForm(FormPage): model_config = ConfigDict( @@ -131,9 +133,11 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: geant_sid: str = current_sbp.geant_sid is_tagged: bool = current_sbp.is_tagged - vlan_id: VLAN_ID = current_sbp.vlan_id - ipv4_address: IPv4AddressType = current_sbp.ipv4_address - ipv6_address: IPv6AddressType = current_sbp.ipv6_address + # The SBP model doesn't require these three fields, but in the case of GÉANT IP this will never occur since + # it's a layer 3 service. The ignore statements are there to put our type checker at ease. + vlan_id: VLAN_ID = current_sbp.vlan_id # type: ignore[assignment] + ipv4_address: IPv4AddressType = current_sbp.ipv4_address # type: ignore[assignment] + ipv6_address: IPv6AddressType = current_sbp.ipv6_address # type: ignore[assignment] custom_firewall_filters: bool = current_sbp.custom_firewall_filters divider: Divider = Field(None, exclude=True) v4_bgp_peer: IPv4BGPPeer = IPv4BGPPeer( @@ -156,8 +160,8 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: # Second, newly added Edge Ports are configured binding_port_inputs = [] - for ap_index, access_port in enumerate(added_ap_list): - edge_port_id, ap_type = access_port + for ap_index, access_port_tuple in enumerate(added_ap_list): + edge_port_id, ap_type = access_port_tuple class BindingPortInputForm(FormPage): model_config = ConfigDict( @@ -204,7 +208,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator: @step("Clean up removed Edge Ports") -def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]): +def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]) -> State: """Remove old :term:`SBP` product blocks from the GÉANT IP subscription.""" subscription.geant_ip.geant_ip_ap_list = [ ap @@ -221,15 +225,14 @@ def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[di for access_port in subscription.geant_ip.geant_ip_ap_list: current_sbp = access_port.geant_ip_sbp modified_sbp_data = next( - (sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)), - None, + sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id) ) - v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None) + v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families) for attribute in modified_sbp_data["v4_bgp_peer"]: setattr(v4_peer, attribute, modified_sbp_data["v4_bgp_peer"][attribute]) - v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None) + v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families) for attribute in modified_sbp_data["v6_bgp_peer"]: setattr(v6_peer, attribute, modified_sbp_data["v6_bgp_peer"][attribute]) @@ -246,7 +249,7 @@ def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[di @step("Instantiate new Service Binding Ports") -def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]): +def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]) -> State: """Add new :term:`SBP`s to the GÉANT IP subscription.""" for sbp_input in added_service_binding_ports: edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"]) @@ -277,7 +280,7 @@ def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: li initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator), target=Target.MODIFY, ) -def modify_geant_ip(): +def modify_geant_ip() -> StepList: """Modify a GÉANT IP subscription.""" access_ports_are_removed = conditional(lambda state: bool(len(state["removed_access_ports"]) > 0)) access_ports_are_modified = conditional(lambda state: bool(len(state["modified_sbp_list"]) > 0)) diff --git a/test/workflows/geant_ip/test_create_geant_ip.py b/test/workflows/geant_ip/test_create_geant_ip.py index 56f63954b6bbacbe8fa0b7ea8881d39f444a3c7f..2d0306604bab8c2020cc8537a743739869ed9d8c 100644 --- a/test/workflows/geant_ip/test_create_geant_ip.py +++ b/test/workflows/geant_ip/test_create_geant_ip.py @@ -49,4 +49,5 @@ def test_create_geant_ip_success( ] result, process_stat, step_log = run_workflow("create_geant_ip", form_input_data) + assert process_stat, step_log assert_complete(result)