Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • goat/gap/geant-service-orchestrator
1 result
Select Git revision
Show changes
Showing
with 1743 additions and 32 deletions
"""An enumerator of SNMP version numbers."""
from enum import StrEnum
class SNMPVersion(StrEnum):
"""An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""
V2C = "v2c"
V3 = "v3"
......@@ -76,3 +76,20 @@ LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partner
LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks")
# Edge port workflows
LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.create_imported_edge_port", "create_imported_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_port")
# NREN L3 Core Service workflows
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.create_nren_l3_core_service", "create_nren_l3_core_service")
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.modify_nren_l3_core_service", "modify_nren_l3_core_service")
LazyWorkflowInstance(
"gso.workflows.nren_l3_core_service.create_imported_nren_l3_core_service", "create_imported_nren_l3_core_service"
)
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.import_nren_l3_core_service", "import_nren_l3_core_service")
LazyWorkflowInstance("gso.workflows.nren_l3_core_service.migrate_nren_l3_core_service", "migrate_nren_l3_core_service")
"""All workflows that can be executed on Edge port."""
"""A creation workflow for adding a new edge port to the network."""
from typing import Annotated, Any, Self
from uuid import uuid4
from annotated_types import Len
from orchestrator import step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.validators import validate_unique_list
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning
from gso.products.product_types.router import Router
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
active_pe_router_selector,
available_interfaces_choices,
available_service_lags_choices,
partner_choice,
validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather information to create a new Edge Port."""
class CreateEdgePortForm(FormPage):
model_config = ConfigDict(title=product_name)
tt_number: TTNumber
node: active_pe_router_selector() # type: ignore[valid-type]
partner: partner_choice() # type: ignore[valid-type]
service_type: EdgePortType
enable_lacp: bool = False
speed: PhysicalPortCapacity
encapsulation: EncapsulationType = EncapsulationType.DOT1Q
number_of_members: int
minimum_links: int
mac_address: str | None = None
ignore_if_down: bool = False
geant_ga_id: str | None = None
@model_validator(mode="after")
def validate_number_of_members(self) -> Self:
validate_edge_port_number_of_members_based_on_lacp(
enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
)
return self
initial_user_input = yield CreateEdgePortForm
class EdgePortLAGMember(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
initial_user_input.node, initial_user_input.speed
)
lag_ae_members = Annotated[
list[EdgePortLAGMember],
AfterValidator(validate_unique_list),
Len(
min_length=initial_user_input.number_of_members,
max_length=initial_user_input.number_of_members,
),
]
class SelectInterfaceForm(FormPage):
model_config = ConfigDict(title="Select Interfaces")
name: available_service_lags_choices(initial_user_input.node) # type: ignore[valid-type]
description: str | None = None
ae_members: lag_ae_members
interface_form_input_data = yield SelectInterfaceForm
return initial_user_input.model_dump() | interface_form_input_data.model_dump()
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: UUIDstr) -> State:
"""Create a new subscription object."""
subscription = EdgePortInactive.from_product_id(product, partner)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
@step("Initialize subscription")
def initialize_subscription(
subscription: EdgePortInactive,
node: UUIDstr,
service_type: EdgePortType,
speed: PhysicalPortCapacity,
encapsulation: EncapsulationType,
name: str,
minimum_links: int,
geant_ga_id: str | None,
mac_address: str | None,
partner: str,
enable_lacp: bool, # noqa: FBT001
ignore_if_down: bool, # noqa: FBT001
ae_members: list[dict[str, Any]],
description: str | None = None,
) -> State:
"""Initialise the subscription object in the service database."""
router = Router.from_subscription(node).router
subscription.edge_port.edge_port_node = router
subscription.edge_port.edge_port_type = service_type
subscription.edge_port.edge_port_enable_lacp = enable_lacp
subscription.edge_port.edge_port_member_speed = speed
subscription.edge_port.edge_port_encapsulation = encapsulation
subscription.edge_port.edge_port_name = name
subscription.edge_port.edge_port_minimum_links = minimum_links
subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
subscription.edge_port.edge_port_mac_address = mac_address
partner_name = get_partner_by_id(partner).name
subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner_name}, {geant_ga_id or ""}"
subscription.edge_port.edge_port_description = description
for member in ae_members:
subscription.edge_port.edge_port_ae_members.append(
EdgePortAEMemberBlockInactive.new(subscription_id=uuid4(), **member)
)
subscription = EdgePortProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Reserve interfaces in NetBox")
def reserve_interfaces_in_netbox(subscription: EdgePortProvisioning) -> State:
"""Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
nbclient = NetboxClient()
edge_port = subscription.edge_port
# Create :term:`LAG` interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=edge_port.edge_port_name,
interface_type="lag",
device_name=edge_port.edge_port_node.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
)
# Attach physical interfaces to :term:`LAG`
# Update interface description to subscription ID
# Reserve interfaces
for interface in edge_port.edge_port_ae_members:
nbclient.attach_interface_to_lag(
device_name=edge_port.edge_port_node.router_fqdn,
lag_name=lag_interface.name,
iface_name=interface.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=edge_port.edge_port_node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@step("Allocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePortProvisioning) -> None:
"""Allocate the interfaces in NetBox."""
for interface in subscription.edge_port.edge_port_ae_members:
fqdn = subscription.edge_port.edge_port_node.router_fqdn
iface_name = interface.interface_name
if not fqdn or not iface_name:
msg = "FQDN and/or interface name missing in subscription"
raise ProcessFailureError(msg, details=subscription.subscription_id)
NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)
@step("[DRY RUN] Create edge port")
def create_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Create a new edge port in the network as a dry run."""
extra_vars = {
"dry_run": True,
"subscription": subscription,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Create edge port")
def create_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Create a new edge port in the network for real."""
extra_vars = {
"dry_run": False,
"subscription": subscription,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@workflow(
"Create Edge Port",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_edge_port() -> StepList:
"""Create a new edge port in the network.
* Create and initialise the subscription object in the service database
* Deploy configuration on the new edge port, first as a dry run
* allocate :term:`LAG` and :term:`LAG` members in the Netbox.
"""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> reserve_interfaces_in_netbox
>> lso_interaction(create_edge_port_dry)
>> lso_interaction(create_edge_port_real)
>> allocate_interfaces_in_netbox
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A creation workflow that adds an existing Edge Port to the DB."""
from typing import Annotated, Any
from uuid import uuid4
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import AfterValidator, ConfigDict
from pydantic_forms.types import UUIDstr
from pydantic_forms.validators import validate_unique_list
from gso.products import ProductName
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_types.edge_port import EdgePortInactive, ImportedEdgePortInactive
from gso.products.product_types.router import Router
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.helpers import active_pe_router_selector
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
@step("Create subscription")
def create_subscription(partner: str) -> State:
"""Create a new subscription object."""
partner_id = get_partner_by_name(partner)["partner_id"]
product_id = get_product_id_by_name(ProductName.IMPORTED_EDGE_PORT)
subscription = ImportedEdgePortInactive.from_product_id(product_id, partner_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the :term:`API` endpoint."""
class ImportEdgePort(FormPage):
model_config = ConfigDict(title="Import Router")
node: active_pe_router_selector() # type: ignore[valid-type]
partner: str
service_type: EdgePortType
enable_lacp: bool
speed: PhysicalPortCapacity
encapsulation: EncapsulationType = EncapsulationType.DOT1Q
minimum_links: int
mac_address: str | None = None
ignore_if_down: bool = False
geant_ga_id: str | None = None
description: str | None = None
name: str
ae_members: Annotated[list[LAGMember], AfterValidator(validate_unique_list)]
user_input = yield ImportEdgePort
return user_input.model_dump()
@step("Initialize subscription")
def initialize_subscription(
subscription: EdgePortInactive,
node: UUIDstr,
service_type: EdgePortType,
speed: PhysicalPortCapacity,
encapsulation: EncapsulationType,
name: str,
minimum_links: int,
geant_ga_id: str | None,
mac_address: str | None,
partner: str,
enable_lacp: bool, # noqa: FBT001
ignore_if_down: bool, # noqa: FBT001
ae_members: list[dict[str, Any]],
description: str | None = None,
) -> State:
"""Initialise the subscription object in the service database."""
router = Router.from_subscription(node).router
subscription.edge_port.edge_port_node = router
subscription.edge_port.edge_port_type = service_type
subscription.edge_port.edge_port_enable_lacp = enable_lacp
subscription.edge_port.edge_port_member_speed = speed
subscription.edge_port.edge_port_encapsulation = encapsulation
subscription.edge_port.edge_port_name = name
subscription.edge_port.edge_port_minimum_links = minimum_links
subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
subscription.edge_port.edge_port_mac_address = mac_address
subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}"
subscription.edge_port.edge_port_description = description
for member in ae_members:
subscription.edge_port.edge_port_ae_members.append(
EdgePortAEMemberBlockInactive.new(subscription_id=uuid4(), **member)
)
return {"subscription": subscription}
@workflow(
"Import Edge Port",
initial_input_form=initial_input_form_generator,
target=Target.CREATE,
)
def create_imported_edge_port() -> StepList:
"""Import a Edge Port without provisioning it."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A modification workflow for migrating an ImportedEdgePort to an EdgePort subscription."""
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products import ProductName
from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort
from gso.services.subscriptions import get_product_id_by_name
@step("Create new Edge Port subscription")
def import_edge_port_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedEdgePort subscription, and turn it into an EdgePort subscription."""
old_edge_port = ImportedEdgePort.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.EDGE_PORT)
new_subscription = EdgePort.from_other_product(old_edge_port, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
@workflow("Import Edge Port", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_edge_port() -> StepList:
"""Modify an ImportedEdgePort subscription into an EdgePort subscription to complete the import."""
return (
init >> store_process_subscription(Target.MODIFY) >> unsync >> import_edge_port_subscription >> resync >> done
)
"""Modify an existing edge port subscription."""
from typing import Annotated, Any, Self
from uuid import uuid4
from annotated_types import Len
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, conditional, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from gso.products.product_blocks.edge_port import EdgePortAEMemberBlock, EncapsulationType
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
available_interfaces_choices,
available_interfaces_choices_including_current_members,
validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types.interfaces import LAGMember, PhysicalPortCapacity
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Gather input from the operator on what to change about the selected edge port subscription."""
subscription = EdgePort.from_subscription(subscription_id)
class ModifyEdgePortForm(FormPage):
model_config = ConfigDict(title="Modify Edge Port")
tt_number: TTNumber
enable_lacp: bool = subscription.edge_port.edge_port_enable_lacp
member_speed: PhysicalPortCapacity = subscription.edge_port.edge_port_member_speed
encapsulation: EncapsulationType = subscription.edge_port.edge_port_encapsulation
number_of_members: int = len(subscription.edge_port.edge_port_ae_members)
minimum_links: int | None = subscription.edge_port.edge_port_minimum_links or None
mac_address: str | None = subscription.edge_port.edge_port_mac_address or None
ignore_if_down: bool = subscription.edge_port.edge_port_ignore_if_down
geant_ga_id: str | None = subscription.edge_port.edge_port_geant_ga_id or None
@model_validator(mode="after")
def validate_number_of_members(self) -> Self:
validate_edge_port_number_of_members_based_on_lacp(
enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
)
return self
user_input = yield ModifyEdgePortForm
class EdgePortLAGMember(LAGMember):
interface_name: ( # type: ignore[valid-type]
available_interfaces_choices_including_current_members(
subscription.edge_port.edge_port_node.owner_subscription_id,
user_input.member_speed,
subscription.edge_port.edge_port_ae_members,
)
if user_input.member_speed == subscription.edge_port.edge_port_member_speed
else (
available_interfaces_choices(
subscription.edge_port.edge_port_node.owner_subscription_id, user_input.member_speed
)
)
)
lag_ae_members = Annotated[
list[EdgePortLAGMember],
AfterValidator(validate_unique_list),
Len(
min_length=user_input.number_of_members,
max_length=user_input.number_of_members,
),
]
current_lag_ae_members = (
[
EdgePortLAGMember(
interface_name=iface.interface_name,
interface_description=iface.interface_description,
)
for iface in subscription.edge_port.edge_port_ae_members
]
if user_input.member_speed == subscription.edge_port.edge_port_member_speed
else []
)
class ModifyEdgePortInterfaceForm(FormPage):
model_config = ConfigDict(title="Modify Edge Port Interface")
name: ReadOnlyField(subscription.edge_port.edge_port_name, default_type=str) # type: ignore[valid-type]
description: str | None = subscription.edge_port.edge_port_description or None
ae_members: lag_ae_members = current_lag_ae_members
interface_form_input = yield ModifyEdgePortInterfaceForm
capacity_has_changed = (
user_input.member_speed != subscription.edge_port.edge_port_member_speed
or user_input.number_of_members != len(subscription.edge_port.edge_port_ae_members)
or any(
old_interface.interface_name
not in [new_interface.interface_name for new_interface in interface_form_input.ae_members]
for old_interface in subscription.edge_port.edge_port_ae_members
)
or len(subscription.edge_port.edge_port_ae_members) != len(interface_form_input.ae_members)
)
return user_input.model_dump() | interface_form_input.model_dump() | {"capacity_has_changed": capacity_has_changed}
@step("Modify edge port subscription.")
def modify_edge_port_subscription(
subscription: EdgePort,
member_speed: PhysicalPortCapacity,
encapsulation: EncapsulationType,
minimum_links: int,
mac_address: str | None,
geant_ga_id: str | None,
enable_lacp: bool, # noqa: FBT001
ae_members: list[dict[str, str]],
ignore_if_down: bool, # noqa: FBT001
description: str | None = None,
) -> State:
"""Modify the edge port subscription with the given parameters."""
previous_ae_members = [
{
"interface_name": member.interface_name,
"interface_description": member.interface_description,
}
for member in subscription.edge_port.edge_port_ae_members
]
removed_ae_members = [member for member in previous_ae_members if member not in ae_members]
subscription.edge_port.edge_port_enable_lacp = enable_lacp
subscription.edge_port.edge_port_member_speed = member_speed
subscription.edge_port.edge_port_encapsulation = encapsulation
subscription.edge_port.edge_port_minimum_links = minimum_links
subscription.edge_port.edge_port_mac_address = mac_address
subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
subscription.edge_port.edge_port_description = description
subscription.description = (
f"Edge Port {subscription.edge_port.edge_port_name} on"
f" {subscription.edge_port.edge_port_node.router_fqdn},"
f" {get_partner_by_id(subscription.customer_id).name}, {geant_ga_id or ""}"
)
subscription.edge_port.edge_port_ae_members.clear()
for member in ae_members:
subscription.edge_port.edge_port_ae_members.append(EdgePortAEMemberBlock.new(subscription_id=uuid4(), **member))
return {
"subscription": subscription,
"removed_ae_members": removed_ae_members,
"previous_ae_members": previous_ae_members,
}
@step("Update interfaces in NetBox")
def update_interfaces_in_netbox(
subscription: EdgePort, removed_ae_members: list[dict], previous_ae_members: list[dict]
) -> State:
"""Update the interfaces in NetBox."""
nbclient = NetboxClient()
# Free removed interfaces
for removed_member in removed_ae_members:
nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, removed_member["interface_name"])
# Attach physical interfaces to :term:`LAG`
# Update interface description to subscription ID
# Reserve interfaces
for member in subscription.edge_port.edge_port_ae_members:
if any(prev_member["interface_name"] == member.interface_name for prev_member in previous_ae_members):
continue
nbclient.attach_interface_to_lag(
device_name=subscription.edge_port.edge_port_node.router_fqdn,
lag_name=subscription.edge_port.edge_port_name,
iface_name=member.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)
return {"subscription": subscription}
@step("[DRY RUN] Update edge port configuration.")
def update_edge_port_dry(
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[dict]
) -> LSOState:
"""Perform a dry run of updating the edge port configuration."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "update",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
"removed_ae_members": removed_ae_members,
}
return {
"playbook_name": "edge_ports.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}}},
"extra_vars": extra_vars,
"subscription": subscription,
}
@step("[FOR REAL] Update edge port configuration.")
def update_edge_port_real(
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, removed_ae_members: list[str]
) -> LSOState:
"""Update the edge port configuration."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "update",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
"removed_ae_members": removed_ae_members,
}
return {
"subscription": subscription,
"playbook_name": "edge_ports.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("Allocate/Deallocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePort, previous_ae_members: list[dict]) -> None:
"""Allocate the new interfaces in NetBox and detach the old ones from the :term:`LAG`."""
nbclient = NetboxClient()
for member in subscription.edge_port.edge_port_ae_members:
if any(member.interface_name == prev_member["interface_name"] for prev_member in previous_ae_members):
continue
nbclient.allocate_interface(
device_name=subscription.edge_port.edge_port_node.router_fqdn,
iface_name=member.interface_name,
)
# detach the old interfaces from lag
nbclient.detach_interfaces_from_lag(
device_name=subscription.edge_port.edge_port_node.router_fqdn, lag_name=subscription.edge_port.edge_port_name
)
@workflow(
"Modify Edge Port",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_edge_port() -> StepList:
"""Modify a new edge port in the network.
* Modify the subscription object in the service database
* Modify configuration on the new edge port, first as a dry run
* Change :term:`LAG` and :term:`LAG` members in the Netbox.
"""
capacity_has_changed = conditional(lambda state: state["capacity_has_changed"])
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> modify_edge_port_subscription
>> capacity_has_changed(update_interfaces_in_netbox)
>> capacity_has_changed(lso_interaction(update_edge_port_dry))
>> capacity_has_changed(lso_interaction(update_edge_port_real))
>> capacity_has_changed(allocate_interfaces_in_netbox)
>> resync
>> done
)
"""Terminate an edge port in the network."""
from typing import Any
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator() -> FormGenerator:
"""Let the operator decide whether to delete configuration on the router, and clear up :term:`IPAM` resources."""
class TerminateForm(FormPage):
tt_number: TTNumber
user_input = yield TerminateForm
return user_input.model_dump()
@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "terminate",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
return {
"subscription": subscription,
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "terminate",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
return {
"subscription": subscription,
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": extra_vars,
}
@step("Netbox Clean Up")
def netbox_clean_up(subscription: EdgePort) -> None:
"""Update Netbox to remove the edge port :term:`LAG` interface and all the :term:`LAG` members."""
nbclient = NetboxClient()
for member in subscription.edge_port.edge_port_ae_members:
nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)
nbclient.delete_interface(subscription.edge_port.edge_port_node.router_fqdn, subscription.edge_port.edge_port_name)
@workflow(
"Terminate Edge Port",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_edge_port() -> StepList:
"""Terminate a new edge port in the network."""
return (
begin
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> lso_interaction(remove_edge_port_dry)
>> lso_interaction(remove_edge_port_real)
>> netbox_clean_up
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)
"""Workflow for validating an existing Edge port subscription."""
from typing import Any
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import LSOState, anonymous_lso_interaction
from gso.services.netbox_client import NetboxClient
@step("Prepare required keys in state")
def prepare_state(subscription_id: UUIDstr) -> State:
"""Add required keys to the state for the workflow to run successfully."""
edge_port = EdgePort.from_subscription(subscription_id)
return {"subscription": edge_port}
@step("Verify NetBox entries")
def verify_netbox_entries(subscription: EdgePort) -> None:
"""Validate required entries for an edge port in NetBox."""
nbclient = NetboxClient()
netbox_errors = []
# Raises en exception when not found.
lag = nbclient.get_interface_by_name_and_device(
subscription.edge_port.edge_port_name, subscription.edge_port.edge_port_node.router_fqdn
)
if lag.description != str(subscription.subscription_id):
netbox_errors.append(
f"Incorrect description for '{lag}', expected "
f"'{subscription.subscription_id}' but got '{lag.description}'"
)
if not lag.enabled:
netbox_errors.append(f"NetBox interface '{lag}' is not enabled.")
for member in subscription.edge_port.edge_port_ae_members:
interface = nbclient.get_interface_by_name_and_device(
member.interface_name, subscription.edge_port.edge_port_node.router_fqdn
)
if interface.description != str(subscription.subscription_id):
netbox_errors.append(
f"Incorrect description for '{member.interface_name}', expected "
f"'{subscription.subscription_id}' but got '{interface.description}'"
)
if not interface.enabled:
netbox_errors.append(f"NetBox interface '{member.interface_name}' is not enabled.")
if netbox_errors:
raise ProcessFailureError(message="NetBox misconfiguration(s) found", details=str(netbox_errors))
@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any]) -> LSOState:
"""Workflow step for running a playbook that checks whether base config has drifted."""
return {
"playbook_name": "edge_port.yaml",
"inventory": {"all": {"hosts": {subscription["edge_port"]["edge_port_node"]["router_fqdn"]: None}}},
"extra_vars": {
"dry_run": True,
"subscription": subscription,
"verb": "create",
"is_verification_workflow": "true",
},
}
@workflow(
"Validate Edge Port Configuration", target=Target.SYSTEM, initial_input_form=wrap_modify_initial_input_form(None)
)
def validate_edge_port() -> StepList:
"""Validate an existing, active Edge port subscription.
* Check correct configuration of interfaces in NetBox.
* Verify create Edge port configuration.
"""
return (
begin
>> store_process_subscription(Target.SYSTEM)
>> prepare_state
>> verify_netbox_entries
>> anonymous_lso_interaction(verify_base_config)
>> resync
>> done
)
......@@ -255,7 +255,7 @@ def dig_all_hosts_v6(new_ipv6_network: str) -> None:
@step("Ping all hosts in the assigned IPv4 network")
def ping_all_hosts_v4(new_ipv4_network: str) -> None:
"""Ping all hosts in the IPv4 network to verify they're not in use."""
"""Ping all hosts in the IPv4 network to verify they are not in use."""
unavailable_hosts = [host for host in IPv4Network(new_ipv4_network) if ping(str(host), timeout=1)]
if unavailable_hosts:
......@@ -265,7 +265,7 @@ def ping_all_hosts_v4(new_ipv4_network: str) -> None:
@step("Ping all hosts in the assigned IPv6 network")
def ping_all_hosts_v6(new_ipv6_network: str) -> State:
"""Ping all hosts in the IPv6 network to verify they're not in use."""
"""Ping all hosts in the IPv6 network to verify they are not in use."""
unavailable_hosts = [host for host in IPv6Network(new_ipv6_network) if ping(str(host), timeout=1)]
if unavailable_hosts:
......
......@@ -89,7 +89,7 @@ def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
):
# We want to stay on the same site, so all routers that are in different sites get skipped.
continue
# If migrate_to_different_site is true, we can add ALL routers to the result map
# If migrate_to_different_site is true, we can add *all* routers to the result map
routers[str(router_id)] = router["description"]
new_router_enum = Choice("Select a new router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type]
......@@ -204,7 +204,7 @@ def calculate_old_side_data(subscription: Iptrunk, replace_index: int) -> State:
@step("Check Optical PRE levels on the trunk endpoint")
def check_ip_trunk_optical_levels_pre(subscription: Iptrunk) -> LSOState:
"""Check Optical PRE levels on the trunk."""
"""Check Optical levels on the trunk before migration."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "optical_pre"}
return {
......@@ -252,7 +252,7 @@ def check_ip_trunk_optical_levels_post(
def check_ip_trunk_lldp(
subscription: Iptrunk, new_node: Router, new_lag_member_interfaces: list[dict], replace_index: int
) -> LSOState:
"""Check LLDP on the new trunk endpoints."""
"""Check :term:`LLDP` on the new trunk endpoints."""
extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
"new_node": json.loads(json_dumps(new_node)),
......@@ -489,7 +489,7 @@ def update_remaining_side_bfd_real(
@step("Check BFD session over trunk")
def check_ip_trunk_bfd(subscription: Iptrunk, new_node: Router, replace_index: int) -> LSOState:
"""Check BFD session across the new trunk."""
"""Check :term:`BFD` session across the new trunk."""
extra_vars = {
"wfo_ip_trunk_json": json.loads(json_dumps(subscription)),
"new_node": json.loads(json_dumps(new_node)),
......@@ -830,7 +830,7 @@ def migrate_iptrunk() -> StepList:
* Deploy a new :term:`ISIS` interface between routers A and C
* Wait for operator confirmation that :term:`ISIS` is behaving as expected
* Restore the old :term:`ISIS` metric on the new trunk
* Delete the old, disabled configuration on the routers, first as a dry run
* Delete the old configuration from the routers, first as a dry run
* Reflect the changes made in :term:`IPAM`
* Update the subscription model in the database
* Update the reserved interfaces in Netbox
......
......@@ -205,7 +205,7 @@ def check_ip_trunk_connectivity(subscription: Iptrunk) -> LSOState:
@step("Check LLDP on the trunk endpoints")
def check_ip_trunk_lldp(subscription: Iptrunk) -> LSOState:
"""Check LLDP on trunk endpoints."""
"""Check :term:`LLDP` on trunk endpoints."""
extra_vars = {"wfo_ip_trunk_json": json.loads(json_dumps(subscription)), "check": "lldp"}
return {
......
......@@ -210,7 +210,7 @@ def validate_iptrunk() -> StepList:
* Verify that the :term:`LAG` interfaces are correctly configured in :term:`IPAM`.
* Check correct configuration of interfaces in NetBox.
* Verify the configuration on both sides of the trunk is intact.
* Check the ISIS metric of the trunk.
* Check the :term:`ISIS` metric of the trunk.
* Verify that TWAMP configuration is correct.
If a trunk has a Juniper router on both sides, it is considered legacy and does not require validation.
......
""":term:`NREN` layer 3 core service workflows."""
"""A creation workflow for adding an existing GEANT IP to the service database."""
from uuid import uuid4
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import BaseModel
from pydantic_forms.types import UUIDstr
from gso.products import ProductName
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPortInactive
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import ImportedGeantIPInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.shared_enums import SBPType
from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask
def initial_input_form_generator() -> FormGenerator:
"""Take all information passed to this workflow by the :term:`API` endpoint that was called."""
class BaseBGPPeer(BaseModel):
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
peer_address: IPAddress
families: list[IPFamily]
is_multi_hop: bool
rtbh_enabled: bool
class ServiceBindingPort(BaseModel):
edge_port: UUIDstr
ap_type: str
geant_sid: str
sbp_type: SBPType = SBPType.L3
is_tagged: bool = False
vlan_id: VLAN_ID
custom_firewall_filters: bool = False
ipv4_address: IPv4AddressType
ipv4_mask: IPV4Netmask
ipv6_address: IPv6AddressType
ipv6_mask: IPV6Netmask
rtbh_enabled: bool = True
is_multi_hop: bool = True
bgp_peers: list[BaseBGPPeer]
class ImportGeantIPForm(FormPage):
partner: str
service_binding_ports: list[ServiceBindingPort]
user_input = yield ImportGeantIPForm
return user_input.model_dump()
@step("Create subscription")
def create_subscription(partner: str) -> dict:
"""Create a new subscription object in the database."""
partner_id = get_partner_by_name(partner)["partner_id"]
product_id = get_product_id_by_name(ProductName.IMPORTED_GEANT_IP)
subscription = ImportedGeantIPInactive.from_product_id(product_id, partner_id)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
@step("Initialize subscription")
def initialize_subscription(subscription: ImportedGeantIPInactive, service_binding_ports: list) -> dict:
"""Initialize the subscription with the user input."""
for service_binding_port in service_binding_ports:
edge_port_subscription = EdgePort.from_subscription(service_binding_port.pop("edge_port"))
bgp_peers = service_binding_port.pop("bgp_peers")
sbp_bgp_session_list = [BGPSession.new(subscription_id=uuid4(), **session) for session in bgp_peers]
service_binding_port_subscription = ServiceBindingPortInactive.new(
subscription_id=uuid4(),
edge_port=edge_port_subscription.edge_port,
sbp_bgp_session_list=sbp_bgp_session_list,
**service_binding_port,
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPortInactive.new(
subscription_id=uuid4(),
nren_ap_type=service_binding_port["ap_type"],
geant_ip_sbp=service_binding_port_subscription,
)
)
subscription.description = "GEANT IP service"
return {"subscription": subscription}
@workflow(
"Import GÉANT IP",
initial_input_form=initial_input_form_generator,
target=Target.CREATE,
)
def create_imported_geant_ip() -> StepList:
"""Import a GÉANT IP without provisioning it."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""Create a new GÉANT IP subscription."""
from typing import Annotated, Any
from uuid import uuid4
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.validators import Divider
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPortInactive
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIPInactive
from gso.services.lso_client import LSOState, lso_interaction
from gso.utils.helpers import (
active_edge_port_selector,
partner_choice,
)
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather input from the operator to build a new subscription object."""
class CreateGeantIPForm(FormPage):
model_config = ConfigDict(title="GÉANT IP - Select partner")
tt_number: TTNumber
partner: partner_choice() # type: ignore[valid-type]
initial_user_input = yield CreateGeantIPForm
class EdgePortSelection(BaseModel):
edge_port: active_edge_port_selector(partner_id=initial_user_input.partner) # type: ignore[valid-type]
ap_type: APType
def validate_edge_ports_are_unique(edge_ports: list[EdgePortSelection]) -> list[EdgePortSelection]:
"""Verify if interfaces are unique."""
port_names = [port.edge_port for port in edge_ports]
if len(port_names) != len(set(port_names)):
msg = "Edge Ports must be unique."
raise ValueError(msg)
return edge_ports
class EdgePortSelectionForm(FormPage):
model_config = ConfigDict(title="GÉANT IP - Select Edge Ports")
info_label: Label = Field(
"Please select the Edge Ports where this GÉANT IP service will terminate", exclude=True
)
edge_ports: Annotated[list[EdgePortSelection], AfterValidator(validate_edge_ports_are_unique)]
selected_edge_ports = yield EdgePortSelectionForm
ep_list = selected_edge_ports.edge_ports
class BaseBGPPeer(BaseModel):
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
class IPv4BGPPeer(BaseBGPPeer):
peer_address: IPv4AddressType
add_v4_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V4UNICAST, IPFamily.V4MULTICAST] if self.add_v4_multicast else [IPFamily.V4UNICAST]
class IPv6BGPPeer(BaseBGPPeer):
peer_address: IPv6AddressType
add_v6_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]
binding_port_inputs = []
for ep_index, edge_port in enumerate(ep_list):
class BindingPortsInputForm(FormPage):
model_config = ConfigDict(title=f"GÉANT IP - Configure Edge Ports ({ep_index + 1}/{len(ep_list)})")
info_label: Label = Field("Please configure the Service Binding Ports for each Edge Port.", exclude=True)
current_ep_label: Label = Field(
f"Currently configuring on {EdgePort.from_subscription(edge_port.edge_port).description} "
f"(Access Port type: {edge_port.ap_type})",
exclude=True,
)
geant_sid: str
is_tagged: bool = False
vlan_id: VLAN_ID
ipv4_address: IPv4AddressType
ipv4_mask: IPV4Netmask
ipv6_address: IPv6AddressType
ipv6_mask: IPV6Netmask
custom_firewall_filters: bool = False
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer
v6_bgp_peer: IPv6BGPPeer
binding_port_input_form = yield BindingPortsInputForm
binding_port_inputs.append(
binding_port_input_form.model_dump()
| {
"bgp_peers": [
binding_port_input_form.v4_bgp_peer.model_dump(),
binding_port_input_form.v6_bgp_peer.model_dump(),
]
}
)
return (
initial_user_input.model_dump()
| selected_edge_ports.model_dump()
| {"binding_port_inputs": binding_port_inputs, "product_name": product_name}
)
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
"""Create a new subscription object in the database."""
subscription = GeantIPInactive.from_product_id(product, partner)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
@step("Initialize subscription")
def initialize_subscription(
subscription: GeantIPInactive, edge_ports: list[dict], binding_port_inputs: list[dict]
) -> State:
"""Take all user inputs and use them to populate the subscription model."""
edge_port_fqdn_list = []
for edge_port_input, sbp_input in zip(edge_ports, binding_port_inputs, strict=False):
edge_port_subscription = EdgePort.from_subscription(edge_port_input["edge_port"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
for session in sbp_input["bgp_peers"]
]
service_binding_port = ServiceBindingPortInactive.new(
subscription_id=uuid4(),
**sbp_input,
sbp_bgp_session_list=sbp_bgp_session_list,
sbp_type=SBPType.L3,
edge_port=edge_port_subscription.edge_port,
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPortInactive.new(
subscription_id=uuid4(),
nren_ap_type=edge_port_input["ap_type"],
geant_ip_sbp=service_binding_port,
)
)
edge_port_fqdn_list.append(edge_port_subscription.edge_port.edge_port_node.router_fqdn)
subscription.description = "GEANT IP service"
return {"subscription": subscription, "edge_port_fqdn_list": edge_port_fqdn_list}
@step("[DRY RUN] Deploy service binding port")
def provision_sbp_dry(
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
"""Perform a dry run of deploying Service Binding Ports."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "deploy",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploy config for {subscription["description"]}",
}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Deploy service binding port")
def provision_sbp_real(
subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, edge_port_fqdn_list: list[str]
) -> LSOState:
"""Deploy Service Binding Ports."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "deploy",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploy config for {subscription["description"]}",
}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Check service binding port functionality")
def check_sbp_functionality(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
"""Check functionality of deployed Service Binding Ports."""
extra_vars = {"subscription": subscription, "verb": "check"}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[DRY RUN] Deploy BGP peers")
def deploy_bgp_peers_dry(
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
"""Perform a dry run of deploying :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
"verb": "deploy",
"dry_run": True,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploying BGP peers for {subscription["description"]}",
}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("[FOR REAL] Deploy BGP peers")
def deploy_bgp_peers_real(
subscription: dict[str, Any], edge_port_fqdn_list: list[str], tt_number: str, process_id: UUIDstr
) -> LSOState:
"""Deploy :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
"verb": "deploy",
"dry_run": False,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploying BGP peers for {subscription["description"]}",
}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Check BGP peers")
def check_bgp_peers(subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
"""Check correct deployment of :term:`BGP` peers."""
extra_vars = {"subscription": subscription, "verb": "check"}
return {
"playbook_name": "manage_sbp.yaml",
"inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
"extra_vars": extra_vars,
}
@step("Update Infoblox")
def update_dns_records(subscription: GeantIPInactive) -> State:
"""Update :term:`DNS` records in Infoblox."""
# TODO: implement
return {"subscription": subscription}
@workflow(
"Create GÉANT IP",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_geant_ip() -> StepList:
"""Create a new GÉANT IP subscription.
* Create subscription object in the service database
* Deploy service binding ports
* Deploy :term:`BGP` peers
* Update :term:`DNS` records
* Set the subscription in a provisioning state in the database
"""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> lso_interaction(provision_sbp_dry)
>> lso_interaction(provision_sbp_real)
>> lso_interaction(check_sbp_functionality)
>> lso_interaction(deploy_bgp_peers_dry)
>> lso_interaction(deploy_bgp_peers_real)
>> lso_interaction(check_bgp_peers)
>> update_dns_records
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A modification workflow for migrating an ImportedGeantIP to an GeantIP subscription."""
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products import ProductName
from gso.products.product_types.nren_l3_core_service import GeantIP, ImportedGeantIP
from gso.services.subscriptions import get_product_id_by_name
@step("Create new IP trunk subscription")
def import_geant_ip_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedGeantIP subscription, and turn it into an GeantIP subscription."""
old_geant_ip = ImportedGeantIP.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.GEANT_IP)
new_subscription = GeantIP.from_other_product(old_geant_ip, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
@workflow("Import GÉANT IP", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_geant_ip() -> StepList:
"""Modify an ImportedGeantIP subscription into an GeantIP subscription to complete the import."""
return init >> store_process_subscription(Target.MODIFY) >> unsync >> import_geant_ip_subscription >> resync >> done
"""A modification workflow that migrates a GÉANT IP subscription to a different set of Edge Ports."""
from typing import Annotated
from annotated_types import Len
from orchestrator import workflow
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field
from pydantic_forms.core import FormPage
from pydantic_forms.types import FormGenerator, State, UUIDstr
from pydantic_forms.validators import Choice, Divider
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.services.subscriptions import get_active_edge_port_subscriptions
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Gather input from the operator on what new Edge Ports this GÉANT IP should migrate to."""
subscription = GeantIP.from_subscription(subscription_id)
partner_id = subscription.customer_id
edge_port_count = len(subscription.geant_ip.geant_ip_ap_list)
def _new_edge_port_selector(pid: UUIDstr) -> Choice:
existing_ep_name_list = [
ap.geant_ip_sbp.edge_port.owner_subscription_id for ap in subscription.geant_ip.geant_ip_ap_list
]
edge_port_subscriptions = list(
filter(
lambda ep: bool(ep["customer_id"] == pid) and ep["subscription_id"] not in existing_ep_name_list,
get_active_edge_port_subscriptions(includes=["subscription_id", "description", "customer_id"]),
)
)
edge_ports = {str(port["subscription_id"]): port["description"] for port in edge_port_subscriptions}
return Choice(
"Select an Edge Port",
zip(edge_ports.keys(), edge_ports.items(), strict=True), # type: ignore[arg-type]
)
class NewEdgePortSelection(BaseModel):
old_edge_port: str
new_edge_port: _new_edge_port_selector(partner_id) | str # type: ignore[valid-type]
def _validate_new_edge_ports_are_unique(edge_ports: list[NewEdgePortSelection]) -> list[NewEdgePortSelection]:
new_edge_ports = [str(port.new_edge_port) for port in edge_ports]
if len(new_edge_ports) != len(set(new_edge_ports)):
msg = "New Edge Ports must be unique"
raise ValueError(msg)
return edge_ports
class GeantIPEdgePortSelectionForm(FormPage):
model_config = ConfigDict(title="Migrating GÉANT IP to a new set of Edge Ports")
tt_number: TTNumber
divider: Divider = Field(None, exclude=True)
edge_port_selection: Annotated[
list[NewEdgePortSelection],
AfterValidator(_validate_new_edge_ports_are_unique),
Len(min_length=edge_port_count, max_length=edge_port_count),
] = [ # noqa: RUF012
NewEdgePortSelection(
old_edge_port=f"{
EdgePort.from_subscription(ap.geant_ip_sbp.edge_port.owner_subscription_id).description
} ({ap.nren_ap_type})",
new_edge_port="",
)
for ap in subscription.geant_ip.geant_ip_ap_list
]
ep_user_input = yield GeantIPEdgePortSelectionForm
return {"subscription_id": subscription_id, "subscription": subscription} | ep_user_input.model_dump()
@step("Update subscription model")
def update_subscription_model(subscription: GeantIP, edge_port_selection: list[dict]) -> State:
"""Update the subscription model with the new list of Access Ports."""
for index, selected_port in enumerate(edge_port_selection):
subscription.geant_ip.geant_ip_ap_list[index].geant_ip_sbp.edge_port = EdgePort.from_subscription(
selected_port["new_edge_port"]
).edge_port
return {"subscription": subscription}
@workflow(
"Migrate GÉANT IP",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def migrate_geant_ip() -> StepList:
"""Migrate a GÉANT IP to a new set of Edge Ports."""
return begin >> store_process_subscription(Target.MODIFY) >> unsync >> update_subscription_model >> resync >> done
"""A modification workflow for a GÉANT IP subscription."""
from typing import Annotated, Any
from uuid import uuid4
from orchestrator import begin, conditional, done, step, workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, UUIDstr
from orchestrator.workflow import StepList
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, computed_field
from pydantic_forms.types import State
from pydantic_forms.validators import Divider, Label
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.nren_l3_core_service import NRENAccessPort
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.edge_port import EdgePort
from gso.products.product_types.nren_l3_core_service import GeantIP
from gso.utils.helpers import active_edge_port_selector
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPv4AddressType, IPV4Netmask, IPv6AddressType, IPV6Netmask
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Get input about added, removed, and modified Access Ports."""
subscription = GeantIP.from_subscription(subscription_id)
class AccessPortSelection(BaseModel):
geant_ip_ep: active_edge_port_selector(partner_id=subscription.customer_id) | str # type: ignore[valid-type]
nren_ap_type: APType
def validate_edge_ports_are_unique(access_ports: list[AccessPortSelection]) -> list[AccessPortSelection]:
"""Verify if interfaces are unique."""
edge_ports = [str(port.geant_ip_ep) for port in access_ports]
if len(edge_ports) != len(set(edge_ports)):
msg = "Edge Ports must be unique."
raise ValueError(msg)
return access_ports
class ModifyGeantIPAccessPortsForm(FormPage):
model_config = ConfigDict(title="Modify GÉANT IP")
access_ports: Annotated[list[AccessPortSelection], AfterValidator(validate_edge_ports_are_unique)] = [ # noqa: RUF012
AccessPortSelection(
geant_ip_ep=str(access_port.geant_ip_sbp.edge_port.owner_subscription_id),
nren_ap_type=access_port.nren_ap_type,
)
for access_port in subscription.geant_ip.geant_ip_ap_list
]
access_port_input = yield ModifyGeantIPAccessPortsForm
input_ap_list = access_port_input.access_ports
input_ep_list = [str(ap.geant_ip_ep) for ap in input_ap_list]
existing_ep_list = [
str(ap.geant_ip_sbp.edge_port.owner_subscription_id) for ap in subscription.geant_ip.geant_ip_ap_list
]
class BaseBGPPeer(BaseModel):
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
class IPv4BGPPeer(BaseBGPPeer):
peer_address: IPv4AddressType
add_v4_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V4UNICAST, IPFamily.V4MULTICAST] if self.add_v4_multicast else [IPFamily.V4UNICAST]
class IPv6BGPPeer(BaseBGPPeer):
peer_address: IPv6AddressType
add_v6_multicast: bool = Field(default=False, exclude=True)
@computed_field # type: ignore[misc]
@property
def families(self) -> list[IPFamily]:
return [IPFamily.V6UNICAST, IPFamily.V6MULTICAST] if self.add_v6_multicast else [IPFamily.V6UNICAST]
# There are three possible scenarios for Edge Ports. They can be added, removed, or their relevant SBP can be
# modified.
removed_ap_list = [
access_port.subscription_instance_id
for access_port in subscription.geant_ip.geant_ip_ap_list
if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) not in input_ep_list
]
modified_ap_list = [
(
access_port,
next(
(
ap.nren_ap_type
for ap in input_ap_list
if str(ap.geant_ip_ep) == str(access_port.geant_ip_sbp.edge_port.owner_subscription_id)
),
None,
),
)
for access_port in subscription.geant_ip.geant_ip_ap_list
if str(access_port.geant_ip_sbp.edge_port.owner_subscription_id) in input_ep_list
]
added_ap_list = [
(ep, next(ap.nren_ap_type for ap in input_ap_list if str(ap.geant_ip_ep) == ep))
for ep in input_ep_list
if ep not in existing_ep_list
]
# First, the user can modify existing Edge Ports
sbp_inputs = []
for access_port_index, ap_entry in enumerate(modified_ap_list):
access_port, new_ap_type = ap_entry
current_sbp = access_port.geant_ip_sbp
v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)
class BindingPortModificationForm(FormPage):
model_config = ConfigDict(
title=f"GÉANT IP - Modify Edge Port configuration ({access_port_index + 1}/{len(input_ap_list)})"
)
current_ep_label: Label = Field(
f"Currently configuring on {access_port.geant_ip_sbp.edge_port.description} "
f"(Access Port type: {access_port.nren_ap_type})",
exclude=True,
)
geant_sid: str = current_sbp.geant_sid
is_tagged: bool = current_sbp.is_tagged
# The SBP model doesn't require these three fields, but in the case of GÉANT IP this will never occur since
# it's a layer 3 service. The ignore statements are there to put our type checker at ease.
vlan_id: VLAN_ID = current_sbp.vlan_id # type: ignore[assignment]
ipv4_address: IPv4AddressType = current_sbp.ipv4_address # type: ignore[assignment]
ipv4_mask: IPV4Netmask = current_sbp.ipv4_mask # type: ignore[assignment]
ipv6_address: IPv6AddressType = current_sbp.ipv6_address # type: ignore[assignment]
ipv6_mask: IPV6Netmask = current_sbp.ipv6_mask # type: ignore[assignment]
custom_firewall_filters: bool = current_sbp.custom_firewall_filters
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer = IPv4BGPPeer(
**v4_peer.model_dump(exclude=set("families")),
add_v4_multicast=bool(IPFamily.V4MULTICAST in v4_peer.families),
)
v6_bgp_peer: IPv6BGPPeer = IPv6BGPPeer(
**v6_peer.model_dump(exclude=set("families")),
add_v6_multicast=bool(IPFamily.V6MULTICAST in v6_peer.families),
)
binding_port_input_form = yield BindingPortModificationForm
sbp_inputs.append(
binding_port_input_form.model_dump()
| {
"new_ap_type": new_ap_type,
"current_sbp_id": current_sbp.subscription_instance_id,
}
)
# Second, newly added Edge Ports are configured
binding_port_inputs = []
for ap_index, access_port_tuple in enumerate(added_ap_list):
edge_port_id, ap_type = access_port_tuple
class BindingPortInputForm(FormPage):
model_config = ConfigDict(
title=f"GÉANT IP - Configure new Edge Port "
f"({len(modified_ap_list) + ap_index + 1}/{len(input_ap_list)})"
)
info_label: Label = Field(
"Please configure the Service Binding Ports for each newly added Edge Port", exclude=True
)
current_ep_label: Label = Field(
f"Currently configuring on {EdgePort.from_subscription(edge_port_id).description} "
f"(Access Port type: {ap_type})",
exclude=True,
)
geant_sid: str
is_tagged: bool = False
vlan_id: VLAN_ID
ipv4_address: IPv4AddressType
ipv6_address: IPv6AddressType
custom_firewall_filters: bool = False
divider: Divider = Field(None, exclude=True)
v4_bgp_peer: IPv4BGPPeer
v6_bgp_peer: IPv6BGPPeer
binding_port_input_form = yield BindingPortInputForm
binding_port_inputs.append(
binding_port_input_form.model_dump()
| {
"bgp_peers": [
binding_port_input_form.v4_bgp_peer.model_dump(),
binding_port_input_form.v6_bgp_peer.model_dump(),
],
"edge_port_id": edge_port_id,
"ap_type": ap_type,
}
)
return access_port_input.model_dump() | {
"added_service_binding_ports": binding_port_inputs,
"removed_access_ports": removed_ap_list,
"modified_sbp_list": sbp_inputs,
}
@step("Clean up removed Edge Ports")
def remove_old_sbp_blocks(subscription: GeantIP, removed_access_ports: list[UUIDstr]) -> State:
"""Remove old :term:`SBP` product blocks from the GÉANT IP subscription."""
subscription.geant_ip.geant_ip_ap_list = [
ap
for ap in subscription.geant_ip.geant_ip_ap_list
if str(ap.subscription_instance_id) not in removed_access_ports
]
return {"subscription": subscription}
@step("Modify existing Service Binding Ports")
def modify_existing_sbp_blocks(subscription: GeantIP, modified_sbp_list: list[dict[str, Any]]) -> State:
"""Update the subscription model."""
for access_port in subscription.geant_ip.geant_ip_ap_list:
current_sbp = access_port.geant_ip_sbp
modified_sbp_data = next(
sbp for sbp in modified_sbp_list if sbp["current_sbp_id"] == str(current_sbp.subscription_instance_id)
)
v4_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V4UNICAST in peer.families)
for attribute in modified_sbp_data["v4_bgp_peer"]:
setattr(v4_peer, attribute, modified_sbp_data["v4_bgp_peer"][attribute])
v6_peer = next(peer for peer in current_sbp.sbp_bgp_session_list if IPFamily.V6UNICAST in peer.families)
for attribute in modified_sbp_data["v6_bgp_peer"]:
setattr(v6_peer, attribute, modified_sbp_data["v6_bgp_peer"][attribute])
current_sbp.sbp_bgp_session_list = [v4_peer, v6_peer]
current_sbp.vlan_id = modified_sbp_data["vlan_id"]
current_sbp.geant_sid = modified_sbp_data["geant_sid"]
current_sbp.is_tagged = modified_sbp_data["is_tagged"]
current_sbp.ipv4_address = modified_sbp_data["ipv4_address"]
current_sbp.ipv6_address = modified_sbp_data["ipv6_address"]
current_sbp.custom_firewall_filters = modified_sbp_data["custom_firewall_filters"]
access_port.nren_ap_type = modified_sbp_data["new_ap_type"]
return {"subscription": subscription}
@step("Instantiate new Service Binding Ports")
def create_new_sbp_blocks(subscription: GeantIP, added_service_binding_ports: list[dict[str, Any]]) -> State:
"""Add new :term:`SBP`s to the GÉANT IP subscription."""
for sbp_input in added_service_binding_ports:
edge_port = EdgePort.from_subscription(sbp_input["edge_port_id"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session, rtbh_enabled=True, is_multi_hop=True)
for session in sbp_input["bgp_peers"]
]
service_binding_port = ServiceBindingPort.new(
subscription_id=uuid4(),
**sbp_input,
sbp_bgp_session_list=sbp_bgp_session_list,
sbp_type=SBPType.L3,
edge_port=edge_port.edge_port,
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPort.new(
subscription_id=uuid4(),
nren_ap_type=sbp_input["ap_type"],
geant_ip_sbp=service_binding_port,
)
)
return {"subscription": subscription}
@workflow(
"Modify GÉANT IP",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_geant_ip() -> StepList:
"""Modify a GÉANT IP subscription."""
access_ports_are_removed = conditional(lambda state: bool(len(state["removed_access_ports"]) > 0))
access_ports_are_modified = conditional(lambda state: bool(len(state["modified_sbp_list"]) > 0))
access_ports_are_added = conditional(lambda state: bool(len(state["added_service_binding_ports"]) > 0))
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> access_ports_are_removed(remove_old_sbp_blocks)
>> access_ports_are_modified(modify_existing_sbp_blocks)
>> access_ports_are_added(create_new_sbp_blocks)
>> resync
>> done
)
......@@ -4,7 +4,7 @@ from typing import Self
from orchestrator.config.assignee import Assignee
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice, Label
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
......@@ -17,13 +17,13 @@ from pydantic_forms.validators import ReadOnlyField
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import RouterInactive, RouterProvisioning
from gso.products.product_types.site import Site
from gso.services import infoblox, subscriptions
from gso.services import infoblox
from gso.services.lso_client import lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_name
from gso.services.sharepoint import SharePointClient
from gso.settings import load_oss_params
from gso.utils.helpers import generate_fqdn, iso_from_ipv4
from gso.utils.helpers import active_site_selector, generate_fqdn, iso_from_ipv4
from gso.utils.shared_enums import Vendor
from gso.utils.types.ip_address import PortNumber
from gso.utils.types.tt_number import TTNumber
......@@ -35,15 +35,6 @@ from gso.utils.workflow_steps import (
)
def _site_selector() -> Choice:
site_subscriptions = {}
for site in subscriptions.get_active_site_subscriptions(includes=["subscription_id", "description"]):
site_subscriptions[str(site["subscription_id"])] = site["description"]
# noinspection PyTypeChecker
return Choice("Select a site", zip(site_subscriptions.keys(), site_subscriptions.items(), strict=True)) # type: ignore[arg-type]
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather information about the new router from the operator."""
......@@ -53,7 +44,7 @@ def initial_input_form_generator(product_name: str) -> FormGenerator:
tt_number: TTNumber
partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type]
vendor: Vendor
router_site: _site_selector() # type: ignore[valid-type]
router_site: active_site_selector() # type: ignore[valid-type]
hostname: str
ts_port: PortNumber
router_role: RouterRole
......