Skip to content
Snippets Groups Projects
imports.py 6.97 KiB
Newer Older
from typing import Any
from uuid import UUID

from fastapi import Depends, HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.security import opa_security_default
from orchestrator.services import processes
from orchestrator.services import subscriptions as wfo_subscriptions
from pydantic import BaseModel, root_validator, validator
from sqlalchemy.exc import MultipleResultsFound

from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
from gso.services import subscriptions
from gso.services.crm import CustomerNotFoundError, get_customer_by_name
from gso.utils.helpers import LAGMember, validate_site_name
router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
class ImportResponseModel(BaseModel):
    pid: UUID
    detail: str


class SiteImportModel(BaseModel):
    site_name: str
    site_city: str
    site_country: str
    site_country_code: str
    site_latitude: float
    site_longitude: float
    site_bgp_community_id: int
    site_internal_id: int
    site_tier: SiteTier
    site_ts_address: str
    customer: str

    @validator("site_name", allow_reuse=True)
    def site_name_must_be_valid(cls, site_name: str) -> str:
        return validate_site_name(site_name)

class RouterImportModel(BaseModel):
    customer: str
    router_site: str
    hostname: str
    ts_port: int
    router_vendor: RouterVendor
    router_role: RouterRole
    is_ias_connected: bool
    router_lo_ipv4_address: ipaddress.IPv4Address
    router_lo_ipv6_address: ipaddress.IPv6Address
    router_lo_iso_address: str
    router_si_ipv4_network: ipaddress.IPv4Network | None = None
    router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None
    router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None


class IptrunkImportModel(BaseModel):
    customer: str
    geant_s_sid: str
    iptrunk_type: IptrunkType
    iptrunk_description: str
    iptrunk_speed: PhyPortCapacity
    iptrunk_minimum_links: int
    side_a_node_id: str
    side_a_ae_iface: str
    side_a_ae_geant_a_sid: str
    side_a_ae_members: list[LAGMember]
    side_b_node_id: str
    side_b_ae_iface: str
    side_b_ae_geant_a_sid: str
    side_b_ae_members: list[LAGMember]

    iptrunk_ipv4_network: ipaddress.IPv4Network
    iptrunk_ipv6_network: ipaddress.IPv6Network

    @classmethod
    def _get_active_routers(cls) -> set[str]:
        return {
            str(router["subscription_id"])
            for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
        }

    @validator("customer")
    def check_if_customer_exists(cls, value: str) -> str:
        try:
            get_customer_by_name(value)
        except CustomerNotFoundError:
            raise ValueError(f"Customer {value} not found")

        return value

    @validator("side_a_node_id", "side_b_node_id")
    def check_if_router_side_is_available(cls, value: str) -> str:
        if value not in cls._get_active_routers():
            raise ValueError(f"Router {value} not found")

        return value

    @validator("side_a_ae_members", "side_b_ae_members")
    def check_side_uniqueness(cls, value: list[str]) -> list[str]:
        if len(value) != len(set(value)):
            raise ValueError("Items must be unique")

        return value

    @root_validator
    def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
        min_links = values["iptrunk_minimum_links"]
        side_a_members = values.get("side_a_ae_members", [])
        side_b_members = values.get("side_b_ae_members", [])

        len_a = len(side_a_members)
        len_b = len(side_b_members)

        if len_a < min_links:
            raise ValueError(f"Side A members should be at least {min_links} (iptrunk_minimum_links)")

        if len_a != len_b:
            raise ValueError("Mismatch between Side A and B members")

        return values


def _start_process(process_name: str, data: dict) -> UUID:
Neda Moeini's avatar
Neda Moeini committed
    """Start a process and handle common exceptions."""
Neda Moeini's avatar
Neda Moeini committed
    pid: UUID = processes.start_process(process_name, [data])
    if pid is None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")

Neda Moeini's avatar
Neda Moeini committed
    process = processes._get_process(pid)
    if process.last_status == "failed":
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
        )

    return pid


@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_site(site: SiteImportModel) -> dict[str, Any]:
    """Import a site by running the import_site workflow.
    :param site: The site information to be imported.
    :type site: SiteImportModel
    :return: A dictionary containing the process id of the started process and detail message.
    :rtype: dict[str, Any]
    :raises HTTPException: If the site already exists or if there's an error in the process.
        subscription = wfo_subscriptions.retrieve_subscription_by_subscription_instance_value(
            resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active")
        )
        if subscription:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")
    except MultipleResultsFound:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.")

    pid = _start_process("import_site", site.dict())
    return {"detail": "Site added successfully.", "pid": pid}
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_router(router_data: RouterImportModel) -> dict[str, Any]:
Neda Moeini's avatar
Neda Moeini committed
    """Import a router by running the import_router workflow.

    :param router_data: The router information to be imported.
    :type router_data: RouterImportModel
    :return: A dictionary containing the process id of the started process and detail message.
    :rtype: dict[str, Any]
    :raises HTTPException: If there's an error in the process.
    pid = _start_process("import_router", router_data.dict())
Neda Moeini's avatar
Neda Moeini committed
    return {"detail": "Router added successfully", "pid": pid}


@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
    """Import an iptrunk by running the import_iptrunk workflow.

    :param iptrunk_data: The iptrunk information to be imported.
    :type iptrunk_data: IptrunkImportModel
    :return: A dictionary containing the process id of the started process and detail message.
    :rtype: dict[str, Any]
    :raises HTTPException: If there's an error in the process.
    """

    pid = _start_process("import_iptrunk", iptrunk_data.dict())
    return {"detail": "Iptrunk added successfully", "pid": pid}