""":term:`CLI` commands for importing data to coreDB.""" import csv import ipaddress import json import time from datetime import UTC, datetime from pathlib import Path from typing import Self, TypeVar import typer import yaml from orchestrator.db import db from orchestrator.services.processes import start_process from orchestrator.types import SubscriptionLifecycle from pydantic import BaseModel, EmailStr, ValidationError, field_validator, model_validator from sqlalchemy.exc import SQLAlchemyError from gso.db.models import PartnerTable from gso.products import ProductType from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity from gso.products.product_blocks.router import RouterRole from gso.services.partners import ( PartnerNotFoundError, filter_partners_by_email, filter_partners_by_name, get_partner_by_name, ) from gso.services.subscriptions import ( get_active_router_subscriptions, get_active_subscriptions_by_field_and_value, get_subscriptions, ) from gso.utils.helpers import BaseSiteValidatorModel, LAGMember from gso.utils.shared_enums import IPv4AddressType, IPv6AddressType, PortNumber, Vendor app: typer.Typer = typer.Typer() class CreatePartner(BaseModel): """Required inputs for creating a partner.""" name: str email: EmailStr @field_validator("name") def validate_name(cls, name: str) -> str: """Validate name.""" if filter_partners_by_name(name=name, case_sensitive=False): msg = "Partner with this name already exists." raise ValueError(msg) return name @field_validator("email") def validate_email(cls, email: str) -> EmailStr: """Validate email.""" email = email.lower() if filter_partners_by_email(email=email, case_sensitive=False): msg = "Partner with this email already exists." raise ValueError(msg) return email class SiteImportModel(BaseSiteValidatorModel): """The required input for importing an existing :class:`gso.products.product_types.site`.""" class RouterImportModel(BaseModel): """Required fields for importing an existing :class:`gso.product.product_types.router`.""" partner: str router_site: str hostname: str ts_port: int router_vendor: Vendor router_role: RouterRole router_lo_ipv4_address: IPv4AddressType router_lo_ipv6_address: IPv6AddressType router_lo_iso_address: str class SuperPopSwitchImportModel(BaseModel): """Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`.""" partner: str super_pop_switch_site: str hostname: str super_pop_switch_ts_port: PortNumber super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address class OfficeRouterImportModel(BaseModel): """Required fields for importing an existing :class:`gso.product.product_types.office_router`.""" partner: str office_router_site: str office_router_fqdn: str office_router_ts_port: PortNumber office_router_lo_ipv4_address: ipaddress.IPv4Address office_router_lo_ipv6_address: ipaddress.IPv6Address class IptrunkImportModel(BaseModel): """Required fields for importing an existing :class:`gso.products.product_types.iptrunk`.""" partner: str geant_s_sid: str | None iptrunk_type: IptrunkType iptrunk_description: str | None = None iptrunk_speed: PhysicalPortCapacity iptrunk_minimum_links: int iptrunk_isis_metric: int side_a_node_id: str side_a_ae_iface: str side_a_ae_geant_a_sid: str | None side_a_ae_members: list[LAGMember] side_b_node_id: str side_b_ae_iface: str side_b_ae_geant_a_sid: str | None side_b_ae_members: list[LAGMember] iptrunk_ipv4_network: ipaddress.IPv4Network iptrunk_ipv6_network: ipaddress.IPv6Network @classmethod def _get_active_routers(cls) -> set[str]: return { str(router["subscription_id"]) for router in get_active_router_subscriptions(includes=["subscription_id"]) } @field_validator("partner") def check_if_partner_exists(cls, value: str) -> str: """Validate that the partner exists.""" try: get_partner_by_name(value) except PartnerNotFoundError as e: msg = f"partner {value} not found" raise ValueError(msg) from e return value @field_validator("side_a_node_id", "side_b_node_id") def check_if_router_side_is_available(cls, value: str) -> str: """Both sides of the trunk must exist in :term:`GSO`.""" if value not in cls._get_active_routers(): msg = f"Router {value} not found" raise ValueError(msg) return value @field_validator("side_a_ae_members", "side_b_ae_members") def check_side_uniqueness(cls, value: list[str]) -> list[str]: """:term:`LAG` members must be unique.""" if len(value) != len(set(value)): msg = "Items must be unique" raise ValueError(msg) return value @model_validator(mode="after") def check_members(self) -> Self: """Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement.""" len_a = len(self.side_a_ae_members) len_b = len(self.side_b_ae_members) if len_a < self.iptrunk_minimum_links: msg = f"Side A members should be at least {self.iptrunk_minimum_links} (iptrunk_minimum_links)" raise ValueError(msg) if len_a != len_b: msg = "Mismatch between Side A and B members" raise ValueError(msg) return self class OpenGearImportModel(BaseModel): """Required fields for importing an existing :class:`gso.products.product_types.opengear`.""" partner: str opengear_site: str opengear_hostname: str opengear_wan_address: IPv4AddressType opengear_wan_netmask: IPv4AddressType opengear_wan_gateway: IPv4AddressType T = TypeVar( "T", SiteImportModel, RouterImportModel, IptrunkImportModel, SuperPopSwitchImportModel, OfficeRouterImportModel, OpenGearImportModel, ) common_filepath_option = typer.Option( default="data.json", help="Path to the file", ) def _read_data(file_path: Path) -> dict: """Read data from a JSON or YAML file.""" typer.echo(f"Starting import from {file_path!s}") file_extension = file_path.suffix.lower() with file_path.open("r") as f: supported_extensions = {".json", ".yaml", ".yml"} if file_extension == ".json": return json.load(f) if file_extension in supported_extensions: return yaml.safe_load(f) typer.echo(f"Unsupported file format: {file_extension}") raise typer.Exit(code=1) def _get_router_subscription_id(node_name: str) -> str | None: """Get the subscription id for a router by its node name.""" subscriptions = get_active_subscriptions_by_field_and_value( "router_fqdn", node_name, ) if subscriptions: return str(subscriptions[0].subscription_id) return None def _import_partners_from_csv(file_path: Path) -> list[dict]: """Read partners from a CSV file.""" with Path.open(file_path, encoding="utf-8") as csv_file: csv_reader = csv.DictReader(csv_file) return list(csv_reader) def _generic_import_product( file_path: Path, imported_product_type: ProductType, workflow_suffix: str, name_key: str, import_model: type[T] ) -> None: """Import subscriptions from a JSON or YAML file.""" successfully_imported_data = [] data = _read_data(file_path) for details in data: details["partner"] = "GEANT" typer.echo(f"Creating imported {name_key}: {details[name_key]}") try: initial_data = import_model(**details) start_process(f"create_imported_{workflow_suffix}", [initial_data.model_dump()]) successfully_imported_data.append(getattr(initial_data, name_key)) typer.echo( f"Successfully created {name_key}: {getattr(initial_data, name_key)}", ) except ValidationError as e: typer.echo(f"Validation error: {e}") typer.echo("Waiting for the dust to settle before moving on the importing new products...") time.sleep(1) # Migrate new products from imported to "full" counterpart. imported_products = get_subscriptions( [imported_product_type], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] ) for subscription_id in imported_products: typer.echo(f"Importing {subscription_id}") start_process(f"import_{workflow_suffix}", [subscription_id]) if successfully_imported_data: typer.echo(f"Successfully created imported {name_key}s:") for item in successfully_imported_data: typer.echo(f"- {item}") typer.echo(f"Please validate no more imported {workflow_suffix} products exist anymore in the database.") @app.command() def import_sites(filepath: str = common_filepath_option) -> None: """Import sites into GSO.""" _generic_import_product(Path(filepath), ProductType.IMPORTED_SITE, "site", "site_name", SiteImportModel) @app.command() def import_routers(filepath: str = common_filepath_option) -> None: """Import routers into GSO.""" _generic_import_product(Path(filepath), ProductType.IMPORTED_ROUTER, "router", "hostname", RouterImportModel) @app.command() def import_super_pop_switches(filepath: str = common_filepath_option) -> None: """Import Super PoP Switches into GSO.""" _generic_import_product( Path(filepath), ProductType.IMPORTED_SUPER_POP_SWITCH, "super_pop_switch", "hostname", SuperPopSwitchImportModel, ) @app.command() def import_office_routers(filepath: str = common_filepath_option) -> None: """Import office routers into GSO.""" _generic_import_product( Path(filepath), ProductType.IMPORTED_OFFICE_ROUTER, "office_router", "office_router_fqdn", OfficeRouterImportModel, ) @app.command() def import_opengear(filepath: str = common_filepath_option) -> None: """Import Opengear into GSO.""" _generic_import_product( Path(filepath), ProductType.IMPORTED_OPENGEAR, "opengear", "opengear_hostname", OpenGearImportModel, ) @app.command() def import_iptrunks(filepath: str = common_filepath_option) -> None: """Import IP trunks into GSO.""" successfully_imported_data = [] data = _read_data(Path(filepath)) for trunk in data: ipv4_network_a = ipaddress.IPv4Network(trunk["config"]["nodeA"]["ipv4_address"], strict=False) ipv4_network_b = ipaddress.IPv4Network(trunk["config"]["nodeB"]["ipv4_address"], strict=False) ipv6_network_a = ipaddress.IPv6Network(trunk["config"]["nodeA"]["ipv6_address"], strict=False) ipv6_network_b = ipaddress.IPv6Network(trunk["config"]["nodeB"]["ipv6_address"], strict=False) # Check if IPv4 networks are equal if ipv4_network_a == ipv4_network_b: iptrunk_ipv4_network = ipv4_network_a else: # Handle the case where IPv4 networks are different typer.echo(f"Error: IPv4 networks are different for trunk {trunk["id"]}.") continue # Check if IPv6 networks are the same if ipv6_network_a == ipv6_network_b: iptrunk_ipv6_network = ipv6_network_a else: # Handle the case where IPv6 networks are different typer.echo(f"Error: IPv6 networks are different for trunk {trunk["id"]}.") continue typer.echo( f"Importing IP Trunk: " f'{get_active_subscriptions_by_field_and_value("router_fqdn", trunk["config"]["nodeA"]["name"])}', ) try: initial_data = IptrunkImportModel( partner="GEANT", geant_s_sid=trunk["id"], iptrunk_type=trunk["config"]["common"]["type"], iptrunk_description=trunk["config"]["common"].get("description", ""), iptrunk_speed=trunk["config"]["common"]["link_speed"], iptrunk_minimum_links=trunk["config"]["common"]["minimum_links"], iptrunk_isis_metric=trunk["config"]["common"]["isis_metric"], side_a_node_id=_get_router_subscription_id(trunk["config"]["nodeA"]["name"]) or "", side_a_ae_iface=trunk["config"]["nodeA"]["ae_name"], side_a_ae_geant_a_sid=trunk["config"]["nodeA"]["port_sid"], side_a_ae_members=trunk["config"]["nodeA"]["members"], side_b_node_id=_get_router_subscription_id(trunk["config"]["nodeB"]["name"]) or "", side_b_ae_iface=trunk["config"]["nodeB"]["ae_name"], side_b_ae_geant_a_sid=trunk["config"]["nodeB"]["port_sid"], side_b_ae_members=trunk["config"]["nodeB"]["members"], iptrunk_ipv4_network=iptrunk_ipv4_network, iptrunk_ipv6_network=iptrunk_ipv6_network, ) start_process("create_imported_iptrunk", [initial_data.model_dump()]) successfully_imported_data.append(trunk["id"]) typer.echo(f"Successfully imported IP Trunk: {trunk["id"]}") except ValidationError as e: typer.echo(f"Validation error: {e}") typer.echo("Waiting for the dust to settle before moving on the importing new products...") time.sleep(1) trunk_ids = get_subscriptions( [ProductType.IMPORTED_IP_TRUNK], lifecycles=[SubscriptionLifecycle.ACTIVE], includes=["subscription_id"] ) for subscription_id in trunk_ids: typer.echo(f"Migrating iptrunk {subscription_id}") start_process("import_iptrunk", [subscription_id]) if successfully_imported_data: typer.echo("Successfully imported IP Trunks:") for item in successfully_imported_data: typer.echo(f"- {item}") @app.command() def import_partners(file_path: str = typer.Argument(..., help="Path to the CSV file containing partners")) -> None: """Import partners from a CSV file into the database.""" typer.echo(f"Importing partners from {file_path} ...") partners = _import_partners_from_csv(Path(file_path)) try: for partner in partners: if partner.get("created_at"): partner["created_at"] = datetime.strptime(partner["created_at"], "%Y-%m-%d").replace(tzinfo=UTC) new_partner = PartnerTable(**CreatePartner(**partner).model_dump()) db.session.add(new_partner) db.session.commit() typer.echo(f"Successfully imported {len(partners)} partners.") except SQLAlchemyError as e: db.session.rollback() typer.echo(f"Failed to import partners: {e}") finally: db.session.close()