""":term:`API` endpoint for fetching different types of subscriptions.""" from typing import Any from fastapi import Depends, status from fastapi.routing import APIRouter from orchestrator.domain import SubscriptionModel from orchestrator.schemas import SubscriptionDomainModelSchema from orchestrator.security import opa_security_default from orchestrator.services.subscriptions import build_extended_domain_model from gso.services.subscriptions import get_active_router_subscriptions router = APIRouter( prefix="/subscriptions", tags=["Subscriptions"], dependencies=[Depends(opa_security_default)], ) @router.get( "/routers", status_code=status.HTTP_200_OK, response_model=list[SubscriptionDomainModelSchema], ) def subscription_routers() -> list[dict[str, Any]]: """Retrieve all active routers subscriptions.""" subscriptions = [] for r in get_active_router_subscriptions(): subscription = SubscriptionModel.from_subscription(r["subscription_id"]) extended_model = build_extended_domain_model(subscription) subscriptions.append(extended_model) return subscriptions