# imports.py (3.65 KiB)
import ipaddress
from typing import Any, Dict, Optional
from uuid import UUID

from fastapi import Depends, HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from orchestrator.services import processes, subscriptions
from pydantic import BaseModel
from sqlalchemy.exc import MultipleResultsFound

from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier

# All endpoints registered on this router live under the /imports prefix, are tagged
# "Import", and declare opa_security_default as a dependency.
router = APIRouter(
    prefix="/imports",
    tags=["Import"],
    dependencies=[Depends(opa_security_default)],
)


def start_process(process_name: str, data: dict) -> UUID:
    """Start the named process and report an immediate failure as an HTTP 500.

    Args:
    ----
    process_name (str): Name of the process to start.
    data (dict): Input for the process; it is passed on wrapped in a single-element list.

    Returns:
    -------
    UUID: The id of the started process.

    Raises:
    ------
    HTTPException: With status 500 if no process id is returned, or if the process
        already reports ``last_status == "failed"`` right after being started.
    """
    pid: UUID = processes.start_process(process_name, [data])
    if pid is None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")

    # Look the process up again so a failure that happened during start-up is
    # surfaced to the API caller instead of being silently reported as success.
    process = processes._get_process(pid)
    if process.last_status != "failed":
        return pid

    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
    )


# Request body of the POST /imports/sites endpoint. Once validated, the model is
# converted with .dict() and handed to the "import_site" workflow as its input.
class SiteImport(BaseModel):
    site_name: str  # Also used to detect an already existing site subscription.
    site_city: str
    site_country: str
    site_country_code: str
    site_latitude: float
    site_longitude: float
    site_bgp_community_id: int
    site_internal_id: int
    site_tier: SiteTier  # Must be a valid SiteTier value.
    site_ts_address: str  # NOTE(review): presumably the terminal server address - confirm.
    customer: str  # NOTE(review): expected format (name vs. id) is not visible here - confirm.

@router.post("/sites", status_code=status.HTTP_201_CREATED)
def import_site(site: SiteImport) -> Dict[str, Any]:
    """Import site by running the import_site workflow.

    Before the workflow is started, existing subscriptions in status ``provisioning``
    or ``active`` are searched for the same ``site_name``; a match rejects the import.

    Args:
    ----
    site (SiteImport): The site information to be imported.

    Returns:
    -------
    dict: A dictionary containing the process id of the started process and detail message.

    Raises:
    ------
    HTTPException: With status 409 if the site already exists or if the lookup matches
        multiple subscriptions; with status 500 if the process could not be started or failed.
    """
    # Only the lookup can raise MultipleResultsFound, so it is the only statement guarded.
    try:
        subscription = subscriptions.retrieve_subscription_by_subscription_instance_value(
            resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active")
        )
    except MultipleResultsFound as err:
        # Chain the database error so the root cause stays visible in tracebacks and logs.
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.") from err

    if subscription:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")

    pid = start_process("import_site", site.dict())
    return {"detail": "Site added successfully.", "pid": pid}


# Request body of the POST /imports/routers endpoint. Once validated, the model is
# converted with .dict() and handed to the "import_router" workflow as its input.
class RouterImportModel(BaseModel):
    customer: str
    router_site: str  # NOTE(review): how the site is identified (name vs. id) is not visible here - confirm.
    hostname: str
    ts_port: int  # NOTE(review): presumably the terminal server port - confirm.
    router_vendor: RouterVendor  # Must be a valid RouterVendor value.
    router_role: RouterRole  # Must be a valid RouterRole value.
    is_ias_connected: bool
    # NOTE(review): "lo" presumably stands for the loopback interface - confirm.
    router_lo_ipv4_address: ipaddress.IPv4Address
    router_lo_ipv6_address: ipaddress.IPv6Address
    router_lo_iso_address: str
    # The network fields below are optional and default to None when omitted.
    router_si_ipv4_network: Optional[ipaddress.IPv4Network] = None
    router_ias_lt_ipv4_network: Optional[ipaddress.IPv4Network] = None
    router_ias_lt_ipv6_network: Optional[ipaddress.IPv6Network] = None


@router.post("/routers", status_code=status.HTTP_201_CREATED)
def import_router(router_data: RouterImportModel) -> Dict[str, Any]:
    """Start the import_router workflow for the submitted router.

    Args:
    ----
    router_data (RouterImportModel): Description of the router to import.

    Returns:
    -------
    dict: The detail message together with the id of the started process under ``pid``.

    Raises:
    ------
    HTTPException: If the process cannot be started or fails immediately.
    """
    workflow_input = router_data.dict()
    response: Dict[str, Any] = {
        "detail": "Router added successfully",
        "pid": start_process("import_router", workflow_input),
    }
    return response