Skip to content
Snippets Groups Projects
Verified Commit ea06b9e1 authored by Neda Moeini's avatar Neda Moeini Committed by Karel van Klink
Browse files

Implement Edge port workflows

parent 62b9837b
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
"""Add Edge Port product..
Revision ID: 6456d3a9d150
Revision ID: 75b5c3597bf4
Revises: 87a05eddee3e
Create Date: 2024-08-23 09:51:21.029168
Create Date: 2024-08-27 11:46:14.049679
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '6456d3a9d150'
revision = '75b5c3597bf4'
down_revision = '87a05eddee3e'
branch_labels = None
depends_on = None
......@@ -18,40 +18,43 @@ depends_on = None
def upgrade() -> None:
conn = op.get_bind()
conn.execute(sa.text("""
INSERT INTO products (name, description, product_type, tag, status) VALUES ('Edge port', 'Edge Port product', 'EdgePort', 'EDGE_PORT', 'active') RETURNING products.product_id
INSERT INTO products (name, description, product_type, tag, status) VALUES ('Edge port', 'Edge Port', 'EdgePort', 'EDGE_PORT', 'active') RETURNING products.product_id
"""))
conn.execute(sa.text("""
INSERT INTO product_blocks (name, description, tag, status) VALUES ('EdgePortBlock', 'Edge port product block.', 'EDGE_PORT_BLOCK', 'active') RETURNING product_blocks.product_block_id
INSERT INTO product_blocks (name, description, tag, status) VALUES ('EdgePortBlock', 'Edge Port product block.', 'EDGE_PORT_BLOCK', 'active') RETURNING product_blocks.product_block_id
"""))
conn.execute(sa.text("""
INSERT INTO product_blocks (name, description, tag, status) VALUES ('EdgePortInterfaceBlock', 'Edge port interface block', 'EDGE_PORT_IFACE_BLK', 'active') RETURNING product_blocks.product_block_id
INSERT INTO product_blocks (name, description, tag, status) VALUES ('EdgePortInterfaceBlock', 'Edge Port Iface product block.', 'EDGE_PORT_IFACE_BLCK', 'active') RETURNING product_blocks.product_block_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_name', 'The name of the edge port, In our case, this is the name of the LAG interface.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_name', 'The name of the edge port, in our case, corresponds to the name of the LAG interface.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_member_speed', 'The speed capacity of each member in the physical port.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_mac_address', 'The MAC address assigned to this edge port, if applicable.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_description', 'A description of the edge port.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_minimum_links', 'The minimum number of links required for this edge port.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_ignore_if_down', 'If set to True, the edge port will be ignored if it is down.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_enable_lacp', 'Indicates whether LACP (Link Aggregation Control Protocol) is enabled for this edge port.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_type', 'The type of edge port (e.g., customer, private, public).') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_encapsulation', 'The type of encapsulation used on this edge port, by default DOT1Q.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_geant_ga_id', 'The GEANT GA ID associated with this edge port, if any.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_enable_lacp', 'Indicates whether LACP (Link Aggregation Control Protocol) is enabled for this edge port.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_mac_address', 'The MAC address assigned to this edge port, if applicable.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_geant_ga_id', 'The GEANT GA ID associated with this edge port, if any.') RETURNING resource_types.resource_type_id
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_member_speed', 'The speed capacity of each member in the physical port.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_ignore_if_down', 'If set to True, the edge port will be ignored if it is down.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO resource_types (resource_type, description) VALUES ('edge_port_encapsulation', 'The type of encapsulation used on this edge port, by default DOT1Q.') RETURNING resource_types.resource_type_id
"""))
conn.execute(sa.text("""
INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('Edge port')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')))
......@@ -66,6 +69,9 @@ INSERT INTO product_block_relations (in_use_by_id, depends_on_id) VALUES ((SELEC
INSERT INTO product_block_resource_types (product_block_id, resource_type_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')), (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name')))
"""))
conn.execute(sa.text("""
INSERT INTO product_block_resource_types (product_block_id, resource_type_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')), (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_description')))
"""))
conn.execute(sa.text("""
INSERT INTO product_block_resource_types (product_block_id, resource_type_id) VALUES ((SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')), (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_enable_lacp')))
"""))
conn.execute(sa.text("""
......@@ -106,6 +112,12 @@ DELETE FROM product_block_resource_types WHERE product_block_resource_types.prod
DELETE FROM subscription_instance_values USING product_block_resource_types WHERE subscription_instance_values.subscription_instance_id IN (SELECT subscription_instances.subscription_instance_id FROM subscription_instances WHERE subscription_instances.subscription_instance_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock'))) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name'))
"""))
conn.execute(sa.text("""
DELETE FROM product_block_resource_types WHERE product_block_resource_types.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_description'))
"""))
conn.execute(sa.text("""
DELETE FROM subscription_instance_values USING product_block_resource_types WHERE subscription_instance_values.subscription_instance_id IN (SELECT subscription_instances.subscription_instance_id FROM subscription_instances WHERE subscription_instances.subscription_instance_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock'))) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_description'))
"""))
conn.execute(sa.text("""
DELETE FROM product_block_resource_types WHERE product_block_resource_types.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock')) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_enable_lacp'))
"""))
conn.execute(sa.text("""
......@@ -166,10 +178,10 @@ DELETE FROM product_block_resource_types WHERE product_block_resource_types.prod
DELETE FROM subscription_instance_values USING product_block_resource_types WHERE subscription_instance_values.subscription_instance_id IN (SELECT subscription_instances.subscription_instance_id FROM subscription_instances WHERE subscription_instances.subscription_instance_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortInterfaceBlock'))) AND product_block_resource_types.resource_type_id = (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('interface_description'))
"""))
conn.execute(sa.text("""
DELETE FROM subscription_instance_values WHERE subscription_instance_values.resource_type_id IN (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name', 'edge_port_member_speed', 'edge_port_mac_address', 'edge_port_minimum_links', 'edge_port_ignore_if_down', 'edge_port_type', 'edge_port_encapsulation', 'edge_port_enable_lacp', 'edge_port_geant_ga_id'))
DELETE FROM subscription_instance_values WHERE subscription_instance_values.resource_type_id IN (SELECT resource_types.resource_type_id FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name', 'edge_port_description', 'edge_port_minimum_links', 'edge_port_enable_lacp', 'edge_port_type', 'edge_port_geant_ga_id', 'edge_port_mac_address', 'edge_port_member_speed', 'edge_port_ignore_if_down', 'edge_port_encapsulation'))
"""))
conn.execute(sa.text("""
DELETE FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name', 'edge_port_member_speed', 'edge_port_mac_address', 'edge_port_minimum_links', 'edge_port_ignore_if_down', 'edge_port_type', 'edge_port_encapsulation', 'edge_port_enable_lacp', 'edge_port_geant_ga_id')
DELETE FROM resource_types WHERE resource_types.resource_type IN ('edge_port_name', 'edge_port_description', 'edge_port_minimum_links', 'edge_port_enable_lacp', 'edge_port_type', 'edge_port_geant_ga_id', 'edge_port_mac_address', 'edge_port_member_speed', 'edge_port_ignore_if_down', 'edge_port_encapsulation')
"""))
conn.execute(sa.text("""
DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('Edge port')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('EdgePortBlock'))
......
"""Add Edge Port workflows..
Revision ID: c466b64eccfd
Revises: 75b5c3597bf4
Create Date: 2024-08-27 11:54:16.284844
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'c466b64eccfd'
down_revision = '75b5c3597bf4'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_edge_port",
"target": "CREATE",
"description": "Create Edge Port",
"product_type": "EdgePort"
},
{
"name": "modify_edge_port",
"target": "MODIFY",
"description": "Modify Edge Port",
"product_type": "EdgePort"
},
{
"name": "terminate_edge_port",
"target": "TERMINATE",
"description": "Terminate Edge Port",
"product_type": "EdgePort"
},
{
"name": "validate_edge_port",
"target": "SYSTEM",
"description": "Validate Edge Port Configuration",
"product_type": "EdgePort"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
......@@ -76,14 +76,15 @@ class EdgePortBlockInactive(
):
"""An edge port that's currently inactive. See :class:`EdgePortBlock`."""
node: RouterBlockInactive
edge_port_name: str
enable_lacp: bool
edge_port_node: RouterBlockInactive | None = None
edge_port_name: str | None = None
edge_port_description: str | None = None
edge_port_enable_lacp: bool | None = None
edge_port_encapsulation: EncapsulationType = EncapsulationType.DOT1Q
edge_port_mac_address: str | None = None
edge_port_member_speed: PhysicalPortCapacity
edge_port_member_speed: PhysicalPortCapacity | None = None
edge_port_minimum_links: int | None = None
edge_port_type: EdgePortType
edge_port_type: EdgePortType | None = None
edge_port_ignore_if_down: bool = False
edge_port_geant_ga_id: str | None = None
edge_port_ae_members: LAGMemberList[EdgePortInterfaceBlockInactive]
......@@ -92,8 +93,9 @@ class EdgePortBlockInactive(
class EdgePortBlockProvisioning(EdgePortBlockInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
"""An edge port that's being provisioned. See :class:`EdgePortBlock`."""
node: RouterBlockProvisioning
edge_port_node: RouterBlockProvisioning
edge_port_name: str
edge_port_description: str | None = None
edge_port_enable_lacp: bool
edge_port_encapsulation: EncapsulationType = EncapsulationType.DOT1Q
edge_port_mac_address: str | None = None
......@@ -109,9 +111,11 @@ class EdgePortBlock(EdgePortBlockProvisioning, lifecycle=[SubscriptionLifecycle.
"""An edge port that's currently deployed in the network."""
#: The router that this edge port is connected to.
node: RouterBlock
edge_port_node: RouterBlock
#: The name of the edge port, in our case, corresponds to the name of the LAG interface.
edge_port_name: str
#: A description of the edge port.
edge_port_description: str | None = None
#: Indicates whether LACP (Link Aggregation Control Protocol) is enabled for this edge port.
edge_port_enable_lacp: bool
#: The type of encapsulation used on this edge port, by default DOT1Q.
......
......@@ -289,22 +289,30 @@ class NetboxClient:
interface.lag = None
interface.save()
def get_available_lags(self, router_id: UUID) -> list[str]:
"""Return all available :term:`LAG` not assigned to a device."""
def get_available_lags_in_range(self, router_id: UUID, lag_range: range) -> list[str]:
"""Return all available LAGs within a given range not assigned to a device."""
router_name = Router.from_subscription(router_id).router.router_fqdn
device = self.get_device_by_name(router_name)
# Get the existing :term:`LAG` interfaces for the device
# Get the existing LAG interfaces for the device
lag_interface_names = [
interface["name"] for interface in self.netbox.dcim.interfaces.filter(device=device.name, type="lag")
]
# Generate all feasible LAGs
all_feasible_lags = [f"lag-{i}" for i in FEASIBLE_IP_TRUNK_LAG_RANGE]
# Generate all feasible LAGs in the specified range
all_feasible_lags = [f"lag-{i}" for i in lag_range]
# Return available LAGs not assigned to the device
return [lag for lag in all_feasible_lags if lag not in lag_interface_names]
def get_available_lags(self, router_id: UUID) -> list[str]:
"""Return all available :term:`LAG` not assigned to a device."""
return self.get_available_lags_in_range(router_id, FEASIBLE_IP_TRUNK_LAG_RANGE)
def get_available_services_lags(self, router_id: UUID) -> list[str]:
"""Return all available Edge port LAGs not assigned to a device."""
return self.get_available_lags_in_range(router_id, range(20, 51))
@staticmethod
def calculate_speed_bits_per_sec(speed: str) -> int:
"""Extract the numeric part from the speed."""
......
......@@ -45,8 +45,8 @@ class TierInfo:
return getattr(self, name)
# The range includes values from 1 to 10 (11 is not included)
FEASIBLE_IP_TRUNK_LAG_RANGE = range(1, 11)
FEASIBLE_SERVICES_LAG_RANGE = range(20, 51)
# Define default values
ROUTER_ROLE = {"name": "router", "slug": "router"}
......
......@@ -11,9 +11,10 @@ from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType
from gso.services.partners import get_all_partners
from gso.utils.shared_enums import Vendor
if TYPE_CHECKING:
from gso.products.product_blocks.iptrunk import IptrunkInterfaceBlock
......@@ -75,6 +76,18 @@ def available_lags_choices(router_id: UUID) -> Choice | None:
return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type]
def available_service_lags_choices(router_id: UUID) -> Choice | None:
"""Return a list of available lags for a given router for services.
For Nokia routers, return a list of available lags.
For Juniper routers, return ``None``.
"""
if get_router_vendor(router_id) != Vendor.NOKIA:
return None
side_a_ae_iface_list = NetboxClient().get_available_services_lags(router_id)
return Choice("ae iface", zip(side_a_ae_iface_list, side_a_ae_iface_list, strict=True)) # type: ignore[arg-type]
def get_router_vendor(router_id: UUID) -> Vendor:
"""Retrieve the vendor of a router.
......@@ -185,3 +198,22 @@ def active_switch_selector() -> Choice:
}
return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type]
def partner_choice() -> Choice:
"""Return a Choice object containing a list of available partners."""
partners = {partner["partner_id"]: partner["name"] for partner in get_all_partners()}
return Choice("Select a partner", zip(partners.values(), partners.items(), strict=True)) # type: ignore[arg-type]
def validate_edge_port_number_of_members_based_on_lacp(*, number_of_members: int, enable_lacp: bool) -> None:
"""Validate the number of edge port members based on the LACP setting.
:param number_of_members: The number of members to validate.
:param enable_lacp: Whether LACP is enabled or not.
:raises ValueError: If the number of members is greater than 1 and LACP is disabled.
"""
if number_of_members > 1 and not enable_lacp:
err_msg = "Number of members must be 1 if LACP is disabled"
raise ValueError(err_msg)
......@@ -78,3 +78,10 @@ LazyWorkflowInstance("gso.workflows.tasks.create_partners", "task_create_partner
LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partners")
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks")
# Edge port workflows
LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
"""All workflows that can be executed on Edge port."""
"""A creation workflow for adding a new edge port to the network."""
from typing import Annotated, Any, Self
from uuid import uuid4
from annotated_types import Len
from orchestrator import step, workflow
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Choice
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.workflow import StepList, begin, done
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.validators import validate_unique_list
from pynetbox.models.dcim import Interfaces
from gso.products.product_blocks.edge_port import EdgePortInterfaceBlockInactive, EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.edge_port import EdgePortInactive, EdgePortProvisioning
from gso.products.product_types.router import Router
from gso.services import lso_client, subscriptions
from gso.services.lso_client import lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_service_lags_choices,
partner_choice,
validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types import TTNumber
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather information to create a new Edge Port."""
routers = {}
for router in subscriptions.get_active_subscriptions_by_field_and_value("router_role", RouterRole.PE):
routers[str(router.subscription_id)] = router.description
router_enum = Choice("Select a router", zip(routers.keys(), routers.items(), strict=True)) # type: ignore[arg-type]
class CreateEdgePortForm(FormPage):
model_config = ConfigDict(title=product_name)
tt_number: TTNumber
node: router_enum # type: ignore[valid-type]
partner: partner_choice() # type: ignore[valid-type]
service_type: EdgePortType
enable_lacp: bool
speed: PhysicalPortCapacity
encapsulation: EncapsulationType = EncapsulationType.DOT1Q
number_of_members: int
minimum_links: int
mac_address: str | None = None
ignore_if_down: bool = False
geant_ga_id: str | None = None
@model_validator(mode="after")
def validate_number_of_members(self) -> Self:
validate_edge_port_number_of_members_based_on_lacp(
enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
)
return self
initial_user_input = yield CreateEdgePortForm
class EdgePortLAGMember(LAGMember):
interface_name: available_interfaces_choices( # type: ignore[valid-type]
initial_user_input.node, initial_user_input.speed
)
lag_ae_members = Annotated[
list[EdgePortLAGMember],
AfterValidator(validate_unique_list),
Len(
min_length=initial_user_input.number_of_members,
max_length=initial_user_input.number_of_members,
),
]
class SelectInterfaceForm(FormPage):
model_config = ConfigDict(title="Select Interfaces")
name: available_service_lags_choices(initial_user_input.node) # type: ignore[valid-type]
description: str | None = None
ae_members: lag_ae_members
interface_form_input_data = yield SelectInterfaceForm
return initial_user_input.model_dump() | interface_form_input_data.model_dump()
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: UUIDstr) -> State:
"""Create a new subscription object."""
subscription = EdgePortInactive.from_product_id(product, partner)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
@step("Initialize subscription")
def initialize_subscription(
subscription: EdgePortInactive,
node: UUIDstr,
service_type: EdgePortType,
speed: PhysicalPortCapacity,
encapsulation: EncapsulationType,
name: str,
minimum_links: int,
geant_ga_id: str | None,
mac_address: str | None,
partner: str,
enable_lacp: bool, # noqa: FBT001
ignore_if_down: bool, # noqa: FBT001
ae_members: list[dict[str, Any]],
description: str | None = None,
) -> State:
"""Initialise the subscription object in the service database."""
router = Router.from_subscription(node).router
subscription.edge_port.edge_port_node = router
subscription.edge_port.edge_port_type = service_type
subscription.edge_port.edge_port_enable_lacp = enable_lacp
subscription.edge_port.edge_port_member_speed = speed
subscription.edge_port.edge_port_encapsulation = encapsulation
subscription.edge_port.edge_port_name = name
subscription.edge_port.edge_port_minimum_links = minimum_links
subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
subscription.edge_port.edge_port_mac_address = mac_address
subscription.description = f"Edge Port {name} on {router.router_fqdn}, {partner}, {geant_ga_id or ""}"
subscription.edge_port.edge_port_description = description
for member in ae_members:
subscription.edge_port.edge_port_ae_members.append(
EdgePortInterfaceBlockInactive.new(subscription_id=uuid4(), **member),
)
subscription = EdgePortProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
@step("Reserve interfaces in NetBox")
def reserve_interfaces_in_netbox(subscription: EdgePortProvisioning) -> State:
"""Create the :term:`LAG` interfaces in NetBox and attach the lag interfaces to the physical interfaces."""
nbclient = NetboxClient()
edge_port = subscription.edge_port
# Create :term:`LAG` interfaces
lag_interface: Interfaces = nbclient.create_interface(
iface_name=edge_port.edge_port_name,
interface_type="lag",
device_name=edge_port.edge_port_node.router_fqdn,
description=str(subscription.subscription_id),
enabled=True,
)
# Attach physical interfaces to :term:`LAG`
# Update interface description to subscription ID
# Reserve interfaces
for interface in edge_port.edge_port_ae_members:
nbclient.attach_interface_to_lag(
device_name=edge_port.edge_port_node.router_fqdn,
lag_name=lag_interface.name,
iface_name=interface.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(
device_name=edge_port.edge_port_node.router_fqdn,
iface_name=interface.interface_name,
)
return {
"subscription": subscription,
}
@step("Allocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePortProvisioning) -> None:
"""Allocate the interfaces in NetBox."""
for interface in subscription.edge_port.edge_port_ae_members:
fqdn = subscription.edge_port.edge_port_node.router_fqdn
iface_name = interface.interface_name
if not fqdn or not iface_name:
msg = f"FQDN and/or interface name missing in subscription {interface.owner_subscription_id}"
raise ValueError(msg)
NetboxClient().allocate_interface(device_name=fqdn, iface_name=iface_name)
@step("[DRY RUN] Create edge port")
def create_edge_port_dry(
subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> None:
"""Create a new edge port in the network as a dry run."""
extra_vars = {
"dry_run": True,
"subscription": subscription,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
lso_client.execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
@step("[FOR REAL] Create edge port")
def create_edge_port_real(
subscription: dict[str, Any], callback_route: str, tt_number: str, process_id: UUIDstr
) -> None:
"""Create a new edge port in the network for real."""
extra_vars = {
"dry_run": False,
"subscription": subscription,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Create Edge Port",
"verb": "create",
}
lso_client.execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
@workflow(
"Create Edge Port",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_edge_port() -> StepList:
"""Create a new edge port in the network.
* Create and initialise the subscription object in the service database
* Deploy configuration on the new edge port, first as a dry run
* allocate LAG and LAG members in the Netbox.
"""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> reserve_interfaces_in_netbox
>> lso_interaction(create_edge_port_dry)
>> lso_interaction(create_edge_port_real)
>> allocate_interfaces_in_netbox
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""Modify an existing edge port subscription."""
from typing import Annotated, Any, Self
from uuid import uuid4
from annotated_types import Len
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.workflow import StepList, begin, conditional, done, step
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic import AfterValidator, ConfigDict, model_validator
from pydantic_forms.types import FormGenerator, UUIDstr
from pydantic_forms.validators import ReadOnlyField, validate_unique_list
from gso.products.product_blocks.edge_port import (
EdgePortInterfaceBlock,
EncapsulationType,
)
from gso.products.product_blocks.iptrunk import PhysicalPortCapacity
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_partner_by_id
from gso.utils.helpers import (
LAGMember,
available_interfaces_choices,
available_interfaces_choices_including_current_members,
validate_edge_port_number_of_members_based_on_lacp,
)
from gso.utils.types import TTNumber
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Gather input from the operator on what to change about the selected edge port subscription."""
subscription = EdgePort.from_subscription(subscription_id)
class ModifyEdgePortForm(FormPage):
model_config = ConfigDict(title="Modify Edge Port")
tt_number: TTNumber
enable_lacp: bool = subscription.edge_port.edge_port_enable_lacp
member_speed: PhysicalPortCapacity = subscription.edge_port.edge_port_member_speed
encapsulation: EncapsulationType = subscription.edge_port.edge_port_encapsulation
number_of_members: int = len(subscription.edge_port.edge_port_ae_members)
minimum_links: int | None = subscription.edge_port.edge_port_minimum_links or None
mac_address: str | None = subscription.edge_port.edge_port_mac_address or None
ignore_if_down: bool = subscription.edge_port.edge_port_ignore_if_down
geant_ga_id: str | None = subscription.edge_port.edge_port_geant_ga_id or None
@model_validator(mode="after")
def validate_number_of_members(self) -> Self:
validate_edge_port_number_of_members_based_on_lacp(
enable_lacp=self.enable_lacp, number_of_members=self.number_of_members
)
return self
user_input = yield ModifyEdgePortForm
class EdgePortLAGMember(LAGMember):
interface_name: ( # type: ignore[valid-type]
available_interfaces_choices_including_current_members(
subscription.edge_port.edge_port_node.owner_subscription_id,
user_input.member_speed,
subscription.edge_port.edge_port_ae_members,
)
if user_input.member_speed == subscription.edge_port.edge_port_member_speed
else (
available_interfaces_choices(
subscription.edge_port.edge_port_node.owner_subscription_id, user_input.member_speed
)
)
)
lag_ae_members = Annotated[
list[EdgePortLAGMember],
AfterValidator(validate_unique_list),
Len(
min_length=user_input.number_of_members,
max_length=user_input.number_of_members,
),
]
existing_lag_ae_members = [
EdgePortLAGMember(
interface_name=iface.interface_name,
interface_description=iface.interface_description,
)
for iface in subscription.edge_port.edge_port_ae_members
]
class ModifyEdgePortInterfaceForm(FormPage):
model_config = ConfigDict(title="Modify Edge Port Interface")
name: ReadOnlyField(subscription.edge_port.edge_port_name, default_type=str) # type: ignore[valid-type]
description: str | None = subscription.edge_port.edge_port_description or None
ae_members: lag_ae_members = (
existing_lag_ae_members if user_input.member_speed == subscription.edge_port.edge_port_member_speed else []
)
interface_form_input = yield ModifyEdgePortInterfaceForm
capacity_has_changed = (
user_input.member_speed != subscription.edge_port.edge_port_member_speed
or user_input.number_of_members != len(subscription.edge_port.edge_port_ae_members)
or any(
old_interface.interface_name
not in [new_interface.interface_name for new_interface in interface_form_input.ae_members]
for old_interface in subscription.edge_port.edge_port_ae_members
)
or len(subscription.edge_port.edge_port_ae_members) != len(interface_form_input.ae_members)
)
return user_input.model_dump() | interface_form_input.model_dump() | {"capacity_has_changed": capacity_has_changed}
@step("Modify edge port subscription.")
def modify_edge_port_subscription(
subscription: EdgePort,
member_speed: PhysicalPortCapacity,
encapsulation: EncapsulationType,
minimum_links: int,
mac_address: str | None,
geant_ga_id: str | None,
enable_lacp: bool, # noqa: FBT001
ae_members: list[dict[str, str]],
ignore_if_down: bool, # noqa: FBT001
description: str | None = None,
) -> dict[str, Any]:
"""Modify the edge port subscription with the given parameters."""
previous_ae_members = [
{
"interface_name": member.interface_name,
"interface_description": member.interface_description,
}
for member in subscription.edge_port.edge_port_ae_members
]
removed_ae_members = [member for member in previous_ae_members if member not in ae_members]
subscription.edge_port.edge_port_enable_lacp = enable_lacp
subscription.edge_port.edge_port_member_speed = member_speed
subscription.edge_port.edge_port_encapsulation = encapsulation
subscription.edge_port.edge_port_minimum_links = minimum_links
subscription.edge_port.edge_port_mac_address = mac_address
subscription.edge_port.edge_port_ignore_if_down = ignore_if_down
subscription.edge_port.edge_port_geant_ga_id = geant_ga_id
subscription.edge_port.edge_port_description = description
subscription.description = (
f"Edge Port {subscription.edge_port.edge_port_name} on"
f" {subscription.edge_port.edge_port_node.router_fqdn},"
f" {get_partner_by_id(subscription.customer_id).name}, {geant_ga_id or ""}"
)
subscription.edge_port.edge_port_ae_members.clear()
for member in ae_members:
subscription.edge_port.edge_port_ae_members.append(
EdgePortInterfaceBlock.new(subscription_id=uuid4(), **member),
)
subscription.save()
return {
"subscription": subscription,
"removed_ae_members": removed_ae_members,
"previous_ae_members": previous_ae_members,
}
@step("Update interfaces in NetBox")
def update_interfaces_in_netbox(
subscription: EdgePort,
removed_ae_members: list[dict],
previous_ae_members: list[dict],
) -> dict[str, Any]:
"""Update the interfaces in NetBox."""
nbclient = NetboxClient()
# Free removed interfaces
for member in removed_ae_members:
nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, member["interface_name"])
# Attach physical interfaces to :term:`LAG`
# Update interface description to subscription ID
# Reserve interfaces
for member in subscription.edge_port.edge_port_ae_members: # type: ignore[assignment]
if any(prev_member["interface_name"] == member.interface_name for prev_member in previous_ae_members):
continue
nbclient.attach_interface_to_lag(
device_name=subscription.edge_port.edge_port_node.router_fqdn,
lag_name=subscription.edge_port.edge_port_name,
iface_name=member.interface_name,
description=str(subscription.subscription_id),
)
nbclient.reserve_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)
return {"subscription": subscription}
@step("[DRY RUN] Update edge port configuration.")
def update_edge_port_dry(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
callback_route: str,
removed_ae_members: list[dict],
) -> dict[str, Any]:
"""Perform a dry run of updating the edge port configuration."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "update",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
"removed_ae_members": removed_ae_members,
}
execute_playbook(
playbook_name="edge_ports.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
@step("[FOR REAL] Update edge port configuration.")
def update_edge_port_real(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
callback_route: str,
removed_ae_members: list[str],
) -> dict[str, Any]:
"""Update the edge port configuration."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "update",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} "
f"- Update Edge Port {subscription["edge_port"]["edge_port_name"]}"
f" on {subscription["edge_port"]["edge_port_node"]["router_fqdn"]}",
"removed_ae_members": removed_ae_members,
}
execute_playbook(
playbook_name="edge_ports.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
@step("Allocate/Deallocate interfaces in NetBox")
def allocate_interfaces_in_netbox(subscription: EdgePort, previous_ae_members: list[dict]) -> None:
"""Allocate the new interfaces in NetBox and detach the old ones from the LAG."""
nbclient = NetboxClient()
for member in subscription.edge_port.edge_port_ae_members:
if any(member.interface_name == prev_member["interface_name"] for prev_member in previous_ae_members):
continue
nbclient.allocate_interface(
device_name=subscription.edge_port.edge_port_node.router_fqdn,
iface_name=member.interface_name,
)
# detach the old interfaces from lag
nbclient.detach_interfaces_from_lag(
device_name=subscription.edge_port.edge_port_node.router_fqdn, lag_name=subscription.edge_port.edge_port_name
)
@workflow(
"Modify Edge Port",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_edge_port() -> StepList:
"""Modify a new edge port in the network.
* Modify the subscription object in the service database
* Modify configuration on the new edge port, first as a dry run
* Change LAG and LAG members in the Netbox.
"""
capacity_has_changed = conditional(lambda state: state["capacity_has_changed"])
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> modify_edge_port_subscription
>> capacity_has_changed(update_interfaces_in_netbox)
>> capacity_has_changed(lso_interaction(update_edge_port_dry))
>> capacity_has_changed(lso_interaction(update_edge_port_real))
>> capacity_has_changed(allocate_interfaces_in_netbox)
>> resync
>> done
)
"""Terminate an edge port in the network."""
from typing import Any
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from pydantic_forms.types import FormGenerator, UUIDstr
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.netbox_client import NetboxClient
from gso.utils.types import TTNumber
def initial_input_form_generator() -> FormGenerator:
"""Let the operator decide whether to delete configuration on the router, and clear up :term:`IPAM` resources."""
class TerminateForm(FormPage):
tt_number: TTNumber
user_input = yield TerminateForm
return user_input.model_dump()
@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str
) -> dict[str, Any]:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "terminate",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
return {"subscription": subscription}
@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(
subscription: dict[str, Any], tt_number: str, process_id: UUIDstr, callback_route: str
) -> None:
"""Remove an edge port from the network."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "terminate",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
}
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars=extra_vars,
)
@step("Netbox Clean Up")
def netbox_clean_up(subscription: EdgePort) -> None:
"""Update Netbox to remove the edge port LAG interface and all the LAG members."""
nbclient = NetboxClient()
for member in subscription.edge_port.edge_port_ae_members:
nbclient.free_interface(subscription.edge_port.edge_port_node.router_fqdn, member.interface_name)
nbclient.delete_interface(subscription.edge_port.edge_port_node.router_fqdn, subscription.edge_port.edge_port_name)
@workflow(
"Terminate Edge Port",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.TERMINATE,
)
def terminate_edge_port() -> StepList:
"""Terminate a new edge port in the network."""
return (
begin
>> store_process_subscription(Target.TERMINATE)
>> unsync
>> lso_interaction(remove_edge_port_dry)
>> lso_interaction(remove_edge_port_real)
>> netbox_clean_up
>> set_status(SubscriptionLifecycle.TERMINATED)
>> resync
>> done
)
"""Workflow for validating an existing Edge port subscription."""
from typing import Any
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products.product_types.edge_port import EdgePort
from gso.services.lso_client import execute_playbook
from gso.services.netbox_client import NetboxClient
@step("Prepare required keys in state")
def prepare_state(subscription_id: UUIDstr) -> State:
"""Add required keys to the state for the workflow to run successfully."""
edge_port = EdgePort.from_subscription(subscription_id)
return {"subscription": edge_port}
@step("Verify NetBox entries")
def verify_netbox_entries(subscription: EdgePort) -> None:
"""Validate required entries for an edge port in NetBox."""
nbclient = NetboxClient()
netbox_errors = []
# Raises en exception when not found.
lag = nbclient.get_interface_by_name_and_device(
subscription.edge_port.edge_port_name, subscription.edge_port.edge_port_node.router_fqdn
)
if lag.description != str(subscription.subscription_id):
netbox_errors.append(
f"Incorrect description for '{lag}', expected "
f"'{subscription.subscription_id}' but got '{lag.description}'"
)
if not lag.enabled:
netbox_errors.append(f"NetBox interface '{lag}' is not enabled.")
for member in subscription.edge_port.edge_port_ae_members:
interface = nbclient.get_interface_by_name_and_device(
member.interface_name, subscription.edge_port.edge_port_node.router_fqdn
)
if interface.description != str(subscription.subscription_id):
netbox_errors.append(
f"Incorrect description for '{member.interface_name}', expected "
f"'{subscription.subscription_id}' but got '{interface.description}'"
)
if not interface.enabled:
netbox_errors.append(f"NetBox interface '{member.interface_name}' is not enabled.")
if netbox_errors:
raise ProcessFailureError(message="NetBox misconfiguration(s) found", details=str(netbox_errors))
@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any], callback_route: str) -> None:
"""Workflow step for running a playbook that checks whether base config has drifted."""
execute_playbook(
playbook_name="edge_port.yaml",
callback_route=callback_route,
inventory=subscription["edge_port"]["edge_port_node"]["router_fqdn"],
extra_vars={
"dry_run": True,
"subscription": subscription,
"verb": "create",
"is_verification_workflow": "true",
},
)
@workflow(
"Validate Edge Port Configuration", target=Target.SYSTEM, initial_input_form=wrap_modify_initial_input_form(None)
)
def validate_edge_port() -> StepList:
"""Validate an existing, active Edge port subscription.
* Check correct configuration of interfaces in NetBox.
* Verify create Edge port configuration.
"""
return (
begin
>> store_process_subscription(Target.SYSTEM)
>> prepare_state
>> verify_netbox_entries
>> verify_base_config
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment