diff --git "a/gso/migrations/versions/2024-09-19_734dc86f5dd3_add_g\303\251ant_ip_creation_workflow.py" "b/gso/migrations/versions/2024-09-19_734dc86f5dd3_add_g\303\251ant_ip_creation_workflow.py" new file mode 100644 index 0000000000000000000000000000000000000000..b8e5da1f3a791cfd09775b4250df7e368edeb5a8 --- /dev/null +++ "b/gso/migrations/versions/2024-09-19_734dc86f5dd3_add_g\303\251ant_ip_creation_workflow.py" @@ -0,0 +1,39 @@ +"""Add GÉANT IP creation workflow. + +Revision ID: 734dc86f5dd3 +Revises: aa6dcb493d12 +Create Date: 2024-09-19 16:11:25.056745 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '734dc86f5dd3' +down_revision = 'aa6dcb493d12' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "create_geant_ip", + "target": "CREATE", + "description": "Create G\u00c9ANT IP", + "product_type": "GeantIP" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/products/product_blocks/geant_ip.py b/gso/products/product_blocks/geant_ip.py index d57d2dd0c94b642432a2a491d077fc566935c03f..8104c23d1515da7fa633d7919c4b441e86977ce7 100644 --- a/gso/products/product_blocks/geant_ip.py +++ b/gso/products/product_blocks/geant_ip.py @@ -14,14 +14,14 @@ class NRENAccessPortInactive( """An access port for an R&E :term:`NREN` service that is inactive.""" nren_ap_type: APType | None = None - geant_ip_ep_list: list[EdgePortBlockInactive] = Field(default_factory=list) + geant_ip_ep: EdgePortBlockInactive class NRENAccessPortProvisioning(NRENAccessPortInactive, lifecycle=[SubscriptionLifecycle.PROVISIONING]): """An access port for an R&E :term:`NREN` service that is being provisioned.""" nren_ap_type: APType - geant_ip_ep_list: list[EdgePortBlockProvisioning] # type: ignore[assignment] + geant_ip_ep: EdgePortBlockProvisioning # type: ignore[assignment] class NRENAccessPort(NRENAccessPortProvisioning, lifecycle=[SubscriptionLifecycle.ACTIVE]): @@ -30,7 +30,7 @@ class NRENAccessPort(NRENAccessPortProvisioning, lifecycle=[SubscriptionLifecycl #: The type of Access Port nren_ap_type: APType #: The list of Edge Ports where this service terminates. - geant_ip_ep_list: list[EdgePortBlock] # type: ignore[assignment] + geant_ip_ep: EdgePortBlock # type: ignore[assignment] class GeantIPBlockInactive( diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 33db8fd9c56a9a58391ead9d2e3d3177936002e0..1458f000439fa8bcf936a08b864eb117296caf37 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -246,6 +246,20 @@ def get_active_site_subscriptions(includes: list[str] | None = None) -> list[Sub ) +def get_active_edge_port_subscriptions(includes: list[str] | None = None) -> list[SubscriptionType]: + """Retrieve active Edge Port subscriptions. + + :param includes: The fields to be included in the returned Subscription objects. + :type includes: list[str] + + :return: A list of Subscription objects for Edge Ports. + :rtype: list[Subscription] + """ + return get_subscriptions( + product_types=[ProductType.EDGE_PORT], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=includes + ) + + def get_site_by_name(site_name: str) -> Site: """Get a site by its name. diff --git a/gso/utils/helpers.py b/gso/utils/helpers.py index 47fbec269627f4ca9184a34ed496a77f8104d07f..18d7674ea29cc21a5c0c6677b71c0ed4a6c07b54 100644 --- a/gso/utils/helpers.py +++ b/gso/utils/helpers.py @@ -11,7 +11,7 @@ from gso.products.product_blocks.router import RouterRole from gso.products.product_types.router import Router from gso.services import subscriptions from gso.services.netbox_client import NetboxClient -from gso.services.partners import get_all_partners +from gso.services.partners import get_all_partners, get_partner_by_name from gso.utils.shared_enums import Vendor from gso.utils.types.interfaces import PhysicalPortCapacity from gso.utils.types.ip_address import IPv4AddressType @@ -200,6 +200,27 @@ def active_switch_selector() -> Choice: return Choice("Select a switch", zip(switch_subscriptions.keys(), switch_subscriptions.items(), strict=True)) # type: ignore[arg-type] +def active_edge_port_selector(*, geant_only: bool | None = None) -> Choice: + """Generate a dropdown selector for choosing an active Edge Port in an input form.""" + edge_port_subscriptions = subscriptions.get_active_edge_port_subscriptions( + includes=["subscription_id", "description", "customer_id"] + ) + + if geant_only is not None: + # ``geant_only`` is set, so we will filter accordingly. + geant_partner_id = get_partner_by_name("GEANT")["partner_id"] + edge_port_subscriptions = filter( + lambda subscription: geant_only ^ bool(subscription["customer_id"] != geant_partner_id), + edge_port_subscriptions, + ) + + edge_port_subscriptions = {port["subscription_id"]: port["description"] for port in edge_port_subscriptions} + + return Choice( + "Select an Edge Port", zip(edge_port_subscriptions.keys(), edge_port_subscriptions.items(), strict=True) + ) + + def partner_choice() -> Choice: """Return a Choice object containing a list of available partners.""" partners = {partner["partner_id"]: partner["name"] for partner in get_all_partners()} diff --git a/gso/utils/types/ip_address.py b/gso/utils/types/ip_address.py index 94cb8beaf498900e4785d3e222a73685f5d35bf6..820377b2435067534e0b24a676e7aff1e3786395 100644 --- a/gso/utils/types/ip_address.py +++ b/gso/utils/types/ip_address.py @@ -18,13 +18,27 @@ def validate_ipv4_or_ipv6(value: str) -> str: return value +def validate_ipv4_or_ipv6_network(value: str) -> str: + """Validate that a value is a valid IPv4 or IPv6 network.""" + try: + ipaddress.ip_network(value) + except ValueError as e: + msg = "Enter a valid IPv4 or IPv6 network." + raise ValueError(msg) from e + else: + return value + + def _str(value: Any) -> str: return str(value) IPv4AddressType = Annotated[ipaddress.IPv4Address, PlainSerializer(_str, return_type=str, when_used="always")] +IPv4NetworkType = Annotated[ipaddress.IPv4Network, PlainSerializer(_str, return_type=str, when_used="always")] IPv6AddressType = Annotated[ipaddress.IPv6Address, PlainSerializer(_str, return_type=str, when_used="always")] +IPv6NetworkType = Annotated[ipaddress.IPv6Network, PlainSerializer(_str, return_type=str, when_used="always")] IPAddress = Annotated[str, AfterValidator(validate_ipv4_or_ipv6)] +IPNetwork = Annotated[str, AfterValidator(validate_ipv4_or_ipv6_network)] PortNumber = Annotated[ int, Field( diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index ecddb6ff3208697122268813e8c2f92b9a97d54e..9d7ff08340e7a12014bd22ab40f0bc0336f39728 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -79,9 +79,11 @@ LazyWorkflowInstance("gso.workflows.tasks.modify_partners", "task_modify_partner LazyWorkflowInstance("gso.workflows.tasks.delete_partners", "task_delete_partners") LazyWorkflowInstance("gso.workflows.tasks.clean_old_tasks", "task_clean_old_tasks") - # Edge port workflows LazyWorkflowInstance("gso.workflows.edge_port.create_edge_port", "create_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.modify_edge_port", "modify_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.terminate_edge_port", "terminate_edge_port") LazyWorkflowInstance("gso.workflows.edge_port.validate_edge_port", "validate_edge_port") + +# GÉANT IP workflows +LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip") diff --git a/gso/workflows/geant_ip/create_geant_ip.py b/gso/workflows/geant_ip/create_geant_ip.py index 651c602e4e97515f33a98d99322c017c98aeea25..d3b660a9080f8174a7ae7def57bd9c0a53a65e2e 100644 --- a/gso/workflows/geant_ip/create_geant_ip.py +++ b/gso/workflows/geant_ip/create_geant_ip.py @@ -1 +1,300 @@ """Create a new GÉANT IP subscription.""" + +from typing import Annotated, Any +from uuid import uuid4 + +from orchestrator.forms import FormPage +from orchestrator.forms.validators import Label +from orchestrator.targets import Target +from orchestrator.types import FormGenerator, State, SubscriptionLifecycle, UUIDstr +from orchestrator.utils.errors import ProcessFailureError +from orchestrator.workflow import StepList, begin, done, step, workflow +from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from orchestrator.workflows.utils import wrap_create_initial_input_form +from products import EdgePort +from pydantic import AfterValidator, BaseModel, ConfigDict, Field +from pydantic_forms.validators import Divider, validate_unique_list + +from gso.products.product_blocks.bgp_session import BGPSession, IPFamily +from gso.products.product_blocks.geant_ip import NRENAccessPortInactive +from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort +from gso.products.product_types.geant_ip import GeantIPInactive +from gso.services.lso_client import execute_playbook, lso_interaction +from gso.services.partners import get_partner_by_name +from gso.utils.helpers import ( + active_edge_port_selector, + partner_choice, +) +from gso.utils.shared_enums import APType, SBPType +from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType +from gso.utils.types.tt_number import TTNumber + + +def initial_input_form_generator(product_name: str) -> FormGenerator: + """Gather input from the operator to build a new subscription object.""" + + class CreateGeantIPForm(FormPage): + model_config = ConfigDict(title=f"{product_name} - Select partner") + + tt_number: TTNumber + partner = partner_choice() + + initial_user_input = yield CreateGeantIPForm + + class EdgePortSelection(BaseModel): + edge_port: active_edge_port_selector() + ap_type: APType + + class EdgePortSelectionForm(FormPage): + model_config = ConfigDict(title=f"{product_name} - Select Edge Ports") + info_label = Label("Please select the Edge Ports where this GÉANT IP service will terminate") + + edge_ports: list[EdgePortSelection] = Field(default_factory=list) + + selected_edge_ports = yield EdgePortSelectionForm + ep_list = selected_edge_ports.edge_port_pair_list + total_ep_count = len(ep_list) + current_ep_index = 0 + + class BGPPeer(BaseModel): + peer_address: IPAddress + bfd_enabled: bool + bfd_interval: int | None = None + bfd_multiplier: int | None = None + families: Annotated[list[IPFamily], AfterValidator(validate_unique_list)] + has_custom_policies: bool + authentication_key: str + multipath_enabled: bool + send_default_route: bool + is_multi_hop: bool + + class BindingPortsInputForm(FormPage): + model_config = ConfigDict( + title=f"{product_name} - Configure Service Binding Ports ({current_ep_index + 1}/{total_ep_count})" + ) + info_label = Label("Please configure the Service Binding Ports for each Edge Port.") + current_ep_label = Label(f'Currently configuring Edge Port: "{ep_list[current_ep_index]["description"]}"') + + geant_sid: str + is_tagged: bool + sbp_type: SBPType + vlan_id: VLAN_ID | None + ipv4_address: IPv4AddressType | None = None + ipv6_address: IPv6AddressType | None = None + custom_firewall_filters: bool + divider: Divider + bgp_peers: list[BGPPeer] = Field(default_factory=list) + + binding_port_inputs = [] + while current_ep_index < total_ep_count: + binding_port_input_form = yield BindingPortsInputForm + binding_port_inputs.append( + binding_port_input_form.model_dump(exclude=set("info_label" "current_ep_label" "divider")) + ) + current_ep_index += 1 + + return ( + initial_user_input.model_dump() + | selected_edge_ports.model_dump(exclude=set("info_label")) + | binding_port_inputs + ) + + +@step("Create subscription") +def create_subscription(product: UUIDstr, partner: str) -> State: + """Create a new subscription object in the database.""" + subscription = GeantIPInactive.from_product_id(product, get_partner_by_name(partner)["partner_id"]) + + return {"subscription": subscription} + + +@step("Initialize subscription") +def initialize_subscription( + subscription: GeantIPInactive, edge_ports: list[dict], binding_port_inputs: list[dict] +) -> State: + """Take all user inputs and use them to populate the subscription model.""" + edge_port_fqdn_list = [] + for edge_port_input, sbp_input in zip(edge_ports, binding_port_inputs, strict=False): + edge_port_subscription = EdgePort.from_subscription(edge_port_input["edge_port"]) + subscription.geant_ip.geant_ip_ap_list.append( + NRENAccessPortInactive.new( + subscription_id=uuid4(), + nren_ap_type=edge_port_input["ap_type"], + geant_ip_ep=edge_port_subscription.subscription_id, + ) + ) + sbp_bgp_session_list = [ + BGPSession.new(subscription_id=uuid4(), **session) for session in sbp_input["bgp_peers"] + ] + edge_port_subscription.edge_port.edge_port_sbp_list.append( + ServiceBindingPort.new(subscription_id=uuid4(), **sbp_input, sbp_bgp_session_list=sbp_bgp_session_list) + ) + edge_port_fqdn_list.append(edge_port_subscription.edge_port.edge_port_node.router_fqdn) + + subscription.description = "GEANT IP service" + + return {"subscription": subscription, "edge_port_fqdn_list": edge_port_fqdn_list} + + +@step("[DRY RUN] Deploy service binding port") +def provision_sbp_dry( + subscription: dict[str, Any], + callback_route: str, + process_id: UUIDstr, + tt_number: str, + edge_port_fqdn_list: list[str], +) -> None: + """Perform a dry run of deploying Service Binding Ports.""" + extra_vars = { + "subscription": subscription, + "dry_run": True, + "verb": "deploy", + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " + f"Deploy config for {subscription["description"]}", + } + + execute_playbook( + playbook_name="manage_sbp.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("[FOR REAL] Deploy service binding port") +def provision_sbp_real( + subscription: dict[str, Any], + callback_route: str, + process_id: UUIDstr, + tt_number: str, + edge_port_fqdn_list: list[str], +) -> None: + """Deploy Service Binding Ports.""" + extra_vars = { + "subscription": subscription, + "dry_run": False, + "verb": "deploy", + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " + f"Deploy config for {subscription["description"]}", + } + + execute_playbook( + playbook_name="manage_sbp.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("Check service binding port functionality") +def check_sbp_functionality(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None: + """Check functionality of deployed Service Binding Ports.""" + extra_vars = {"subscription": subscription, "verb": "check"} + + execute_playbook( + playbook_name="manage_sbp.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("[DRY RUN] Deploy BGP peers") +def deploy_bgp_peers_dry( + subscription: dict[str, Any], + callback_route: str, + edge_port_fqdn_list: list[str], + tt_number: str, + process_id: UUIDstr, +) -> None: + """Perform a dry run of deploying :term:`BGP` peers.""" + extra_vars = { + "subscription": subscription, + "verb": "deploy", + "dry_run": True, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " + f"Deploying BGP peers for {subscription["description"]}", + } + + execute_playbook( + playbook_name="manage_bgp_peers.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("[FOR REAL] Deploy BGP peers") +def deploy_bgp_peers_real( + subscription: dict[str, Any], + callback_route: str, + edge_port_fqdn_list: list[str], + tt_number: str, + process_id: UUIDstr, +) -> None: + """Deploy :term:`BGP` peers.""" + extra_vars = { + "subscription": subscription, + "verb": "deploy", + "dry_run": False, + "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - " + f"Deploying BGP peers for {subscription["description"]}", + } + + execute_playbook( + playbook_name="manage_bgp_peers.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("Check BGP peers") +def check_bgp_peers(subscription: dict[str, Any], callback_route: str, edge_port_fqdn_list: list[str]) -> None: + """Check correct deployment of :term:`BGP` peers.""" + extra_vars = {"subscription": subscription, "verb": "check"} + + execute_playbook( + playbook_name="manage_bgp_peers.yaml", + callback_route=callback_route, + inventory="\n".join(edge_port_fqdn_list), + extra_vars=extra_vars, + ) + + +@step("Update Infoblox") +def update_dns_records(subscription: GeantIPInactive) -> None: + """Update :term:`DNS` records in Infoblox.""" + raise ProcessFailureError(subscription.description) + + +@workflow( + "Create GÉANT IP", + initial_input_form=wrap_create_initial_input_form(initial_input_form_generator), + target=Target.CREATE, +) +def create_geant_ip() -> StepList: + """Create a new GÉANT IP subscription. + + * Create subscription object in the service database + * Deploy service binding ports + * Deploy :term:`BGP` peers + * Update :term:`DNS` records + * Set the subscription in a provisioning state in the database + """ + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> lso_interaction(provision_sbp_dry) + >> lso_interaction(provision_sbp_real) + >> lso_interaction(check_sbp_functionality) + >> lso_interaction(deploy_bgp_peers_dry) + >> lso_interaction(deploy_bgp_peers_real) + >> lso_interaction(check_bgp_peers) + >> update_dns_records + >> set_status(SubscriptionLifecycle.PROVISIONING) + >> resync + >> done + )