-
Karel van Klink authored
subscriptions.py 1.01 KiB
from typing import Any
from fastapi import Depends, status
from fastapi.routing import APIRouter
from orchestrator.domain import SubscriptionModel
from orchestrator.schemas import SubscriptionDomainModelSchema
from orchestrator.security import opa_security_default
from orchestrator.services.subscriptions import build_extended_domain_model
from gso.services.subscriptions import get_active_router_subscriptions
# Router for the subscription endpoints. The `opa_security_default` dependency
# is declared at router level so it applies to every route registered on it.
router = APIRouter(
    prefix="/subscriptions",
    tags=["Subscriptions"],
    dependencies=[Depends(opa_security_default)],
)
@router.get(
    "/routers",
    status_code=status.HTTP_200_OK,
    response_model=list[SubscriptionDomainModelSchema],
)
def subscription_routers() -> list[dict[str, Any]]:
    """Retrieve all active router subscriptions."""
    # NOTE: the docstring above is intentionally left as-is; FastAPI publishes
    # a path operation's docstring as its OpenAPI description.
    #
    # For each entry returned by get_active_router_subscriptions(), load the
    # subscription through its "subscription_id" key and convert it with
    # build_extended_domain_model(); results keep the order of the entries.
    return [
        build_extended_domain_model(
            SubscriptionModel.from_subscription(entry["subscription_id"])
        )
        for entry in get_active_router_subscriptions()
    ]