Skip to content
Snippets Groups Projects
Verified Commit dd0e3d38 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Remodel the import module of the API

parent 6a847ed2
Branches
Tags
1 merge request!201Add imported products
Showing with 338 additions and 12 deletions
"""Helper methods for the :term:`API` module."""
from uuid import UUID
from fastapi import HTTPException
from orchestrator.services import processes
from pydantic import BaseModel
from starlette import status
def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start the process.",
)
process = processes._get_process(pid) # noqa: SLF001
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
class ImportResponseModel(BaseModel):
"""The model of a response given when services are imported using the :term:`API`."""
pid: UUID
detail: str
......@@ -2,7 +2,7 @@
from fastapi import APIRouter
from gso.api.v1.imports import router as imports_router
from gso.api.v1.imports import api_router as imports_router
from gso.api.v1.network import router as network_router
from gso.api.v1.processes import router as processes_router
from gso.api.v1.subscriptions import router as subscriptions_router
......
""":term:`API` routes for adding existing products to the database."""
from fastapi import Depends
from fastapi.routing import APIRouter
from gso.api.v1.imports.iptrunk import router as iptrunk_router
from gso.api.v1.imports.office_router import router as office_router_router
from gso.api.v1.imports.router import router as router_router
from gso.api.v1.imports.site import router as site_router
from gso.api.v1.imports.super_pop_switch import router as super_pop_switch_router
from gso.auth.security import opa_security_default
api_router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
api_router.include_router(iptrunk_router)
api_router.include_router(office_router_router)
api_router.include_router(router_router)
api_router.include_router(site_router)
api_router.include_router(super_pop_switch_router)
""":term:`GSO` :term:`API` endpoints that import different types of existing services."""
""":term:`API` endpoints for importing IP trunks."""
import ipaddress
from typing import Any
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.routing import APIRouter
from orchestrator.services import processes
from fastapi import APIRouter, status
from pydantic import BaseModel, root_validator, validator
from gso.auth.security import opa_security_default
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import SiteTier
from gso.services import subscriptions
from gso.services.partners import PartnerNotFoundError, get_partner_by_name
from gso.utils.helpers import BaseSiteValidatorModel, LAGMember
from gso.utils.shared_enums import PortNumber, Vendor
from gso.utils.helpers import LAGMember
router = APIRouter(prefix="/imports", tags=["Imports"], dependencies=[Depends(opa_security_default)])
class ImportResponseModel(BaseModel):
"""The model of a response given when services are imported using the :term:`API`."""
pid: UUID
detail: str
class SiteImportModel(BaseSiteValidatorModel):
"""The required input for importing an existing :class:`gso.products.product_types.site`."""
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
partner: str
class RouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.router`."""
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router = APIRouter()
class IptrunkImportModel(BaseModel):
......@@ -137,78 +94,6 @@ class IptrunkImportModel(BaseModel):
return values
class SuperPopSwitchImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`."""
partner: str
super_pop_switch_site: str
hostname: str
super_pop_switch_ts_port: PortNumber
super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address
class OfficeRouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.office_router`."""
partner: str
office_router_site: str
office_router_fqdn: str
office_router_ts_port: PortNumber
office_router_lo_ipv4_address: ipaddress.IPv4Address
office_router_lo_ipv6_address: ipaddress.IPv6Address
def _start_process(process_name: str, data: dict) -> UUID:
"""Start a process and handle common exceptions."""
pid: UUID = processes.start_process(process_name, [data])
if pid is None:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to start the process.",
)
process = processes._get_process(pid) # noqa: SLF001
if process.last_status == "failed":
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
)
return pid
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_site(site: SiteImportModel) -> dict[str, Any]:
"""Import a site by running the create_imported_site workflow.
:param site: The site information to be imported.
:type site: SiteImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If the site already exists or if there's an error in the process.
"""
pid = _start_process("create_imported_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_router(router_data: RouterImportModel) -> dict[str, Any]:
"""Import a router by running the create_imported_router workflow.
:param router_data: The router information to be imported.
:type router_data: RouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_router", router_data.dict())
return {"detail": "Router has been added successfully", "pid": pid}
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
"""Import an iptrunk by running the create_imported_iptrunk workflow.
......@@ -223,35 +108,3 @@ def create_imported_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
"""
pid = _start_process("create_imported_iptrunk", iptrunk_data.dict())
return {"detail": "Iptrunk has been added successfully", "pid": pid}
@router.post("/super-pop-switches", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_super_pop_switch(super_pop_switch_data: SuperPopSwitchImportModel) -> dict[str, Any]:
"""Import a Super PoP switch by running the create_imported_super_pop_switch workflow.
:param super_pop_switch_data: The Super PoP switch information to be imported.
:type super_pop_switch_data: SuperPopSwitchImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_super_pop_switch", super_pop_switch_data.dict())
return {"detail": "Super PoP switch has been added successfully", "pid": pid}
@router.post("/office-routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_office_router(office_router_data: OfficeRouterImportModel) -> dict[str, Any]:
"""Import an office router by running the create_imported_office_router workflow.
:param office_router_data: The office router information to be imported.
:type office_router_data: OfficeRouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_office_router", office_router_data.dict())
return {"detail": "Office router has been added successfully", "pid": pid}
""":term:`API` endpoints for importing office Routers."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.utils.shared_enums import PortNumber
router = APIRouter()
class OfficeRouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.office_router`."""
partner: str
office_router_site: str
office_router_fqdn: str
office_router_ts_port: PortNumber
office_router_lo_ipv4_address: ipaddress.IPv4Address
office_router_lo_ipv6_address: ipaddress.IPv6Address
@router.post("/office-routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_office_router(office_router_data: OfficeRouterImportModel) -> dict[str, Any]:
"""Import an office router by running the create_imported_office_router workflow.
:param office_router_data: The office router information to be imported.
:type office_router_data: OfficeRouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_office_router", office_router_data.dict())
return {"detail": "Office router has been added successfully", "pid": pid}
""":term:`API` endpoints for importing Routers."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.router import RouterRole
from gso.utils.shared_enums import Vendor
router = APIRouter()
class RouterImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.router`."""
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_router(router_data: RouterImportModel) -> dict[str, Any]:
"""Import a router by running the create_imported_router workflow.
:param router_data: The router information to be imported.
:type router_data: RouterImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_router", router_data.dict())
return {"detail": "Router has been added successfully", "pid": pid}
""":term:`API` endpoints for importing Sites."""
from typing import Any
from fastapi import APIRouter, status
from gso.api.helpers import ImportResponseModel, _start_process
from gso.products.product_blocks.site import SiteTier
from gso.utils.helpers import BaseSiteValidatorModel
router = APIRouter()
class SiteImportModel(BaseSiteValidatorModel):
"""The required input for importing an existing :class:`gso.products.product_types.site`."""
site_name: str
site_city: str
site_country: str
site_country_code: str
site_latitude: float
site_longitude: float
site_bgp_community_id: int
site_internal_id: int
site_tier: SiteTier
site_ts_address: str
partner: str
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_site(site: SiteImportModel) -> dict[str, Any]:
"""Import a site by running the create_imported_site workflow.
:param site: The site information to be imported.
:type site: SiteImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If the site already exists or if there's an error in the process.
"""
pid = _start_process("create_imported_site", site.dict())
return {"detail": "Site added successfully.", "pid": pid}
""":term:`API` endpoints for importing Super PoP switches."""
import ipaddress
from typing import Any
from fastapi import APIRouter, status
from pydantic import BaseModel
from gso.api.helpers import ImportResponseModel, _start_process
from gso.utils.shared_enums import PortNumber
router = APIRouter()
class SuperPopSwitchImportModel(BaseModel):
"""Required fields for importing an existing :class:`gso.product.product_types.super_pop_switch`."""
partner: str
super_pop_switch_site: str
hostname: str
super_pop_switch_ts_port: PortNumber
super_pop_switch_mgmt_ipv4_address: ipaddress.IPv4Address
@router.post("/super-pop-switches", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def create_imported_super_pop_switch(super_pop_switch_data: SuperPopSwitchImportModel) -> dict[str, Any]:
"""Import a Super PoP switch by running the create_imported_super_pop_switch workflow.
:param super_pop_switch_data: The Super PoP switch information to be imported.
:type super_pop_switch_data: SuperPopSwitchImportModel
:return: A dictionary containing the process id of the started process and detail message.
:rtype: dict[str, Any]
:raises HTTPException: If there's an error in the process.
"""
pid = _start_process("create_imported_super_pop_switch", super_pop_switch_data.dict())
return {"detail": "Super PoP switch has been added successfully", "pid": pid}
......@@ -16,7 +16,7 @@ def import_iptrunk_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedIptrunk subscription, and turn it into an Iptrunk subscription."""
old_iptrunk = ImportedIptrunk.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription = Iptrunk.from_other_product(old_iptrunk, new_subscription_id)
new_subscription = Iptrunk.from_other_product(old_iptrunk, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -16,7 +16,7 @@ def import_office_router_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedOfficeRouter subscription, and turn it into an OfficeRouter subscription."""
old_office_router = ImportedOfficeRouter.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription = OfficeRouter.from_other_product(old_office_router, new_subscription_id)
new_subscription = OfficeRouter.from_other_product(old_office_router, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -16,7 +16,7 @@ def import_router_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedRouter subscription, and turn it into a Router subscription."""
old_router = ImportedRouter.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription = Router.from_other_product(old_router, new_subscription_id)
new_subscription = Router.from_other_product(old_router, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -16,7 +16,7 @@ def import_site_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedSite subscription, and turn it into a Site subscription."""
old_site = ImportedSite.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription = Site.from_other_product(old_site, new_subscription_id)
new_subscription = Site.from_other_product(old_site, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -16,7 +16,7 @@ def import_super_pop_switch_subscription(subscription_id: UUIDstr) -> State:
"""Take an ImportedSuperPopSwitch subscription, and turn it into a SuperPopSwitch subscription."""
old_super_pop_switch = ImportedSuperPopSwitch.from_subscription(subscription_id)
new_subscription_id = get_product_id_by_name(ProductName.SITE)
new_subscription = SuperPopSwitch.from_other_product(old_super_pop_switch, new_subscription_id)
new_subscription = SuperPopSwitch.from_other_product(old_super_pop_switch, new_subscription_id) # type: ignore[arg-type]
return {"subscription": new_subscription}
......
......@@ -85,7 +85,7 @@ def mock_routers(iptrunk_data):
yield mock_get_active_router_subscriptions
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.v1.imports.iptrunk._start_process")
def test_create_imported_iptrunk_successful_with_mocked_process(
mock_start_process, test_client, mock_routers, iptrunk_data
):
......@@ -236,7 +236,7 @@ def test_create_imported_iptrunk_successful_with_real_process(test_client, mock_
assert subscription is not None
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_invalid_partner(mock_start_process, test_client, mock_routers, iptrunk_data):
iptrunk_data["partner"] = "not_existing_partner"
mock_start_process.return_value = "123e4567-e89b-12d3-a456-426655440000"
......@@ -254,7 +254,7 @@ def test_create_imported_iptrunk_invalid_partner(mock_start_process, test_client
}
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_invalid_router_id_side_a_and_b(mock_start_process, test_client, iptrunk_data):
iptrunk_data["side_a_node_id"] = "NOT FOUND"
iptrunk_data["side_b_node_id"] = "NOT FOUND"
......@@ -279,7 +279,7 @@ def test_create_imported_iptrunk_invalid_router_id_side_a_and_b(mock_start_proce
}
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_non_unique_members_side_a(
mock_start_process, test_client, mock_routers, iptrunk_data, faker
):
......@@ -320,7 +320,7 @@ def test_create_imported_iptrunk_non_unique_members_side_a(
}
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_fails_on_side_a_member_count_mismatch(
mock_start_process,
test_client,
......@@ -345,7 +345,7 @@ def test_create_imported_iptrunk_fails_on_side_a_member_count_mismatch(
}
@patch("gso.api.v1.imports._start_process")
@patch("gso.api.helpers._start_process")
def test_create_imported_iptrunk_fails_on_side_a_and_b_members_mismatch(
mock_start_process,
test_client,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment