""":term:`API` endpoint for fetching different types of subscriptions.""" from typing import Any from fastapi import Depends, Response, status from fastapi.routing import APIRouter from orchestrator.domain import SubscriptionModel from orchestrator.schemas import SubscriptionDomainModelSchema from orchestrator.services.subscriptions import build_extended_domain_model from orchestrator.types import SubscriptionLifecycle from gso.auth.api_key_auth import get_api_key from gso.products import ProductType from gso.services.subscriptions import get_router_subscriptions, get_subscriptions router = APIRouter( prefix="/subscriptions", tags=["Subscriptions"], dependencies=[Depends(get_api_key)], ) @router.get( "/routers", status_code=status.HTTP_200_OK, response_model=list[SubscriptionDomainModelSchema], ) def subscription_routers() -> list[dict[str, Any]]: """Retrieve all active or provisioning router subscriptions.""" subscriptions = [] routers = get_router_subscriptions(lifecycles=[SubscriptionLifecycle.ACTIVE, SubscriptionLifecycle.PROVISIONING]) for r in routers: subscription = SubscriptionModel.from_subscription(r["subscription_id"]) extended_model = build_extended_domain_model(subscription) subscriptions.append(extended_model) return subscriptions @router.get( "/dashboard_devices", status_code=status.HTTP_200_OK, response_class=Response, responses={ 200: { "content": {"text/plain": {}}, "description": "Return a flat file of FQDNs.", } }, ) def subscription_dashboard_devices() -> Response: """Retrieve FQDN for all dashboard devices that are monitored.""" fqdns = [] dashboard_devices = get_subscriptions( product_types=[ProductType.ROUTER, ProductType.SUPER_POP_SWITCH, ProductType.OFFICE_ROUTER], lifecycles=[SubscriptionLifecycle.ACTIVE, SubscriptionLifecycle.PROVISIONING], ) for device in dashboard_devices: subscription = SubscriptionModel.from_subscription(device["subscription_id"]) extended_model = build_extended_domain_model(subscription) if extended_model["product"]["product_type"] == ProductType.ROUTER: fqdns.append(extended_model["router"]["router_fqdn"]) elif extended_model["product"]["product_type"] == ProductType.SUPER_POP_SWITCH: fqdns.append(extended_model["super_pop_switch"]["super_pop_switch_fqdn"]) elif extended_model["product"]["product_type"] == ProductType.OFFICE_ROUTER: fqdns.append(extended_model["office_router"]["office_router_fqdn"]) fqdn_flat_file = "\n".join(fqdns) return Response(content=fqdn_flat_file, media_type="text/plain")