Skip to content
Snippets Groups Projects
Verified Commit b229d0f7 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Update GÉANT IP modification workflow

parent a8f28ca0
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
......@@ -7,7 +7,6 @@ from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
......@@ -91,7 +90,6 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
def families(self) -> list[IPFamily]:
return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]
bgp_peer_defaults = {"rtbh_enabled": True, "is_multi_hop": True}
binding_port_inputs = []
for ep_index, edge_port in enumerate(ep_list):
......@@ -115,19 +113,20 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
v6_bgp_peer: IPv6BGPPeer
binding_port_input_form = yield BindingPortsInputForm
binding_port_input = binding_port_input_form.model_dump()
binding_port_input["sbp_type"] = SBPType.L3
binding_port_input["bgp_peers"] = [
binding_port_input_form.v4_bgp_peer.model_dump() | bgp_peer_defaults,
binding_port_input_form.v6_bgp_peer.model_dump() | bgp_peer_defaults,
]
binding_port_inputs.append(binding_port_input)
binding_port_inputs.append(
binding_port_input_form.model_dump()
| {
"bgp_peers": [
binding_port_input_form.v4_bgp_peer.model_dump(),
binding_port_input_form.v6_bgp_peer.model_dump(),
]
}
)
return (
initial_user_input.model_dump()
| selected_edge_ports.model_dump()
| {"binding_port_inputs": binding_port_inputs}
| {"binding_port_inputs": binding_port_inputs, "product_name": product_name}
)
......@@ -147,20 +146,24 @@ def initialize_subscription(
edge_port_fqdn_list = []
for edge_port_input, sbp_input in zip(edge_ports, binding_port_inputs, strict=False):
edge_port_subscription = EdgePort.from_subscription(edge_port_input["edge_port"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
for session in sbp_input["bgp_peers"]
]
service_binding_port = ServiceBindingPort.new(
subscription_id=uuid4(), **sbp_input, sbp_bgp_session_list=sbp_bgp_session_list, sbp_type=SBPType.L3
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPortInactive.new(
subscription_id=uuid4(),
nren_ap_type=edge_port_input["ap_type"],
geant_ip_ep=edge_port_subscription.edge_port,
geant_ip_sbp=service_binding_port,
)
)
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session) for session in sbp_input["bgp_peers"]
]
edge_port_subscription.edge_port.edge_port_sbp_list.append(
ServiceBindingPort.new(subscription_id=uuid4(), **sbp_input, sbp_bgp_session_list=sbp_bgp_session_list)
)
edge_port_subscription.edge_port.edge_port_sbp_list.append(service_binding_port)
edge_port_fqdn_list.append(edge_port_subscription.edge_port.edge_port_node.router_fqdn)
edge_port_subscription.save()
subscription.description = "GEANT IP service"
......@@ -294,9 +297,10 @@ def check_bgp_peers(subscription: dict[str, Any], callback_route: str, edge_port
@step("Update Infoblox")
def update_dns_records(subscription: GeantIPInactive) -> None:
def update_dns_records(subscription: GeantIPInactive) -> State:
"""Update :term:`DNS` records in Infoblox."""
raise ProcessFailureError(subscription.description)
# TODO: implement
return {"subscription": subscription}
@workflow(
......@@ -325,7 +329,7 @@ def create_geant_ip() -> StepList:
>> lso_interaction(deploy_bgp_peers_real)
>> lso_interaction(check_bgp_peers)
>> update_dns_records
>> set_status(SubscriptionLifecycle.PROVISIONING)
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A modification workflow for a GÉANT IP subscription."""
from orchestrator import begin, done, step, workflow
from typing import Annotated, Any, ClassVar
from uuid import UUID, uuid4
from orchestrator import begin, conditional, done, step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflows.steps import State, resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import BaseModel, ConfigDict
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.validators import Divider, Label
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.geant_ip import NRENAccessPort
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.geant_ip import GeantIP
from gso.utils.helpers import active_edge_port_selector
from gso.utils.shared_enums import APType
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Get input about added, removed, and modified Access Ports."""
subscription = GeantIP.from_subscription(subscription_id)
class AccessPortSelection(BaseModel):
geant_ip_ep: active_edge_port_selector(partner_id=subscription.customer_id) # type: ignore[valid-type]
nren_ap_type: APType
def validate_edge_ports_are_unique(edge_ports: list[AccessPortSelection]) -> list[AccessPortSelection]:
def validate_edge_ports_are_unique(access_ports: list[AccessPortSelection]) -> list[AccessPortSelection]:
"""Verify if interfaces are unique."""
port_names = [port.geant_ip_ep for port in edge_ports]
if len(port_names) != len(set(port_names)):
edge_ports = [port.geant_ip_ep.name for port in access_ports]
if len(edge_ports) != len(set(edge_ports)):
msg = "Edge Ports must be unique."
raise ValueError(msg)
return edge_ports
return access_ports
class ModifyGeantIPAccessPortsForm(FormPage):
model_config = ConfigDict(title="Modify GÉANT IP")
access_ports: list[AccessPortSelection] = [
access_ports: ClassVar[Annotated[list[AccessPortSelection], AfterValidator(validate_edge_ports_are_unique)]] = [
AccessPortSelection(
geant_ip_ep=str(access_port.geant_ip_ep.owner_subscription_id), nren_ap_type=access_port.nren_ap_type
)
......@@ -38,19 +48,233 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
]
access_port_input = yield ModifyGeantIPAccessPortsForm
ap_list = access_port_input.access_ports
total_ap_count = len(ap_list)
input_ap_list = access_port_input.access_ports
input_ep_list = [str(ap.geant_ip_ep) for ap in input_ap_list]
existing_ep_list = [str(ap.geant_ip_ep.owner_subscription_id) for ap in subscription.geant_ip.geant_ip_ap_list]
class BaseBGPPeer(BaseModel):
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
class IPv4BGPPeer(BaseBGPPeer):
peer_address: IPv4AddressType
add_v4_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V4UNICAST, IPFamily.V4MULTICAST] if self.add_v4_multicast else [IPFamily.V4UNICAST]
class IPv6BGPPeer(BaseBGPPeer):
peer_address: IPv6AddressType
add_v6_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]
# There are three possible scenarios for Edge Ports. They can be added, removed, or their relevant SBP can be
# modified. SBPs need to be removed and added accordingly to keep the Edge Port subscriptions up to date.
removed_ap_list = [
access_port.subscription_instance_id
for access_port in subscription.geant_ip.geant_ip_ap_list
if str(access_port.geant_ip_ep.owner_subscription_id) not in input_ep_list
]
modified_ap_list = [
(
access_port,
next(
(
ap.nren_ap_type
for ap in input_ap_list
if str(ap.geant_ip_ep) == str(access_port.geant_ip_ep.owner_subscription_id)
),
None,
),
)
for access_port in subscription.geant_ip.geant_ip_ap_list
if str(access_port.geant_ip_ep.owner_subscription_id) in input_ep_list
]
added_ap_list = [
(ep, next((ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep), None))
for ep in input_ep_list
if ep not in existing_ep_list
]
# First, the user can modify existing Edge Ports
sbp_inputs = []
for access_port_index, ap_entry in enumerate(modified_ap_list):
access_port, new_ap_type = ap_entry
current_sbp = access_port.geant_ip_sbp
v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None)
v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None)
class BindingPortModificationForm(FormPage):
model_config = ConfigDict(
title=f"GÉANT IP - Modify Edge Port configuration ({access_port_index + 1}/{len(input_ap_list)})"
)
current_ep_label: Label = Field(
f"Currently configuring on {access_port.geant_ip_ep.description} "
f"(Access Port type: {access_port.nren_ap_type})",
exclude=True,
)
geant_sid: str = current_sbp.geant_sid
is_tagged: bool = current_sbp.is_tagged
vlan_id: VLAN_ID = current_sbp.vlan_id
ipv4_address: IPv4AddressType = current_sbp.ipv4_address
ipv6_address: IPv6AddressType = current_sbp.ipv6_address
custom_firewall_filters: bool = current_sbp.custom_firewall_filters
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer = IPv4BGPPeer(
**v4_peer.model_dump(exclude=set("families")),
add_v4_multicast=bool(IPFamily.V4MULTICAST in v4_peer.families),
)
v6_bgp_peer: IPv6BGPPeer = IPv6BGPPeer(
**v6_peer.model_dump(exclude=set("families")),
add_v6_multicast=bool(IPFamily.V6MULTICAST in v6_peer.families),
)
binding_port_input_form = yield BindingPortModificationForm
sbp_inputs.append(
binding_port_input_form.model_dump()
| {
"new_ap_type": new_ap_type,
"current_sbp_id": current_sbp.subscription_instance_id,
}
)
# Second, newly added Edge Ports are configured
binding_port_inputs = []
for ap_index, access_port in enumerate(added_ap_list):
edge_port_id, ap_type = access_port
class BindingPortInputForm(FormPage):
model_config = ConfigDict(
title=f"GÉANT IP - Configure new Edge Port "
f"({len(modified_ap_list) + ap_index + 1}/{len(input_ap_list)})"
)
info_label: Label = Field(
"Please configure the Service Binding Ports for each newly added Edge Port", exclude=True
)
current_ep_label: Label = Field(
f"Currently configuring on {EdgePort.from_subscription(edge_port_id).description} "
f"(Access Port type: {ap_type})",
exclude=True,
)
access_port_inputs = []
for access_port_index, access_port in enumerate(ap_list):
access_port_input.append(access_port)
geant_sid: str
is_tagged: bool = False
vlan_id: VLAN_ID
ipv4_address: IPv4AddressType
ipv6_address: IPv6AddressType
custom_firewall_filters: bool = False
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer
v6_bgp_peer: IPv6BGPPeer
return access_port_input.model_dump()
binding_port_input_form = yield BindingPortInputForm
binding_port_inputs.append(
binding_port_input_form.model_dump()
| {
"bgp_peers": [
binding_port_input_form.v4_bgp_peer.model_dump(),
binding_port_input_form.v6_bgp_peer.model_dump(),
],
"edge_port_id": edge_port_id,
"ap_type": ap_type,
}
)
return access_port_input.model_dump() | {
"added_service_binding_ports": binding_port_inputs,
"removed_access_ports": removed_ap_list,
"modified_sbp_list": sbp_inputs,
}
@step("Update subscription model")
def modify_geant_ip_subscription(subscription: GeantIP) -> State:
@step("Clean up removed Edge Ports")
def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]):
"""Remove old :term:`SBP` product blocks from the GÉANT IP subscription."""
subscription.geant_ip.geant_ip_ap_list = [
ap
for ap in subscription.geant_ip.geant_ip_ap_list
if str(ap.subscription_instance_id) not in removed_access_ports
]
for ap in removed_access_ports:
access_port = NRENAccessPort.from_db(UUID(ap))
# Also remove the :term:`SBP` from the related Edge Port subscription.
edge_port = EdgePort.from_subscription(access_port.geant_ip_ep.owner_subscription_id)
edge_port.edge_port.edge_port_sbp_list = [
sbp
for sbp in edge_port.edge_port.edge_port_sbp_list
if str(sbp.subscription_instance_id) != access_port.geant_ip_sbp.subscription_instance_id
]
edge_port.save()
return {"subscription": subscription}
@step("Instantiate new Service Binding Ports")
def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]):
"""Add new :term:`SBP`s to the GÉANT IP subscription."""
for sbp_input in added_service_binding_ports:
edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
for session in sbp_input["bgp_peers"]
]
service_binding_port = ServiceBindingPort.new(
subscription_id=uuid4(), **sbp_input, sbp_bgp_session_list=sbp_bgp_session_list, sbp_type=SBPType.L3
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPort.new(
subscription_id=uuid4(),
nren_ap_type=sbp_input["ap_type"],
geant_ip_ep=edge_port.edge_port,
geant_ip_sbp=service_binding_port,
)
)
edge_port.edge_port.edge_port_sbp_list.append(service_binding_port)
edge_port.save()
return {"subscription": subscription}
@step("Modify existing Service Binding Ports")
def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[dict[str, Any]]) -> State:
"""Update the subscription model."""
for access_port in subscription.geant_ip.geant_ip_ap_list:
current_sbp = access_port.geant_ip_sbp
modified_sbp_data = next(
(sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)),
None,
)
modified_sbp_data.pop("current_sbp_id", None)
v4_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families), None)
for attribute in modified_sbp_data["v4_bgp_peer"]:
setattr(v4_peer, attribute, modified_sbp_data["v4_bgp_peer"][attribute])
modified_sbp_data.pop("v4_bgp_peer")
v6_peer = next((peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families), None)
for attribute in modified_sbp_data["v6_bgp_peer"]:
setattr(v6_peer, attribute, modified_sbp_data["v6_bgp_peer"][attribute])
modified_sbp_data.pop("v6_bgp_peer")
current_sbp.sbp_bgp_session_list = [v4_peer, v6_peer]
access_port.nren_ap_type = modified_sbp_data.pop("new_ap_type")
for attribute in modified_sbp_data:
setattr(current_sbp, attribute, modified_sbp_data[attribute])
return {"subscription": subscription}
......@@ -61,13 +285,17 @@ def modify_geant_ip_subscription(subscription: GeantIP) -> State:
)
def modify_geant_ip():
"""Modify a GÉANT IP subscription."""
access_ports_are_removed = conditional(lambda state: bool(len(state["removed_access_ports"]) > 0))
access_ports_are_added = conditional(lambda state: bool(len(state["added_service_binding_ports"]) > 0))
access_ports_are_modified = conditional(lambda state: bool(len(state["modified_sbp_list"]) > 0))
return (
begin >> store_process_subscription(Target.MODIFY) >> unsync >> modify_geant_ip_subscription >> resync >> done
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> access_ports_are_removed(remove_old_sbp_blocks)
>> access_ports_are_added(create_new_sbp_blocks)
>> access_ports_are_modified(modify_existing_sbp_blocks)
>> resync
>> done
)
# we can change the list of edge ports, and reflect this in new SBPs
# we can change BFD
# we can change firewall filters and policies
# for BGP peers:
# is_passive chan change
......@@ -13,17 +13,17 @@ from test.fixtures.site_fixtures import site_subscription_factory
from test.fixtures.super_pop_switch_fixtures import super_pop_switch_subscription_factory
__all__ = [
"bgp_session_subscription_factory",
"edge_port_subscription_factory",
"geant_ip_subscription_factory",
"iptrunk_side_subscription_factory",
"iptrunk_subscription_factory",
"juniper_router_subscription_factory",
"nokia_router_subscription_factory",
"nren_access_port_factory",
"office_router_subscription_factory",
"opengear_subscription_factory",
"service_binding_port_factory",
"site_subscription_factory",
"super_pop_switch_subscription_factory",
"geant_ip_subscription_factory",
"bgp_session_subscription_factory",
"service_binding_port_factory",
"nren_access_port_factory",
]
......@@ -19,17 +19,17 @@ from gso.utils.types.ip_address import IPAddress
@pytest.fixture()
def bgp_session_subscription_factory(faker):
def create_bgp_session(
peer_address: IPAddress | None = None,
bfd_interval: int = 2,
bfd_multiplier: int = 2,
families: list[IPFamily] | None = None,
authentication_key: str | None = None,
*,
is_multi_hop: bool = False,
has_custom_policies: bool = False,
bfd_enabled: bool = True,
multipath_enabled: bool | None = True,
send_default_route: bool | None = True,
peer_address: IPAddress | None = None,
bfd_interval: int = 2,
bfd_multiplier: int = 2,
families: list[IPFamily] | None = None,
authentication_key: str | None = None,
*,
is_multi_hop: bool = False,
has_custom_policies: bool = False,
bfd_enabled: bool = True,
multipath_enabled: bool | None = True,
send_default_route: bool | None = True,
):
return BGPSession.new(
subscription_id=uuid4(),
......@@ -51,15 +51,15 @@ def bgp_session_subscription_factory(faker):
@pytest.fixture()
def service_binding_port_factory(faker, bgp_session_subscription_factory):
def create_service_binding_port(
sbp_bgp_session_list: list | None = None,
geant_sid: str | None = None,
sbp_type: SBPType = SBPType.L3,
ipv4_address: str | None = None,
ipv6_address: str | None = None,
vlan_id: int | None = None,
*,
custom_firewall_filters: bool = False,
is_tagged: bool = False,
sbp_bgp_session_list: list | None = None,
geant_sid: str | None = None,
sbp_type: SBPType = SBPType.L3,
ipv4_address: str | None = None,
ipv6_address: str | None = None,
vlan_id: int | None = None,
*,
custom_firewall_filters: bool = False,
is_tagged: bool = False,
):
return ServiceBindingPort.new(
subscription_id=uuid4(),
......@@ -79,8 +79,8 @@ def service_binding_port_factory(faker, bgp_session_subscription_factory):
@pytest.fixture()
def nren_access_port_factory(faker, edge_port_subscription_factory):
def create_nren_access_port(
nren_ap_type: APType | None = None,
edge_port: UUIDstr | None = None,
nren_ap_type: APType | None = None,
edge_port: UUIDstr | None = None,
):
edge_port = edge_port or edge_port_subscription_factory()
geant_ip_ep = EdgePort.from_subscription(edge_port).edge_port
......@@ -95,19 +95,19 @@ def nren_access_port_factory(faker, edge_port_subscription_factory):
@pytest.fixture()
def geant_ip_subscription_factory(
faker,
partner_factory,
edge_port_subscription_factory,
bgp_session_subscription_factory,
service_binding_port_factory,
nren_access_port_factory,
faker,
partner_factory,
edge_port_subscription_factory,
bgp_session_subscription_factory,
service_binding_port_factory,
nren_access_port_factory,
):
def create_geant_ip_subscription(
description=None,
partner: dict | None = None,
nren_ap_list: list[NRENAccessPort] | None = None,
start_date="2023-05-24T00:00:00+00:00",
status: SubscriptionLifecycle | None = None,
description=None,
partner: dict | None = None,
nren_ap_list: list[NRENAccessPort] | None = None,
start_date="2023-05-24T00:00:00+00:00",
status: SubscriptionLifecycle | None = None,
) -> UUIDstr:
product_id = subscriptions.get_product_id_by_name(ProductName.GEANT_IP)
partner = partner or partner_factory(name=faker.company(), email=faker.email())
......@@ -119,10 +119,12 @@ def geant_ip_subscription_factory(
# Default nren_ap_list creation with primary and backup access ports
nren_ap_list = nren_ap_list or [
nren_access_port_factory(nren_ap_type=APType.PRIMARY,
edge_port=edge_port_subscription_factory(partner=partner)),
nren_access_port_factory(nren_ap_type=APType.BACKUP,
edge_port=edge_port_subscription_factory(partner=partner)),
nren_access_port_factory(
nren_ap_type=APType.PRIMARY, edge_port=edge_port_subscription_factory(partner=partner)
),
nren_access_port_factory(
nren_ap_type=APType.BACKUP, edge_port=edge_port_subscription_factory(partner=partner)
),
]
# Assign and save edge port and service binding ports
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment