From 6e407cc19de031677f1c7284b65326f5c66fe204 Mon Sep 17 00:00:00 2001 From: Neda Moeini <neda.moeini@geant.org> Date: Tue, 8 Oct 2024 11:01:41 +0200 Subject: [PATCH] Import Geant IP workflows. --- gso/cli/imports.py | 86 ++++++++++++- ...fdf9e83f3_add_import_geant_ip_workflows.py | 45 +++++++ ...33789df39_add_imported_geant_ip_product.py | 47 +++++++ gso/products/__init__.py | 5 +- gso/workflows/__init__.py | 2 + .../geant_ip/create_imported_geant_ip.py | 119 ++++++++++++++++++ gso/workflows/geant_ip/import_geant_ip.py | 27 ++++ 7 files changed, 327 insertions(+), 4 deletions(-) create mode 100644 gso/migrations/versions/2024-10-02_41dfdf9e83f3_add_import_geant_ip_workflows.py create mode 100644 gso/migrations/versions/2024-10-08_ac333789df39_add_imported_geant_ip_product.py create mode 100644 gso/workflows/geant_ip/create_imported_geant_ip.py create mode 100644 gso/workflows/geant_ip/import_geant_ip.py diff --git a/gso/cli/imports.py b/gso/cli/imports.py index 4e0d56f1..71c3c03a 100644 --- a/gso/cli/imports.py +++ b/gso/cli/imports.py @@ -18,9 +18,11 @@ from sqlalchemy.exc import SQLAlchemyError from gso.db.models import PartnerTable from gso.products import ProductType +from gso.products.product_blocks.bgp_session import IPFamily from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_blocks.router import RouterRole +from gso.products.product_blocks.service_binding_port import VLAN_ID from gso.services.partners import ( PartnerEmail, PartnerName, @@ -32,10 +34,10 @@ from gso.services.subscriptions import ( get_active_subscriptions_by_field_and_value, get_subscriptions, ) -from gso.utils.shared_enums import Vendor +from gso.utils.shared_enums import SBPType, Vendor from gso.utils.types.base_site import BaseSiteValidatorModel from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity -from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType, PortNumber +from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType, PortNumber app: typer.Typer = typer.Typer() @@ -212,6 +214,45 @@ class EdgePortImportModel(BaseModel): return self +class GeantIPImportModel(BaseModel): + """Import GEANT IP model.""" + + partner: str + service_binding_ports: list["GeantIPImportModel.ServiceBindingPort"] + + class BaseBGPPeer(BaseModel): + """Base BGP Peer model.""" + + bfd_enabled: bool = False + bfd_interval: int | None = None + bfd_multiplier: int | None = None + has_custom_policies: bool = False + authentication_key: str + multipath_enabled: bool = False + send_default_route: bool = False + is_passive: bool = False + peer_address: IPAddress + families: list[IPFamily] + is_multi_hop: bool + rtbh_enabled: bool # whether Remote Triggered Blackhole is enabled + + class ServiceBindingPort(BaseModel): + """Service Binding model.""" + + edge_port: str + ap_type: str + geant_sid: str + sbp_type: SBPType = SBPType.L3 + is_tagged: bool = False + vlan_id: VLAN_ID + custom_firewall_filters: bool = False + ipv4_address: IPv4AddressType + ipv6_address: IPv6AddressType + rtbh_enabled: bool = True + is_multi_hop: bool = True + bgp_peers: list["GeantIPImportModel.BaseBGPPeer"] + + T = TypeVar( "T", SiteImportModel, @@ -221,6 +262,7 @@ T = TypeVar( OfficeRouterImportModel, OpenGearImportModel, EdgePortImportModel, + GeantIPImportModel, ) common_filepath_option = typer.Option( @@ -265,7 +307,7 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]: def _generic_import_product( - file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] + file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] ) -> None: """Import subscriptions from a JSON or YAML file.""" successfully_imported_data = [] @@ -474,3 +516,41 @@ def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV f typer.echo(f"Failed to import partners: {e}") finally: db.session.close() + + +@app.command() +def import_geant_ip(filepath: str = common_filepath_option) -> None: + """Import GEANT IP into GSO.""" + successfully_imported_data = [] + geant_ip_list = _read_data(Path(filepath)) + + for geant_ip in geant_ip_list: + partner = geant_ip["partner"] + typer.echo(f"Creating imported GEANT IP for {partner}") + + try: + initial_data = GeantIPImportModel(**geant_ip) + start_process("create_imported_geant_ip", [initial_data.model_dump()]) + edge_ports = [sbp["edge_port"] for sbp in geant_ip["service_binding_ports"]] + successfully_imported_data.append(edge_ports) + typer.echo(f"Successfully created imported GEANT IP for {partner}") + except ValidationError as e: + typer.echo(f"Validation error: {e}") + + typer.echo("Waiting for the dust to settle before importing new products...") + time.sleep(1) + + # Migrate new products from imported to "full" counterpart. + imported_products = get_subscriptions( + [ProductType.IMPORTED_GEANT_IP], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] + ) + + for subscription_id in imported_products: + typer.echo(f"Importing {subscription_id}") + start_process("import_geant_ip", [subscription_id]) + + if successfully_imported_data: + typer.echo("Successfully created imported GEANT IPs:") + for item in successfully_imported_data: + typer.echo(f"- {item}") + diff --git a/gso/migrations/versions/2024-10-02_41dfdf9e83f3_add_import_geant_ip_workflows.py b/gso/migrations/versions/2024-10-02_41dfdf9e83f3_add_import_geant_ip_workflows.py new file mode 100644 index 00000000..c6f335d9 --- /dev/null +++ b/gso/migrations/versions/2024-10-02_41dfdf9e83f3_add_import_geant_ip_workflows.py @@ -0,0 +1,45 @@ +"""Add Import Geant IP workflows.. + +Revision ID: 41dfdf9e83f3 +Revises: 289e5334848f +Create Date: 2024-10-02 14:55:03.223311 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '41dfdf9e83f3' +down_revision = '6dd8a91b8ce0' +branch_labels = None +depends_on = None + + +from orchestrator.migrations.helpers import create_workflow, delete_workflow + +new_workflows = [ + { + "name": "create_imported_geant_ip", + "target": "CREATE", + "description": "Import G\u00c9ANT IP", + "product_type": "GeantIP" + }, + { + "name": "import_geant_ip", + "target": "MODIFY", + "description": "Import GeantIP", + "product_type": "GeantIP" + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + create_workflow(conn, workflow) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in new_workflows: + delete_workflow(conn, workflow["name"]) diff --git a/gso/migrations/versions/2024-10-08_ac333789df39_add_imported_geant_ip_product.py b/gso/migrations/versions/2024-10-08_ac333789df39_add_imported_geant_ip_product.py new file mode 100644 index 00000000..971d3ce7 --- /dev/null +++ b/gso/migrations/versions/2024-10-08_ac333789df39_add_imported_geant_ip_product.py @@ -0,0 +1,47 @@ +"""Add Imported GEANT IP product.. + +Revision ID: ac333789df39 +Revises: 41dfdf9e83f3 +Create Date: 2024-10-08 11:00:59.799797 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'ac333789df39' +down_revision = '41dfdf9e83f3' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text(""" +INSERT INTO products (name, description, product_type, tag, status) VALUES ('ImportedGeantIP', 'ImportedGeantIP', 'ImportedGeantIP', 'IMPORTED_G_IP', 'active') RETURNING products.product_id + """)) + conn.execute(sa.text(""" +INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('GeantIPBlock'))) + """)) + + +def downgrade() -> None: + conn = op.get_bind() + conn.execute(sa.text(""" +DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('GeantIPBlock')) + """)) + conn.execute(sa.text(""" +DELETE FROM processes WHERE processes.pid IN (SELECT processes_subscriptions.pid FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')))) + """)) + conn.execute(sa.text(""" +DELETE FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscription_instances WHERE subscription_instances.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP'))) + """)) + conn.execute(sa.text(""" +DELETE FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')) + """)) + conn.execute(sa.text(""" +DELETE FROM products WHERE products.name IN ('ImportedGeantIP') + """)) diff --git a/gso/products/__init__.py b/gso/products/__init__.py index 0e943f6a..d3a076ad 100644 --- a/gso/products/__init__.py +++ b/gso/products/__init__.py @@ -9,7 +9,7 @@ from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY from pydantic_forms.types import strEnum from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort -from gso.products.product_types.geant_ip import GeantIP +from gso.products.product_types.geant_ip import GeantIP, ImportedGeantIP from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter @@ -42,6 +42,7 @@ class ProductName(strEnum): EDGE_PORT = "Edge port" IMPORTED_EDGE_PORT = "Imported Edge port" GEANT_IP = "GEANT IP" + IMPORTED_GEANT_IP = "Imported GEANT IP" class ProductType(strEnum): @@ -65,6 +66,7 @@ class ProductType(strEnum): EDGE_PORT = EdgePort.__name__ IMPORTED_EDGE_PORT = ImportedEdgePort.__name__ GEANT_IP = GeantIP.__name__ + IMPORTED_GEANT_IP = ImportedGeantIP.__name__ SUBSCRIPTION_MODEL_REGISTRY.update( @@ -87,5 +89,6 @@ SUBSCRIPTION_MODEL_REGISTRY.update( ProductName.EDGE_PORT.value: EdgePort, ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort, ProductType.GEANT_IP.value: GeantIP, + ProductType.IMPORTED_GEANT_IP.value: ImportedGeantIP, }, ) diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index 4bf546c4..e778e142 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -90,3 +90,5 @@ LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_po # GÉANT IP workflows LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip") LazyWorkflowInstance("gso.workflows.geant_ip.modify_geant_ip", "modify_geant_ip") +LazyWorkflowInstance("gso.workflows.geant_ip.create_imported_geant_ip", "create_imported_geant_ip") +LazyWorkflowInstance("gso.workflows.geant_ip.import_geant_ip", "import_geant_ip") diff --git a/gso/workflows/geant_ip/create_imported_geant_ip.py b/gso/workflows/geant_ip/create_imported_geant_ip.py new file mode 100644 index 00000000..7b57a24b --- /dev/null +++ b/gso/workflows/geant_ip/create_imported_geant_ip.py @@ -0,0 +1,119 @@ +"""A creation workflow for adding an existing GEANT IP to the service database.""" +from uuid import uuid4 + +from orchestrator import workflow +from orchestrator.forms import FormPage +from orchestrator.targets import Target +from orchestrator.types import FormGenerator, SubscriptionLifecycle +from orchestrator.workflow import StepList, begin, done, step +from orchestrator.workflows.steps import resync, set_status, store_process_subscription +from pydantic import BaseModel +from pydantic_forms.types import UUIDstr + +from gso.products import EdgePort, ProductName +from gso.products.product_blocks.bgp_session import IPFamily, BGPSession +from gso.products.product_blocks.geant_ip import NRENAccessPortInactive +from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort +from gso.products.product_types.geant_ip import ImportedGeantIPInactive +from gso.services.partners import get_partner_by_name +from gso.services.subscriptions import get_product_id_by_name +from gso.utils.shared_enums import SBPType +from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType + + +def initial_input_form_generator() -> FormGenerator: + """Take all information passed to this workflow by the :term:`API` endpoint that was called.""" + + class BaseBGPPeer(BaseModel): + bfd_enabled: bool = False + bfd_interval: int | None = None + bfd_multiplier: int | None = None + has_custom_policies: bool = False + authentication_key: str + multipath_enabled: bool = False + send_default_route: bool = False + is_passive: bool = False + peer_address: IPAddress + families: list[IPFamily] + is_multi_hop: bool + rtbh_enabled: bool + + class ServiceBindingPort(BaseModel): + edge_port: UUIDstr + ap_type: str + geant_sid: str + sbp_type: SBPType = SBPType.L3 + is_tagged: bool = False + vlan_id: VLAN_ID + custom_firewall_filters: bool = False + ipv4_address: IPv4AddressType + ipv6_address: IPv6AddressType + rtbh_enabled: bool = True + is_multi_hop: bool = True + bgp_peers: list[BaseBGPPeer] + + class ImportGeantIPForm(FormPage): + partner: str + service_binding_ports: list[ServiceBindingPort] + + user_input = yield ImportGeantIPForm + + return { + "partner": user_input.partner, + "initial_input": user_input.dict(), + } + + +@step("Create subscription") +def create_subscription(partner: str) -> dict: + """Create a new subscription object in the database.""" + partner_id = get_partner_by_name(partner)["partner_id"] + product_id = get_product_id_by_name(ProductName.IMPORTED_GEANT_IP) + subscription = ImportedGeantIPInactive.from_product_id(product_id, partner_id) + return {"subscription": subscription, "subscription_id": subscription.subscription_id} + + +@step("Initialize subscription") +def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input: dict) -> dict: + """Initialize the subscription with the user input.""" + for service_binding_port in initial_input["service_binding_ports"]: + edge_port_subscription = EdgePort.from_subscription(service_binding_port["edge_port"]) + sbp_bgp_session_list = [ + BGPSession.new(subscription_id=uuid4(), **session) + for session in service_binding_port["bgp_peers"] + ] + ServiceBindingPort.new( + subscription_id=uuid4(), + edge_port=edge_port_subscription, + sbp_bgp_session_list=sbp_bgp_session_list, + **service_binding_port, + ) + subscription.geant_ip.geant_ip_ap_list.append( + NRENAccessPortInactive.new( + subscription_id=uuid4(), + nren_ap_type=service_binding_port["ap_type"], + geant_ip_sbp=service_binding_port, + ) + ) + + subscription.description = "GEANT IP service" + + return {"subscription": subscription} + + +@workflow( + "Import GÉANT IP", + initial_input_form=initial_input_form_generator, + target=Target.CREATE, +) +def create_imported_geant_ip() -> StepList: + """Import a GÉANT IP without provisioning it.""" + return ( + begin + >> create_subscription + >> store_process_subscription(Target.CREATE) + >> initialize_subscription + >> set_status(SubscriptionLifecycle.ACTIVE) + >> resync + >> done + ) diff --git a/gso/workflows/geant_ip/import_geant_ip.py b/gso/workflows/geant_ip/import_geant_ip.py new file mode 100644 index 00000000..a543d897 --- /dev/null +++ b/gso/workflows/geant_ip/import_geant_ip.py @@ -0,0 +1,27 @@ +"""A modification workflow for migrating an ImportedGeantIP to an GeantIP subscription.""" + +from orchestrator.targets import Target +from orchestrator.types import State, UUIDstr +from orchestrator.workflow import StepList, done, init, step, workflow +from orchestrator.workflows.steps import resync, store_process_subscription, unsync +from orchestrator.workflows.utils import wrap_modify_initial_input_form + +from gso.products import ProductName +from gso.products.product_types.geant_ip import GeantIP, ImportedGeantIP +from gso.services.subscriptions import get_product_id_by_name + + +@step("Create new IP trunk subscription") +def import_geant_ip_subscription(subscription_id: UUIDstr) -> State: + """Take an ImportedGeantIP subscription, and turn it into an GeantIP subscription.""" + old_geant_ip = ImportedGeantIP.from_subscription(subscription_id) + new_subscription_id = get_product_id_by_name(ProductName.IP_TRUNK) + new_subscription = GeantIP.from_other_product(old_geant_ip, new_subscription_id) # type: ignore[arg-type] + + return {"subscription": new_subscription} + + +@workflow("Import GeantIP", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None)) +def import_geant_ip() -> StepList: + """Modify an ImportedGeantIP subscription into an GeantIP subscription to complete the import.""" + return init >> store_process_subscription(Target.MODIFY) >> unsync >> import_geant_ip_subscription >> resync >> done -- GitLab