Skip to content
Snippets Groups Projects
Verified Commit a0ae14f5 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add GÉANT IP creation workflow

parent b74b3279
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
"""Add GÉANT IP creation workflow.
Revision ID: 734dc86f5dd3
Revises: aa6dcb493d12
Create Date: 2024-09-19 16:11:25.056745
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '734dc86f5dd3'
down_revision = 'aa6dcb493d12'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_geant_ip",
"target": "CREATE",
"description": "Create G\u00c9ANT IP",
"product_type": "GeantIP"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
......@@ -14,14 +14,14 @@ class NRENAccessPortInactive(
"""An access port for an R&E :term:`NREN` service that is inactive."""
nren_ap_type: APType | None = None
geant_ip_ep_list: list[EdgePortBlockInactive] = Field(default_factory=list)
geant_ip_ep: EdgePortBlockInactive
class NRENAccessPortProvisioning(NRENAccessPortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]):
"""An access port for an R&E :term:`NREN` service that is being provisioned."""
nren_ap_type: APType
geant_ip_ep_list: list[EdgePortBlockProvisioning] # type: ignore[assignment]
geant_ip_ep: EdgePortBlockProvisioning # type: ignore[assignment]
class NRENAccessPort(NRENAccessPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]):
......@@ -30,7 +30,7 @@ class NRENAccessPort(NRENAccessPortProvisioning, lifecycle=[SubscriptionLifecycl
#: The type of Access Port
nren_ap_type: APType
#: The list of Edge Ports where this service terminates.
geant_ip_ep_list: list[EdgePortBlock] # type: ignore[assignment]
geant_ip_ep: EdgePortBlock # type: ignore[assignment]
class GeantIPBlockInactive(
......
......@@ -246,6 +246,20 @@ def get_active_site_subscriptions(includes: list[str] | None = None) -> list[Sub
)
def get_active_edge_port_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]:
"""Retrieve active Edge Port subscriptions.
:param includes: The fields to be included in the returned Subscription objects.
:type includes: list[str]
:return: A list of Subscription objects for Edge Ports.
:rtype: list[Subscription]
"""
return get_subscriptions(
product_types=[ProductType.EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes
)
def get_site_by_name(site_name: str) -> Site:
"""Get a site by its name.
......
......@@ -11,7 +11,7 @@ from gso.products.product_blocks.router import RouterRole
from gso.products.product_types.router import Router
from gso.services import subscriptions
from gso.services.netbox_client import NetboxClient
from gso.services.partners import get_all_partners
from gso.services.partners import get_all_partners, get_partner_by_name
from gso.utils.shared_enums import Vendor
from gso.utils.types.interfaces import PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType
......@@ -200,6 +200,27 @@ def active_switch_selector() -> Choice:
return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type]
def active_edge_port_selector(*, geant_only: bool | None = None) -> Choice:
"""Generate a dropdown selector for choosing an active Edge Port in an input form."""
edge_port_subscriptions = subscriptions.get_active_edge_port_subscriptions(
includes=["subscription_id", "description", "customer_id"]
)
if geant_only is not None:
# ``geant_only`` is set, so we will filter accordingly.
geant_partner_id = get_partner_by_name("GEANT")["partner_id"]
edge_port_subscriptions = filter(
lambda subscription: geant_only ^ bool(subscription["customer_id"] != geant_partner_id),
edge_port_subscriptions,
)
edge_port_subscriptions = {port["subscription_id"]: port["description"] for port in edge_port_subscriptions}
return Choice(
"Select an Edge Port", zip(edge_port_subscriptions.keys(), edge_port_subscriptions.items(), strict=True)
)
def partner_choice() -> Choice:
"""Return a Choice object containing a list of available partners."""
partners = {partner["partner_id"]: partner["name"] for partner in get_all_partners()}
......
......@@ -18,13 +18,27 @@ def validate_ipv4_or_ipv6(value: str) -> str:
return value
def validate_ipv4_or_ipv6_network(value: str) -> str:
"""Validate that a value is a valid IPv4 or IPv6 network."""
try:
ipaddress.ip_network(value)
except ValueError as e:
msg = "Enter a valid IPv4 or IPv6 network."
raise ValueError(msg) from e
else:
return value
def _str(value: Any) -> str:
return str(value)
IPv4AddressType = Annotated[ipaddress.IPv4Address, PlainSerializer(_str, return_type=str, when_used="always")]
IPv4NetworkType = Annotated[ipaddress.IPv4Network, PlainSerializer(_str, return_type=str, when_used="always")]
IPv6AddressType = Annotated[ipaddress.IPv6Address, PlainSerializer(_str, return_type=str, when_used="always")]
IPv6NetworkType = Annotated[ipaddress.IPv6Network, PlainSerializer(_str, return_type=str, when_used="always")]
IPAddress = Annotated[str, AfterValidator(validate_ipv4_or_ipv6)]
IPNetwork = Annotated[str, AfterValidator(validate_ipv4_or_ipv6_network)]
PortNumber = Annotated[
int,
Field(
......
......@@ -79,9 +79,11 @@ LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partner
LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners")
LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks")
# Edge port workflows
LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port")
LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port")
# GÉANT IP workflows
LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip")
"""Create a new GÉANT IP subscription."""
from typing import Annotated, Any
from uuid import uuid4
from orchestrator.forms import FormPage
from orchestrator.forms.validators import Label
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr
from orchestrator.utils.errors import ProcessFailureError
from orchestrator.workflow import StepList, begin, done, step, workflow
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from orchestrator.workflows.utils import wrap_create_initial_input_form
from products import EdgePort
from pydantic import AfterValidator, BaseModel, ConfigDict, Field
from pydantic_forms.validators import Divider, validate_unique_list
from gso.products.product_blocks.bgp_session import BGPSession, IPFamily
from gso.products.product_blocks.geant_ip import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.geant_ip import GeantIPInactive
from gso.services.lso_client import execute_playbook, lso_interaction
from gso.services.partners import get_partner_by_name
from gso.utils.helpers import (
active_edge_port_selector,
partner_choice,
)
from gso.utils.shared_enums import APType, SBPType
from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType
from gso.utils.types.tt_number import TTNumber
def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Gather input from the operator to build a new subscription object."""
class CreateGeantIPForm(FormPage):
model_config = ConfigDict(title=f"{product_name} - Select partner")
tt_number: TTNumber
partner = partner_choice()
initial_user_input = yield CreateGeantIPForm
class EdgePortSelection(BaseModel):
edge_port: active_edge_port_selector()
ap_type: APType
class EdgePortSelectionForm(FormPage):
model_config = ConfigDict(title=f"{product_name} - Select Edge Ports")
info_label = Label("Please select the Edge Ports where this GÉANT IP service will terminate")
edge_ports: list[EdgePortSelection] = Field(default_factory=list)
selected_edge_ports = yield EdgePortSelectionForm
ep_list = selected_edge_ports.edge_port_pair_list
total_ep_count = len(ep_list)
current_ep_index = 0
class BGPPeer(BaseModel):
peer_address: IPAddress
bfd_enabled: bool
bfd_interval: int | None = None
bfd_multiplier: int | None = None
families: Annotated[list[IPFamily], AfterValidator(validate_unique_list)]
has_custom_policies: bool
authentication_key: str
multipath_enabled: bool
send_default_route: bool
is_multi_hop: bool
class BindingPortsInputForm(FormPage):
model_config = ConfigDict(
title=f"{product_name} - Configure Service Binding Ports ({current_ep_index + 1}/{total_ep_count})"
)
info_label = Label("Please configure the Service Binding Ports for each Edge Port.")
current_ep_label = Label(f'Currently configuring Edge Port: "{ep_list[current_ep_index]["description"]}"')
geant_sid: str
is_tagged: bool
sbp_type: SBPType
vlan_id: VLAN_ID | None
ipv4_address: IPv4AddressType | None = None
ipv6_address: IPv6AddressType | None = None
custom_firewall_filters: bool
divider: Divider
bgp_peers: list[BGPPeer] = Field(default_factory=list)
binding_port_inputs = []
while current_ep_index < total_ep_count:
binding_port_input_form = yield BindingPortsInputForm
binding_port_inputs.append(
binding_port_input_form.model_dump(exclude=set("info_label" "current_ep_label" "divider"))
)
current_ep_index += 1
return (
initial_user_input.model_dump()
| selected_edge_ports.model_dump(exclude=set("info_label"))
| binding_port_inputs
)
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
"""Create a new subscription object in the database."""
subscription = GeantIPInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"])
return {"subscription": subscription}
@step("Initialize subscription")
def initialize_subscription(
subscription: GeantIPInactive, edge_ports: list[dict], binding_port_inputs: list[dict]
) -> State:
"""Take all user inputs and use them to populate the subscription model."""
edge_port_fqdn_list = []
for edge_port_input, sbp_input in zip(edge_ports, binding_port_inputs, strict=False):
edge_port_subscription = EdgePort.from_subscription(edge_port_input["edge_port"])
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPortInactive.new(
subscription_id=uuid4(),
nren_ap_type=edge_port_input["ap_type"],
geant_ip_ep=edge_port_subscription.subscription_id,
)
)
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session) for session in sbp_input["bgp_peers"]
]
edge_port_subscription.edge_port.edge_port_sbp_list.append(
ServiceBindingPort.new(subscription_id=uuid4(), **sbp_input, sbp_bgp_session_list=sbp_bgp_session_list)
)
edge_port_fqdn_list.append(edge_port_subscription.edge_port.edge_port_node.router_fqdn)
subscription.description = "GEANT IP service"
return {"subscription": subscription, "edge_port_fqdn_list": edge_port_fqdn_list}
@step("[DRY RUN] Deploy service binding port")
def provision_sbp_dry(
subscription: dict[str, Any],
callback_route: str,
process_id: UUIDstr,
tt_number: str,
edge_port_fqdn_list: list[str],
) -> None:
"""Perform a dry run of deploying Service Binding Ports."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "deploy",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploy config for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("[FOR REAL] Deploy service binding port")
def provision_sbp_real(
subscription: dict[str, Any],
callback_route: str,
process_id: UUIDstr,
tt_number: str,
edge_port_fqdn_list: list[str],
) -> None:
"""Deploy Service Binding Ports."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "deploy",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploy config for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("Check service binding port functionality")
def check_sbp_functionality(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None:
"""Check functionality of deployed Service Binding Ports."""
extra_vars = {"subscription": subscription, "verb": "check"}
execute_playbook(
playbook_name="manage_sbp.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("[DRY RUN] Deploy BGP peers")
def deploy_bgp_peers_dry(
subscription: dict[str, Any],
callback_route: str,
edge_port_fqdn_list: list[str],
tt_number: str,
process_id: UUIDstr,
) -> None:
"""Perform a dry run of deploying :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
"verb": "deploy",
"dry_run": True,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploying BGP peers for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("[FOR REAL] Deploy BGP peers")
def deploy_bgp_peers_real(
subscription: dict[str, Any],
callback_route: str,
edge_port_fqdn_list: list[str],
tt_number: str,
process_id: UUIDstr,
) -> None:
"""Deploy :term:`BGP` peers."""
extra_vars = {
"subscription": subscription,
"verb": "deploy",
"dry_run": False,
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"Deploying BGP peers for {subscription["description"]}",
}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("Check BGP peers")
def check_bgp_peers(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None:
"""Check correct deployment of :term:`BGP` peers."""
extra_vars = {"subscription": subscription, "verb": "check"}
execute_playbook(
playbook_name="manage_bgp_peers.yaml",
callback_route=callback_route,
inventory="\n".join(edge_port_fqdn_list),
extra_vars=extra_vars,
)
@step("Update Infoblox")
def update_dns_records(subscription: GeantIPInactive) -> None:
"""Update :term:`DNS` records in Infoblox."""
raise ProcessFailureError(subscription.description)
@workflow(
"Create GÉANT IP",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_geant_ip() -> StepList:
"""Create a new GÉANT IP subscription.
* Create subscription object in the service database
* Deploy service binding ports
* Deploy :term:`BGP` peers
* Update :term:`DNS` records
* Set the subscription in a provisioning state in the database
"""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> lso_interaction(provision_sbp_dry)
>> lso_interaction(provision_sbp_real)
>> lso_interaction(check_sbp_functionality)
>> lso_interaction(deploy_bgp_peers_dry)
>> lso_interaction(deploy_bgp_peers_real)
>> lso_interaction(check_bgp_peers)
>> update_dns_records
>> set_status(SubscriptionLifecycle.PROVISIONING)
>> resync
>> done
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment