""":term:`GSO` :term:`API` endpoints that import different types of existing services.""" import ipaddress from typing import Any from uuid import UUID from fastapi import Depends, HTTPException, status from fastapi.routing import APIRouter from orchestrator.security import opa_security_default from orchestrator.services import processes from pydantic import BaseModel, root_validator, validator from pydantic.fields import ModelField from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity from gso.products.product_blocks.router import RouterRole, RouterVendor from gso.products.product_blocks.site import SiteTier from gso.services import subscriptions from gso.services.crm import CustomerNotFoundError, get_customer_by_name from gso.utils.helpers import ( LAGMember, validate_country_code, validate_ipv4_or_ipv6, validate_site_fields_is_unique, validate_site_name, ) router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)]) class ImportResponseModel(BaseModel): """The model of a response given when services are imported using the :term:`API`.""" pid: UUID detail: str class SiteImportModel(BaseModel): """The required input for importing an existing :class:`gso.products.product_types.site`.""" site_name: str site_city: str site_country: str site_country_code: str site_latitude: float site_longitude: float site_bgp_community_id: int site_internal_id: int site_tier: SiteTier site_ts_address: str customer: str @validator("site_ts_address", allow_reuse=True) def validate_ts_address(cls, site_ts_address: str) -> str: """A terminal server address must be valid.""" validate_site_fields_is_unique("site_ts_address", site_ts_address) validate_ipv4_or_ipv6(site_ts_address) return site_ts_address @validator("site_country_code", allow_reuse=True) def country_code_must_exist(cls, country_code: str) -> str: """A country code must exist.""" validate_country_code(country_code) return country_code @validator("site_internal_id", "site_bgp_community_id", allow_reuse=True) def validate_unique_fields(cls, value: str, field: ModelField) -> str | int: """Validate that the internal side ID and :term:`BGP` community IDs are unique.""" return validate_site_fields_is_unique(field.name, value) @validator("site_name", allow_reuse=True) def site_name_must_be_valid(cls, site_name: str) -> str: """Validate the site name. The site name must consist of three uppercase letters (A-Z) followed by an optional single digit (0-9). """ validate_site_fields_is_unique("site_name", site_name) validate_site_name(site_name) return site_name class RouterImportModel(BaseModel): """Required fields for importing an existing :class:`gso.product.product_types.router`.""" customer: str router_site: str hostname: str ts_port: int router_vendor: RouterVendor router_role: RouterRole is_ias_connected: bool router_lo_ipv4_address: ipaddress.IPv4Address router_lo_ipv6_address: ipaddress.IPv6Address router_lo_iso_address: str router_si_ipv4_network: ipaddress.IPv4Network | None = None router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None class IptrunkImportModel(BaseModel): """Required fields for importing an existing :class:`gso.products.product_types.iptrunk`.""" customer: str geant_s_sid: str iptrunk_type: IptrunkType iptrunk_description: str iptrunk_speed: PhyPortCapacity iptrunk_minimum_links: int side_a_node_id: str side_a_ae_iface: str side_a_ae_geant_a_sid: str side_a_ae_members: list[LAGMember] side_b_node_id: str side_b_ae_iface: str side_b_ae_geant_a_sid: str side_b_ae_members: list[LAGMember] iptrunk_ipv4_network: ipaddress.IPv4Network iptrunk_ipv6_network: ipaddress.IPv6Network @classmethod def _get_active_routers(cls) -> set[str]: return { str(router["subscription_id"]) for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"]) } @validator("customer") def check_if_customer_exists(cls, value: str) -> str: """The customer must exist.""" try: get_customer_by_name(value) except CustomerNotFoundError as e: msg = f"Customer {value} not found" raise ValueError(msg) from e return value @validator("side_a_node_id", "side_b_node_id") def check_if_router_side_is_available(cls, value: str) -> str: """Both sides of the trunk must exist in :term:`GSO`.""" if value not in cls._get_active_routers(): msg = f"Router {value} not found" raise ValueError(msg) return value @validator("side_a_ae_members", "side_b_ae_members") def check_side_uniqueness(cls, value: list[str]) -> list[str]: """:term:`LAG` members must be unique.""" if len(value) != len(set(value)): msg = "Items must be unique" raise ValueError(msg) return value @root_validator def check_members(cls, values: dict[str, Any]) -> dict[str, Any]: """Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement.""" min_links = values["iptrunk_minimum_links"] side_a_members = values.get("side_a_ae_members", []) side_b_members = values.get("side_b_ae_members", []) len_a = len(side_a_members) len_b = len(side_b_members) if len_a < min_links: msg = f"Side A members should be at least {min_links} (iptrunk_minimum_links)" raise ValueError(msg) if len_a != len_b: msg = "Mismatch between Side A and B members" raise ValueError(msg) return values def _start_process(process_name: str, data: dict) -> UUID: """Start a process and handle common exceptions.""" pid: UUID = processes.start_process(process_name, [data]) if pid is None: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.", ) process = processes._get_process(pid) # noqa: SLF001 if process.last_status == "failed": raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Process {pid} failed because of an internal error. {process.failed_reason}", ) return pid @router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_site(site: SiteImportModel) -> dict[str, Any]: """Import a site by running the import_site workflow. :param site: The site information to be imported. :type site: SiteImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If the site already exists or if there's an error in the process. """ pid = _start_process("import_site", site.dict()) return {"detail": "Site added successfully.", "pid": pid} @router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_router(router_data: RouterImportModel) -> dict[str, Any]: """Import a router by running the import_router workflow. :param router_data: The router information to be imported. :type router_data: RouterImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If there's an error in the process. """ pid = _start_process("import_router", router_data.dict()) return {"detail": "Router added successfully", "pid": pid} @router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]: """Import an iptrunk by running the import_iptrunk workflow. :param iptrunk_data: The iptrunk information to be imported. :type iptrunk_data: IptrunkImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If there's an error in the process. """ pid = _start_process("import_iptrunk", iptrunk_data.dict()) return {"detail": "Iptrunk added successfully", "pid": pid}