Newer
Older
""":term:`GSO` :term:`API` endpoints that import different types of existing services."""
import ipaddress
from fastapi import Depends, HTTPException, status
from orchestrator.services import processes
from pydantic import BaseModel, root_validator, validator

Mohammad Torkashvand
committed
from gso.auth.security import opa_security_default
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.services import subscriptions
from gso.services.partners import PartnerNotFoundError, get_partner_by_name
from gso.utils.helpers import BaseSiteValidatorModel, LAGMember
from gso.utils.shared_enums import PortNumber, Vendor
router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
class ImportResponseModel(BaseModel):
"""The model of a response given when services are imported using the :term:`API`."""
pid: UUID
detail: str
"""The required input for importing an existing :class:`gso.products.product_types.site`."""
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
class RouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.router`."""
router_site: str
hostname: str
ts_port: int
router_role: RouterRole
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
class IptrunkImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.products.product_types.iptrunk`."""
geant_s_sid: str
iptrunk_type: IptrunkType
iptrunk_description: str
iptrunk_minimum_links: int
iptrunk_isis_metric: int
side_a_node_id: str
side_a_ae_iface: str
side_a_ae_geant_a_sid: str
side_a_ae_members: list[LAGMember]
side_b_node_id: str
side_b_ae_iface: str
side_b_ae_geant_a_sid: str
side_b_ae_members: list[LAGMember]
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
@classmethod
def _get_active_routers(cls) -> set[str]:
return {
str(router["subscription_id"])
for router in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
@validator("partner")
def check_if_partner_exists(cls, value: str) -> str:
"""Validate that the partner exists."""
get_partner_by_name(value)
except PartnerNotFoundError as e:
msg = f"partner {value} not found"
Karel van Klink
committed
raise ValueError(msg) from e
return value
@validator("side_a_node_id", "side_b_node_id")
def check_if_router_side_is_available(cls, value: str) -> str:
"""Both sides of the trunk must exist in :term:`GSO`."""
if value not in cls._get_active_routers():
Karel van Klink
committed
msg = f"Router {value} not found"
raise ValueError(msg)
return value
@validator("side_a_ae_members", "side_b_ae_members")
def check_side_uniqueness(cls, value: list[str]) -> list[str]:
""":term:`LAG` members must be unique."""
if len(value) != len(set(value)):
Karel van Klink
committed
msg = "Items must be unique"
raise ValueError(msg)
return value
@root_validator
def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Amount of :term:`LAG` members has to match on side A and B, and meet the minimum requirement."""
min_links = values["iptrunk_minimum_links"]
side_a_members = values.get("side_a_ae_members", [])
side_b_members = values.get("side_b_ae_members", [])
len_a = len(side_a_members)
len_b = len(side_b_members)
if len_a < min_links:
Karel van Klink
committed
msg = f"Side A members should be at least {min_links} (iptrunk_minimum_links)"
raise ValueError(msg)
if len_a != len_b:
Karel van Klink
committed
msg = "Mismatch between Side A and B members"
raise ValueError(msg)
return values
class SuperPopSwitchImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`."""
super_pop_switch_site: str
hostname: str
super_pop_switch_ts_port: PortNumber
super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address
class OfficeRouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.office_router`."""
office_router_site: str
office_router_fqdn: str
office_router_ts_port: PortNumber
office_router_lo_ipv4_address: ipaddress.IPv4Address
office_router_lo_ipv6_address: ipaddress.IPv6Address
def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
Karel van Klink
committed
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start the process.",
)
process = processes._get_process(pid) # noqa: SLF001
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_site(site: SiteImportModel) -> dict[str, Any]:
"""Import a site by running the import_site workflow.
:param site: The site information to be imported.
:type site: SiteImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If the site already exists or if there's an error in the process.
pid = _start_process("import_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_router(router_data: RouterImportModel) -> dict[str, Any]:
"""Import a router by running the import_router workflow.
:param router_data: The router information to be imported.
:type router_data: RouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
pid = _start_process("import_router", router_data.dict())
return {"detail": "Router has been added successfully", "pid": pid}
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
"""Import an iptrunk by running the import_iptrunk workflow.
:param iptrunk_data: The iptrunk information to be imported.
:type iptrunk_data: IptrunkImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("import_iptrunk", iptrunk_data.dict())
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
return {"detail": "Iptrunk has been added successfully", "pid": pid}
@router.post("/super-pop-switches", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_super_pop_switch(super_pop_switch_data: SuperPopSwitchImportModel) -> dict[str, Any]:
"""Import a Super PoP switch by running the import_super_pop_switch workflow.
:param super_pop_switch_data: The Super PoP switch information to be imported.
:type super_pop_switch_data: SuperPopSwitchImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("import_super_pop_switch", super_pop_switch_data.dict())
return {"detail": "Super PoP switch has been added successfully", "pid": pid}
@router.post("/office-routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_office_router(office_router_data: OfficeRouterImportModel) -> dict[str, Any]:
"""Import a office router by running the import_office_router workflow.
:param office_router_data: The office router information to be imported.
:type office_router_data: OfficeRouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("import_office_router", office_router_data.dict())
return {"detail": "Office router has been added successfully", "pid": pid}