-
Mohammad Torkashvand authoredMohammad Torkashvand authored
network.py 5.09 KiB
"""API endpoints for network related operations."""
import ipaddress
from uuid import UUID
from fastapi import APIRouter
from orchestrator.domain import SubscriptionModel
from orchestrator.schemas.base import OrchestratorBaseModel
from orchestrator.services.subscriptions import build_extended_domain_model
from starlette import status
from gso.products.product_blocks.iptrunk import IptrunkType, PhysicalPortCapacity
from gso.products.product_blocks.router import RouterRole
from gso.products.product_blocks.site import LatitudeCoordinate, LongitudeCoordinate
from gso.services.subscriptions import get_active_iptrunk_subscriptions
from gso.utils.shared_enums import Vendor
router = APIRouter(prefix="/networks", tags=["Network"])
class SiteBlock(OrchestratorBaseModel):
"""Site block schema."""
site_name: str
site_city: str
site_country: str
site_latitude: LatitudeCoordinate
site_longitude: LongitudeCoordinate
site_internal_id: int
class RouterBlock(OrchestratorBaseModel):
"""Router block schema."""
subscription_instance_id: UUID
router_fqdn: str
router_access_via_ts: bool
router_lo_ipv4_address: ipaddress.IPv4Address
router_lo_ipv6_address: ipaddress.IPv6Address
router_lo_iso_address: str
router_role: RouterRole
vendor: Vendor
router_site: SiteBlock
class IptrunkSideBlock(OrchestratorBaseModel):
"""Iptrunk side block schema."""
subscription_instance_id: UUID
iptrunk_side_node: RouterBlock
class IptrunkBlock(OrchestratorBaseModel):
"""Iptrunk block schema."""
subscription_instance_id: UUID
iptrunk_speed: PhysicalPortCapacity
iptrunk_type: IptrunkType
iptrunk_ipv4_network: ipaddress.IPv4Network
iptrunk_ipv6_network: ipaddress.IPv6Network
iptrunk_capacity: str
iptrunk_isis_metric: int
iptrunk_sides: list[IptrunkSideBlock]
class IptrunkSchema(OrchestratorBaseModel):
"""Iptrunk schema."""
subscription_id: UUID
insync: bool
iptrunk: IptrunkBlock
class NetworkTopologyDomainModelSchema(OrchestratorBaseModel):
"""Network topology domain model schema."""
iptrunks: list[IptrunkSchema]
def _calculate_iptrunk_capacity(iptrunk_sides: list, iptrunk_speed: PhysicalPortCapacity) -> str:
"""Calculate the total capacity of an IP trunk."""
int_iptrunk_speed = int(iptrunk_speed.value.replace("G", ""))
capacity = int_iptrunk_speed * len(iptrunk_sides[0]["iptrunk_side_ae_members"])
return f"{capacity}G"
@router.get("/topology", status_code=status.HTTP_200_OK, response_model=NetworkTopologyDomainModelSchema)
def network_topology() -> NetworkTopologyDomainModelSchema:
"""Retrieve the network topology."""
topology: dict = {"iptrunks": []}
active_iptrunks = get_active_iptrunk_subscriptions()
for iptrunk in active_iptrunks:
subscription = SubscriptionModel.from_subscription(iptrunk["subscription_id"])
extended_model = build_extended_domain_model(subscription)
formatted_model = {
"subscription_id": extended_model["subscription_id"],
"insync": extended_model["insync"],
"iptrunk": {
"subscription_instance_id": extended_model["iptrunk"]["subscription_instance_id"],
"iptrunk_speed": extended_model["iptrunk"]["iptrunk_speed"],
"iptrunk_type": extended_model["iptrunk"]["iptrunk_type"],
"iptrunk_ipv4_network": extended_model["iptrunk"]["iptrunk_ipv4_network"],
"iptrunk_ipv6_network": extended_model["iptrunk"]["iptrunk_ipv6_network"],
"iptrunk_isis_metric": extended_model["iptrunk"]["iptrunk_isis_metric"],
"iptrunk_capacity": _calculate_iptrunk_capacity(
extended_model["iptrunk"]["iptrunk_sides"], extended_model["iptrunk"]["iptrunk_speed"]
),
"iptrunk_sides": [
{
"subscription_instance_id": side["subscription_instance_id"],
"iptrunk_side_node": {
"subscription_instance_id": side["iptrunk_side_node"]["subscription_instance_id"],
"router_fqdn": side["iptrunk_side_node"]["router_fqdn"],
"router_access_via_ts": side["iptrunk_side_node"]["router_access_via_ts"],
"router_lo_ipv4_address": side["iptrunk_side_node"]["router_lo_ipv4_address"],
"router_lo_ipv6_address": side["iptrunk_side_node"]["router_lo_ipv6_address"],
"router_lo_iso_address": side["iptrunk_side_node"]["router_lo_iso_address"],
"router_role": side["iptrunk_side_node"]["router_role"],
"vendor": side["iptrunk_side_node"]["vendor"],
"router_site": side["iptrunk_side_node"]["router_site"],
},
}
for side in extended_model["iptrunk"]["iptrunk_sides"]
],
},
}
topology["iptrunks"].append(IptrunkSchema(**formatted_model))
return NetworkTopologyDomainModelSchema(**topology)