Newer
Older
import ipaddress
import re
from typing import Any
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from orchestrator.security import opa_security_default
from orchestrator.services import processes
from orchestrator.services import subscriptions as wfo_subscriptions
from pydantic import BaseModel, root_validator, validator
from sqlalchemy.exc import MultipleResultsFound

from gso.products.product_blocks.iptrunk import IptrunkType, PhyPortCapacity
from gso.products.product_blocks.router import RouterRole, RouterVendor
from gso.products.product_blocks.site import SiteTier
from gso.services import subscriptions
from gso.services.crm import CustomerNotFoundError, get_customer_by_name

# NOTE(review): ``LAGMember`` (used by ``IptrunkImportModel``) is not imported in the
# visible part of this file -- confirm where it is defined and import it here.
# Router that groups the import endpoints under ``/imports``; the OPA security
# dependency is attached at router level so it applies to every route below.
router = APIRouter(
    prefix="/imports",
    tags=["Imports"],
    dependencies=[Depends(opa_security_default)],
)
class ImportResponseModel(BaseModel):
    """Response body shared by the import endpoints."""

    # Identifier of the process that was started for the import.
    pid: UUID
    # Short message describing the outcome of the request.
    detail: str
class SiteImportModel(BaseModel):
    """Request body describing a site to be imported."""

    site_name: str
    site_city: str
    site_country: str
    site_country_code: str
    site_latitude: float
    site_longitude: float
    site_bgp_community_id: int
    site_internal_id: int
    site_tier: SiteTier
    site_ts_address: str
    customer: str

    @validator("site_name", allow_reuse=True)
    def site_name_must_be_valid(cls, site_name: str) -> str:
        """Check that the site name is three capital letters, optionally followed by one digit.

        :param site_name: The site name supplied in the request.
        :type site_name: str
        :return: The unchanged site name when it is valid.
        :rtype: str
        :raises ValueError: If the site name does not have the expected format.
        """
        # ``fullmatch`` is used rather than ``match`` with a ``$`` anchor: ``$`` also matches
        # right before a trailing newline, which would let a name ending in a newline through.
        if re.fullmatch(r"[A-Z]{3}[0-9]?", site_name) is None:
            raise ValueError(f"Enter a valid site name such as AMS, AMS1 or LON9. Got: {site_name}")
        return site_name
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class RouterImportModel(BaseModel):
    """Request body describing a router to be imported."""

    customer: str
    router_site: str
    hostname: str
    ts_port: int
    router_vendor: RouterVendor
    router_role: RouterRole
    is_ias_connected: bool

    # Loopback addressing of the router.
    router_lo_ipv4_address: ipaddress.IPv4Address
    router_lo_ipv6_address: ipaddress.IPv6Address
    router_lo_iso_address: str

    # Optional networks; each defaults to ``None`` when omitted from the request.
    router_si_ipv4_network: ipaddress.IPv4Network | None = None
    router_ias_lt_ipv4_network: ipaddress.IPv4Network | None = None
    router_ias_lt_ipv6_network: ipaddress.IPv6Network | None = None
