Skip to content
Snippets Groups Projects
Verified Commit 6e407cc1 authored by Neda Moeini's avatar Neda Moeini Committed by Karel van Klink
Browse files

Import Geant IP workflows.

parent 428c680e
No related branches found
No related tags found
1 merge request!286Add Edge Port, GÉANT IP and IAS products
...@@ -18,9 +18,11 @@ from sqlalchemy.exc import SQLAlchemyError ...@@ -18,9 +18,11 @@ from sqlalchemy.exc import SQLAlchemyError
from gso.db.models import PartnerTable from gso.db.models import PartnerTable
from gso.products import ProductType from gso.products import ProductType
from gso.products.product_blocks.bgp_session import IPFamily
from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType from gso.products.product_blocks.edge_port import EdgePortType, EncapsulationType
from gso.products.product_blocks.iptrunk import IptrunkType from gso.products.product_blocks.iptrunk import IptrunkType
from gso.products.product_blocks.router import RouterRole from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.service_binding_port import VLAN_ID
from gso.services.partners import ( from gso.services.partners import (
PartnerEmail, PartnerEmail,
PartnerName, PartnerName,
...@@ -32,10 +34,10 @@ from gso.services.subscriptions import ( ...@@ -32,10 +34,10 @@ from gso.services.subscriptions import (
get_active_subscriptions_by_field_and_value, get_active_subscriptions_by_field_and_value,
get_subscriptions, get_subscriptions,
) )
from gso.utils.shared_enums import Vendor from gso.utils.shared_enums import SBPType, Vendor
from gso.utils.types.base_site import BaseSiteValidatorModel from gso.utils.types.base_site import BaseSiteValidatorModel
from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity from gso.utils.types.interfaces import LAGMember, LAGMemberList, PhysicalPortCapacity
from gso.utils.types.ip_address import IPv4AddressType, IPv6AddressType, PortNumber from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType, PortNumber
app: typer.Typer = typer.Typer() app: typer.Typer = typer.Typer()
...@@ -212,6 +214,45 @@ class EdgePortImportModel(BaseModel): ...@@ -212,6 +214,45 @@ class EdgePortImportModel(BaseModel):
return self return self
class GeantIPImportModel(BaseModel):
"""Import GEANT IP model."""
partner: str
service_binding_ports: list["GeantIPImportModel.ServiceBindingPort"]
class BaseBGPPeer(BaseModel):
"""Base BGP Peer model."""
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
peer_address: IPAddress
families: list[IPFamily]
is_multi_hop: bool
rtbh_enabled: bool # whether Remote Triggered Blackhole is enabled
class ServiceBindingPort(BaseModel):
"""Service Binding model."""
edge_port: str
ap_type: str
geant_sid: str
sbp_type: SBPType = SBPType.L3
is_tagged: bool = False
vlan_id: VLAN_ID
custom_firewall_filters: bool = False
ipv4_address: IPv4AddressType
ipv6_address: IPv6AddressType
rtbh_enabled: bool = True
is_multi_hop: bool = True
bgp_peers: list["GeantIPImportModel.BaseBGPPeer"]
T = TypeVar( T = TypeVar(
"T", "T",
SiteImportModel, SiteImportModel,
...@@ -221,6 +262,7 @@ T = TypeVar( ...@@ -221,6 +262,7 @@ T = TypeVar(
OfficeRouterImportModel, OfficeRouterImportModel,
OpenGearImportModel, OpenGearImportModel,
EdgePortImportModel, EdgePortImportModel,
GeantIPImportModel,
) )
common_filepath_option = typer.Option( common_filepath_option = typer.Option(
...@@ -265,7 +307,7 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]: ...@@ -265,7 +307,7 @@ def _import_partners_from_csv(file_path: Path) -> list[dict]:
def _generic_import_product( def _generic_import_product(
file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T]
) -> None: ) -> None:
"""Import subscriptions from a JSON or YAML file.""" """Import subscriptions from a JSON or YAML file."""
successfully_imported_data = [] successfully_imported_data = []
...@@ -474,3 +516,41 @@ def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV f ...@@ -474,3 +516,41 @@ def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV f
typer.echo(f"Failed to import partners: {e}") typer.echo(f"Failed to import partners: {e}")
finally: finally:
db.session.close() db.session.close()
@app.command()
def import_geant_ip(filepath: str = common_filepath_option) -> None:
"""Import GEANT IP into GSO."""
successfully_imported_data = []
geant_ip_list = _read_data(Path(filepath))
for geant_ip in geant_ip_list:
partner = geant_ip["partner"]
typer.echo(f"Creating imported GEANT IP for {partner}")
try:
initial_data = GeantIPImportModel(**geant_ip)
start_process("create_imported_geant_ip", [initial_data.model_dump()])
edge_ports = [sbp["edge_port"] for sbp in geant_ip["service_binding_ports"]]
successfully_imported_data.append(edge_ports)
typer.echo(f"Successfully created imported GEANT IP for {partner}")
except ValidationError as e:
typer.echo(f"Validation error: {e}")
typer.echo("Waiting for the dust to settle before importing new products...")
time.sleep(1)
# Migrate new products from imported to "full" counterpart.
imported_products = get_subscriptions(
[ProductType.IMPORTED_GEANT_IP], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"]
)
for subscription_id in imported_products:
typer.echo(f"Importing {subscription_id}")
start_process("import_geant_ip", [subscription_id])
if successfully_imported_data:
typer.echo("Successfully created imported GEANT IPs:")
for item in successfully_imported_data:
typer.echo(f"- {item}")
"""Add Import Geant IP workflows..
Revision ID: 41dfdf9e83f3
Revises: 289e5334848f
Create Date: 2024-10-02 14:55:03.223311
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '41dfdf9e83f3'
down_revision = '6dd8a91b8ce0'
branch_labels = None
depends_on = None
from orchestrator.migrations.helpers import create_workflow, delete_workflow
new_workflows = [
{
"name": "create_imported_geant_ip",
"target": "CREATE",
"description": "Import G\u00c9ANT IP",
"product_type": "GeantIP"
},
{
"name": "import_geant_ip",
"target": "MODIFY",
"description": "Import GeantIP",
"product_type": "GeantIP"
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
create_workflow(conn, workflow)
def downgrade() -> None:
conn = op.get_bind()
for workflow in new_workflows:
delete_workflow(conn, workflow["name"])
"""Add Imported GEANT IP product..
Revision ID: ac333789df39
Revises: 41dfdf9e83f3
Create Date: 2024-10-08 11:00:59.799797
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = 'ac333789df39'
down_revision = '41dfdf9e83f3'
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
conn.execute(sa.text("""
INSERT INTO products (name, description, product_type, tag, status) VALUES ('ImportedGeantIP', 'ImportedGeantIP', 'ImportedGeantIP', 'IMPORTED_G_IP', 'active') RETURNING products.product_id
"""))
conn.execute(sa.text("""
INSERT INTO product_product_blocks (product_id, product_block_id) VALUES ((SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')), (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('GeantIPBlock')))
"""))
def downgrade() -> None:
conn = op.get_bind()
conn.execute(sa.text("""
DELETE FROM product_product_blocks WHERE product_product_blocks.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')) AND product_product_blocks.product_block_id IN (SELECT product_blocks.product_block_id FROM product_blocks WHERE product_blocks.name IN ('GeantIPBlock'))
"""))
conn.execute(sa.text("""
DELETE FROM processes WHERE processes.pid IN (SELECT processes_subscriptions.pid FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP'))))
"""))
conn.execute(sa.text("""
DELETE FROM processes_subscriptions WHERE processes_subscriptions.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')))
"""))
conn.execute(sa.text("""
DELETE FROM subscription_instances WHERE subscription_instances.subscription_id IN (SELECT subscriptions.subscription_id FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP')))
"""))
conn.execute(sa.text("""
DELETE FROM subscriptions WHERE subscriptions.product_id IN (SELECT products.product_id FROM products WHERE products.name IN ('ImportedGeantIP'))
"""))
conn.execute(sa.text("""
DELETE FROM products WHERE products.name IN ('ImportedGeantIP')
"""))
...@@ -9,7 +9,7 @@ from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY ...@@ -9,7 +9,7 @@ from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
from pydantic_forms.types import strEnum from pydantic_forms.types import strEnum
from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort from gso.products.product_types.edge_port import EdgePort, ImportedEdgePort
from gso.products.product_types.geant_ip import GeantIP from gso.products.product_types.geant_ip import GeantIP, ImportedGeantIP
from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk from gso.products.product_types.iptrunk import ImportedIptrunk, Iptrunk
from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect from gso.products.product_types.lan_switch_interconnect import LanSwitchInterconnect
from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter from gso.products.product_types.office_router import ImportedOfficeRouter, OfficeRouter
...@@ -42,6 +42,7 @@ class ProductName(strEnum): ...@@ -42,6 +42,7 @@ class ProductName(strEnum):
EDGE_PORT = "Edge port" EDGE_PORT = "Edge port"
IMPORTED_EDGE_PORT = "Imported Edge port" IMPORTED_EDGE_PORT = "Imported Edge port"
GEANT_IP = "GEANT IP" GEANT_IP = "GEANT IP"
IMPORTED_GEANT_IP = "Imported GEANT IP"
class ProductType(strEnum): class ProductType(strEnum):
...@@ -65,6 +66,7 @@ class ProductType(strEnum): ...@@ -65,6 +66,7 @@ class ProductType(strEnum):
EDGE_PORT = EdgePort.__name__ EDGE_PORT = EdgePort.__name__
IMPORTED_EDGE_PORT = ImportedEdgePort.__name__ IMPORTED_EDGE_PORT = ImportedEdgePort.__name__
GEANT_IP = GeantIP.__name__ GEANT_IP = GeantIP.__name__
IMPORTED_GEANT_IP = ImportedGeantIP.__name__
SUBSCRIPTION_MODEL_REGISTRY.update( SUBSCRIPTION_MODEL_REGISTRY.update(
...@@ -87,5 +89,6 @@ SUBSCRIPTION_MODEL_REGISTRY.update( ...@@ -87,5 +89,6 @@ SUBSCRIPTION_MODEL_REGISTRY.update(
ProductName.EDGE_PORT.value: EdgePort, ProductName.EDGE_PORT.value: EdgePort,
ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort, ProductName.IMPORTED_EDGE_PORT.value: ImportedEdgePort,
ProductType.GEANT_IP.value: GeantIP, ProductType.GEANT_IP.value: GeantIP,
ProductType.IMPORTED_GEANT_IP.value: ImportedGeantIP,
}, },
) )
...@@ -90,3 +90,5 @@ LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_po ...@@ -90,3 +90,5 @@ LazyWorkflowInstance("gso.workflows.edge_port.import_edge_port", "import_edge_po
# GÉANT IP workflows # GÉANT IP workflows
LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip") LazyWorkflowInstance("gso.workflows.geant_ip.create_geant_ip", "create_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.modify_geant_ip", "modify_geant_ip") LazyWorkflowInstance("gso.workflows.geant_ip.modify_geant_ip", "modify_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.create_imported_geant_ip", "create_imported_geant_ip")
LazyWorkflowInstance("gso.workflows.geant_ip.import_geant_ip", "import_geant_ip")
"""A creation workflow for adding an existing GEANT IP to the service database."""
from uuid import uuid4
from orchestrator import workflow
from orchestrator.forms import FormPage
from orchestrator.targets import Target
from orchestrator.types import FormGenerator, SubscriptionLifecycle
from orchestrator.workflow import StepList, begin, done, step
from orchestrator.workflows.steps import resync, set_status, store_process_subscription
from pydantic import BaseModel
from pydantic_forms.types import UUIDstr
from gso.products import EdgePort, ProductName
from gso.products.product_blocks.bgp_session import IPFamily, BGPSession
from gso.products.product_blocks.geant_ip import NRENAccessPortInactive
from gso.products.product_blocks.service_binding_port import VLAN_ID, ServiceBindingPort
from gso.products.product_types.geant_ip import ImportedGeantIPInactive
from gso.services.partners import get_partner_by_name
from gso.services.subscriptions import get_product_id_by_name
from gso.utils.shared_enums import SBPType
from gso.utils.types.ip_address import IPAddress, IPv4AddressType, IPv6AddressType
def initial_input_form_generator() -> FormGenerator:
"""Take all information passed to this workflow by the :term:`API` endpoint that was called."""
class BaseBGPPeer(BaseModel):
bfd_enabled: bool = False
bfd_interval: int | None = None
bfd_multiplier: int | None = None
has_custom_policies: bool = False
authentication_key: str
multipath_enabled: bool = False
send_default_route: bool = False
is_passive: bool = False
peer_address: IPAddress
families: list[IPFamily]
is_multi_hop: bool
rtbh_enabled: bool
class ServiceBindingPort(BaseModel):
edge_port: UUIDstr
ap_type: str
geant_sid: str
sbp_type: SBPType = SBPType.L3
is_tagged: bool = False
vlan_id: VLAN_ID
custom_firewall_filters: bool = False
ipv4_address: IPv4AddressType
ipv6_address: IPv6AddressType
rtbh_enabled: bool = True
is_multi_hop: bool = True
bgp_peers: list[BaseBGPPeer]
class ImportGeantIPForm(FormPage):
partner: str
service_binding_ports: list[ServiceBindingPort]
user_input = yield ImportGeantIPForm
return {
"partner": user_input.partner,
"initial_input": user_input.dict(),
}
@step("Create subscription")
def create_subscription(partner: str) -> dict:
"""Create a new subscription object in the database."""
partner_id = get_partner_by_name(partner)["partner_id"]
product_id = get_product_id_by_name(ProductName.IMPORTED_GEANT_IP)
subscription = ImportedGeantIPInactive.from_product_id(product_id, partner_id)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
@step("Initialize subscription")
def initialize_subscription(subscription: ImportedGeantIPInactive, initial_input: dict) -> dict:
"""Initialize the subscription with the user input."""
for service_binding_port in initial_input["service_binding_ports"]:
edge_port_subscription = EdgePort.from_subscription(service_binding_port["edge_port"])
sbp_bgp_session_list = [
BGPSession.new(subscription_id=uuid4(), **session)
for session in service_binding_port["bgp_peers"]
]
ServiceBindingPort.new(
subscription_id=uuid4(),
edge_port=edge_port_subscription,
sbp_bgp_session_list=sbp_bgp_session_list,
**service_binding_port,
)
subscription.geant_ip.geant_ip_ap_list.append(
NRENAccessPortInactive.new(
subscription_id=uuid4(),
nren_ap_type=service_binding_port["ap_type"],
geant_ip_sbp=service_binding_port,
)
)
subscription.description = "GEANT IP service"
return {"subscription": subscription}
@workflow(
"Import GÉANT IP",
initial_input_form=initial_input_form_generator,
target=Target.CREATE,
)
def create_imported_geant_ip() -> StepList:
"""Import a GÉANT IP without provisioning it."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
"""A modification workflow for migrating an ImportedGeantIP to an GeantIP subscription."""
from orchestrator.targets import Target
from orchestrator.types import State, UUIDstr
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflows.steps import resync, store_process_subscription, unsync
from orchestrator.workflows.utils import wrap_modify_initial_input_form
from gso.products import ProductName
from gso.products.product_types.geant_ip import GeantIP, ImportedGeantIP
from gso.services.subscriptions import get_product_id_by_name
@step("Create new IP trunk subscription")
def import_geant_ip_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedGeantIP subscription, and turn it into an GeantIP subscription."""
old_geant_ip = ImportedGeantIP.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.IP_TRUNK)
new_subscription = GeantIP.from_other_product(old_geant_ip, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
@workflow("Import GeantIP", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_geant_ip() -> StepList:
"""Modify an ImportedGeantIP subscription into an GeantIP subscription to complete the import."""
return init >> store_process_subscription(Target.MODIFY) >> unsync >> import_geant_ip_subscription >> resync >> done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment