You need to sign in or sign up before continuing.
Newer
Older

Neda Moeini
committed
"""Process related endpoints."""
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status

Neda Moeini
committed
from orchestrator.db import ProcessStepTable
from orchestrator.schemas.base import OrchestratorBaseModel
from gso.auth.security import opa_security_default
router = APIRouter(prefix="/processes", tags=["Processes"], dependencies=[Depends(opa_security_default)])

Neda Moeini
committed
class CallBackResultsBaseModel(OrchestratorBaseModel):
"""Base model for callback results."""
callback_results: dict
@router.get(
"/steps/{step_id}/callback-results", status_code=status.HTTP_200_OK, response_model=CallBackResultsBaseModel
)
def callback_results(step_id: UUID) -> dict[str, Any]:
"""Retrieve callback results for a specific process step.
Args:
----
step_id (UUID): The unique identifier of the process step.
Returns:
-------
dict: Dictionary containing callback results.
Raises:
------
HTTPException: 404 status code if the specified step_id is not found or if
the 'callback_result' key is not present in the state.
"""
step = ProcessStepTable.query.filter(ProcessStepTable.step_id == step_id).first()
if not (step and step.state.get("callback_result", None)):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Callback result not found.")
return {"callback_results": step.state["callback_result"]}