class IptrunkImportModel(BaseModel):
    """Request body describing an IP trunk to be imported."""

    customer: str
    geant_s_sid: str
    iptrunk_type: IptrunkType
    iptrunk_description: str
    iptrunk_speed: PhyPortCapacity
    iptrunk_minimum_links: int

    side_a_node_id: str
    side_a_ae_iface: str
    side_a_ae_geant_a_sid: str
    side_a_ae_members: list[LAGMember]

    side_b_node_id: str
    side_b_ae_iface: str
    side_b_ae_geant_a_sid: str
    side_b_ae_members: list[LAGMember]

    iptrunk_ipv4_network: ipaddress.IPv4Network
    iptrunk_ipv6_network: ipaddress.IPv6Network

    @classmethod
    def _get_active_routers(cls) -> set[str]:
        """Return the subscription IDs of the active router subscriptions, as strings."""
        return {
            str(router_subscription["subscription_id"])
            for router_subscription in subscriptions.get_active_router_subscriptions(includes=["subscription_id"])
        }

    @validator("customer")
    def check_if_customer_exists(cls, value: str) -> str:
        """Check that a customer with the given name can be looked up.

        :param value: The customer name supplied in the request.
        :return: The unchanged customer name.
        :raises ValueError: If the customer lookup raises ``CustomerNotFoundError``.
        """
        try:
            get_customer_by_name(value)
        except CustomerNotFoundError as err:
            raise ValueError(f"Customer {value} not found") from err
        return value

    @validator("side_a_node_id", "side_b_node_id")
    def check_if_router_side_is_available(cls, value: str) -> str:
        """Check that the node ID refers to an active router subscription.

        :param value: The node ID of one side of the trunk.
        :return: The unchanged node ID.
        :raises ValueError: If the ID is not among the active router subscription IDs.
        """
        if value not in cls._get_active_routers():
            raise ValueError(f"Router {value} not found")
        return value

    @validator("side_a_ae_members", "side_b_ae_members")
    def check_side_uniqueness(cls, value: list[LAGMember]) -> list[LAGMember]:
        """Check that a side does not list the same member twice.

        :param value: The members of one side of the trunk.
        :return: The unchanged list of members.
        :raises ValueError: If the list contains duplicates.
        """
        # Building a set relies on the members being hashable.
        if len(value) != len(set(value)):
            raise ValueError("Items must be unique")
        return value

    # ``skip_on_failure=True``: when an earlier field validation failed, that field is
    # absent from ``values`` and indexing it below would raise an uncaught ``KeyError``
    # instead of a validation error.
    @root_validator(skip_on_failure=True)
    def check_members(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Check the member counts of both sides against each other and the minimum.

        :param values: The validated field values of the model.
        :return: The unchanged field values.
        :raises ValueError: If side A has fewer members than ``iptrunk_minimum_links`` or
            both sides do not have the same number of members.
        """
        min_links = values["iptrunk_minimum_links"]
        len_a = len(values.get("side_a_ae_members", []))
        len_b = len(values.get("side_b_ae_members", []))

        if len_a < min_links:
            raise ValueError(f"Side A members should be at least {min_links} (iptrunk_minimum_links)")
        if len_a != len_b:
            raise ValueError("Mismatch between Side A and B members")
        return values
def _start_process(process_name: str, data: dict) -> UUID:
    """Start a process and turn start-up failures into HTTP errors.

    :param process_name: Name of the workflow process to start.
    :type process_name: str
    :param data: The input passed to the process as its single form entry.
    :type data: dict
    :return: The ID of the started process.
    :rtype: UUID
    :raises HTTPException: With status 500 if no process ID is returned or if the
        process reports ``failed`` as its last status.
    """
    pid: UUID = processes.start_process(process_name, [data])
    if pid is None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to start the process.")

    # The process must be looked up before its status can be inspected; without this
    # assignment the checks below fail with a ``NameError``.
    # NOTE(review): ``_get_process`` is a private helper of the processes service --
    # confirm it is the intended lookup for the orchestrator version in use.
    process = processes._get_process(pid)
    if process.last_status == "failed":
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Process {pid} failed because of an internal error. {process.failed_reason}",
        )

    return pid
@router.post("/sites", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_site(site: SiteImportModel) -> dict[str, Any]:
    """Import a site by running the import_site workflow.

    :param site: The site information to be imported.
    :type site: SiteImportModel
    :return: A dictionary containing the process id of the started process and detail message.
    :rtype: dict[str, Any]
    :raises HTTPException: If the site already exists or if there's an error in the process.
    """
    # Only the lookup can raise ``MultipleResultsFound``, so it is the only statement guarded.
    try:
        subscription = wfo_subscriptions.retrieve_subscription_by_subscription_instance_value(
            resource_type="site_name", value=site.site_name, sub_status=("provisioning", "active")
        )
    except MultipleResultsFound as err:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Multiple subscriptions found.") from err

    if subscription:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Site already exists.")

    pid = _start_process("import_site", site.dict())
    return {"detail": "Site added successfully.", "pid": pid}
@router.post("/routers", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_router(router_data: RouterImportModel) -> dict[str, Any]:
    """Import a router by running the import_router workflow.

    :param router_data: The router information to be imported.
    :type router_data: RouterImportModel
    :return: A dictionary containing the process id of the started process and detail message.
    :rtype: dict[str, Any]
    :raises HTTPException: If there's an error in the process.
    """
    pid = _start_process("import_router", router_data.dict())
    return {"detail": "Router added successfully", "pid": pid}
@router.post("/iptrunks", status_code=status.HTTP_201_CREATED, response_model=ImportResponseModel)
def import_iptrunk(iptrunk_data: IptrunkImportModel) -> dict[str, Any]:
    """Kick off the import_iptrunk workflow for the supplied trunk.

    :param iptrunk_data: The iptrunk information to be imported.
    :type iptrunk_data: IptrunkImportModel
    :return: A dictionary holding the detail message and the id of the started process.
    :rtype: dict[str, Any]
    :raises HTTPException: If there's an error in the process.
    """
    workflow_input = iptrunk_data.dict()
    return {
        "detail": "Iptrunk added successfully",
        "pid": _start_process("import_iptrunk", workflow_input),
    }