import ipaddress from typing import Any from uuid import UUID import re from fastapi import Depends, HTTPException, status from fastapi.routing import APIRouter from orchestrator.security import opa_security_default from orchestrator.services import processes from orchestrator.services import subscriptions as wfo_subscriptions from pydantic import BaseModel, root_validator, validator from sqlalchemy.exc import MultipleResultsFound from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity from gso.products.product_blocks.router import RouterRole, RouterVendor from gso.products.product_blocks.site import SiteTier from gso.services import subscriptions from gso.services.crm import CustomerNotFoundError, get_customer_by_name from gso.utils.helpers import LAGMember router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)]) class ImportResponseModel(BaseModel): pid: UUID detail: str class SiteImportModel(BaseModel): site_name: str site_city: str site_country: str site_country_code: str site_latitude: float site_longitude: float site_bgp_community_id: int site_internal_id: int site_tier: SiteTier site_ts_address: str customer: str @validator("site_name", allow_reuse=True) def site_name_must_be_valid(cls, site_name: str) -> str: # Accept 3 chapital letters and only one ditigt after capital letters. pattern = re.compile(r"^[A-Z]{3}[0-9]?$") if not bool(pattern.match(site_name)): raise ValueError(f"Enter a valid site name similar looks like AMS, AMS1or LON9. Get: {site_name}") return site_name class RouterImportModel(BaseModel): customer: str router_site: str hostname: str ts_port: int router_vendor: RouterVendor router_role: RouterRole is_ias_connected: bool router_lo_ipv4_address: ipaddress.IPv4Address router_lo_ipv6_address: ipaddress.IPv6Address router_lo_iso_address: str router_si_ipv4_network: ipaddress.IPv4Network | None = None router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None class IptrunkImportModel(BaseModel): customer: str geant_s_sid: str iptrunk_type: IptrunkType iptrunk_description: str iptrunk_speed: PhyPortCapacity iptrunk_minimum_links: int side_a_node_id: str side_a_ae_iface: str side_a_ae_geant_a_sid: str side_a_ae_members: list[LAGMember] side_b_node_id: str side_b_ae_iface: str side_b_ae_geant_a_sid: str side_b_ae_members: list[LAGMember] iptrunk_ipv4_network: ipaddress.IPv4Network iptrunk_ipv6_network: ipaddress.IPv6Network @classmethod def _get_active_routers(cls) -> set[str]: return { str(router["subscription_id"]) for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"]) } @validator("customer") def check_if_customer_exists(cls, value: str) -> str: try: get_customer_by_name(value) except CustomerNotFoundError: raise ValueError(f"Customer {value} not found") return value @validator("side_a_node_id", "side_b_node_id") def check_if_router_side_is_available(cls, value: str) -> str: if value not in cls._get_active_routers(): raise ValueError(f"Router {value} not found") return value @validator("side_a_ae_members", "side_b_ae_members") def check_side_uniqueness(cls, value: list[str]) -> list[str]: if len(value) != len(set(value)): raise ValueError("Items must be unique") return value @root_validator def check_members(cls, values: dict[str, Any]) -> dict[str, Any]: min_links = values["iptrunk_minimum_links"] side_a_members = values.get("side_a_ae_members", []) side_b_members = values.get("side_b_ae_members", []) len_a = len(side_a_members) len_b = len(side_b_members) if len_a < min_links: raise ValueError(f"Side A members should be at least {min_links} (iptrunk_minimum_links)") if len_a != len_b: raise ValueError("Mismatch between Side A and B members") return values def _start_process(process_name: str, data: dict) -> UUID: """Start a process and handle common exceptions.""" pid: UUID = processes.start_process(process_name, [data]) if pid is None: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.") process = processes._get_process(pid) if process.last_status == "failed": raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Process {pid} failed because of an internal error. {process.failed_reason}", ) return pid @router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_site(site: SiteImportModel) -> dict[str, Any]: """Import a site by running the import_site workflow. :param site: The site information to be imported. :type site: SiteImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If the site already exists or if there's an error in the process. """ try: subscription = wfo_subscriptions.retrieve_subscription_by_subscription_instance_value( resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active") ) if subscription: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.") except MultipleResultsFound: raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.") pid = _start_process("import_site", site.dict()) return {"detail": "Site added successfully.", "pid": pid} @router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_router(router_data: RouterImportModel) -> dict[str, Any]: """Import a router by running the import_router workflow. :param router_data: The router information to be imported. :type router_data: RouterImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If there's an error in the process. """ pid = _start_process("import_router", router_data.dict()) return {"detail": "Router added successfully", "pid": pid} @router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel) def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]: """Import an iptrunk by running the import_iptrunk workflow. :param iptrunk_data: The iptrunk information to be imported. :type iptrunk_data: IptrunkImportModel :return: A dictionary containing the process id of the started process and detail message. :rtype: dict[str, Any] :raises HTTPException: If there's an error in the process. """ pid = _start_process("import_iptrunk", iptrunk_data.dict()) return {"detail": "Iptrunk added successfully", "pid": pid}