diff --git a/gso/__init__.py b/gso/__init__.py index 0227c19d7d29838dc2952ac34abb13d6a9ebc3ea..ecdfd940ffefe85df1613e4a6cbbc74f56bf80dc 100644 --- a/gso/__init__.py +++ b/gso/__init__.py @@ -10,12 +10,14 @@ from orchestrator.cli.main import app as cli_app import gso.products import gso.workflows # noqa: F401 from gso.api import router as api_router +from gso.middlewares import ModifyProcessEndpointResponse def init_gso_app() -> OrchestratorCore: """Initialise the :term:`GSO` app.""" app = OrchestratorCore(base_settings=app_settings) app.include_router(api_router, prefix="/api") + app.add_middleware(ModifyProcessEndpointResponse) return app diff --git a/gso/api/v1/__init__.py b/gso/api/v1/__init__.py index c14de2e3eec324fed3a12e7a85c3c39f236ed83d..983408986b827a35bf32cbc538d39eb7b6e208e1 100644 --- a/gso/api/v1/__init__.py +++ b/gso/api/v1/__init__.py @@ -3,9 +3,11 @@ from fastapi import APIRouter from gso.api.v1.imports import router as imports_router +from gso.api.v1.processes import router as processes_router from gso.api.v1.subscriptions import router as subscriptions_router router = APIRouter() router.include_router(imports_router) router.include_router(subscriptions_router) +router.include_router(processes_router) diff --git a/gso/api/v1/processes.py b/gso/api/v1/processes.py new file mode 100644 index 0000000000000000000000000000000000000000..5edcbf69d483b44e331ceed72762e928abc3bc2c --- /dev/null +++ b/gso/api/v1/processes.py @@ -0,0 +1,43 @@ +"""Process related endpoints.""" + +from typing import Any +from uuid import UUID + +from fastapi import APIRouter, HTTPException, status +from orchestrator.db import ProcessStepTable +from orchestrator.schemas.base import OrchestratorBaseModel + +router = APIRouter(prefix="/processes", tags=["Processes"]) + + +class CallBackResultsBaseModel(OrchestratorBaseModel): + """Base model for callback results.""" + + callback_results: dict + + +@router.get( + "/steps/{step_id}/callback-results", status_code=status.HTTP_200_OK, response_model=CallBackResultsBaseModel +) +def callback_results(step_id: UUID) -> dict[str, Any]: + """Retrieve callback results for a specific process step. + + Args: + ---- + step_id (UUID): The unique identifier of the process step. + + Returns: + ------- + dict: Dictionary containing callback results. + + Raises: + ------ + HTTPException: 404 status code if the specified step_id is not found or if + the 'callback_result' key is not present in the state. + """ + step = ProcessStepTable.query.filter(ProcessStepTable.step_id == step_id).first() + + if not (step and step.state.get("callback_result", None)): + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Callback result not found.") + + return {"callback_results": step.state["callback_result"]} diff --git a/gso/middlewares.py b/gso/middlewares.py new file mode 100644 index 0000000000000000000000000000000000000000..6005ee3874d5fd099dc7e04692478b87a0af36ac --- /dev/null +++ b/gso/middlewares.py @@ -0,0 +1,79 @@ +"""Custom middlewares for the GSO API.""" + +import json +import re +from collections.abc import Callable +from typing import Any + +from fastapi import Request +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import Response +from starlette.status import HTTP_200_OK + + +class ModifyProcessEndpointResponse(BaseHTTPMiddleware): + """Middleware to modify the response for Process details endpoint.""" + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + """Middleware to modify the response for Process details endpoint. + + Args: + ---- + request (Request): The incoming HTTP request. + call_next (Callable): The next middleware or endpoint in the stack. + + Returns: + ------- + Response: The modified HTTP response. + """ + response = await call_next(request) + path_pattern = r"/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" + + match = re.match(path_pattern, request.url.path) + + if match and response.status_code == HTTP_200_OK: + # Modify the response body as needed + response_body = b"" + async for chunk in response.body_iterator: + response_body += chunk + try: + json_content = json.loads(response_body) + self.modify_response_body(json_content, request) + modified_response_body = json.dumps(json_content).encode() + headers = dict(response.headers) + headers["content-length"] = str(len(modified_response_body)) + return Response( + content=modified_response_body, + status_code=response.status_code, + headers=headers, + media_type=response.media_type, + ) + + except json.JSONDecodeError: + pass + + return response + + @staticmethod + def modify_response_body(response_body: dict[str, Any], request: Request) -> None: + """Modify the response body as needed. + + Args: + ---- + response_body (Dict[str, Any]): The response body in dictionary format. + request (Request): The incoming HTTP request. + """ + max_output_length = 1000 + try: + for step in response_body["steps"]: + if step["state"].get("callback_result", None): + callback_result = step["state"]["callback_result"] + if callback_result and isinstance(callback_result, str): + callback_result = json.loads(callback_result) + if callback_result.get("output") and len(callback_result["output"]) > max_output_length: + callback_result[ + "output" + ] = f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results/' + step["state"]["callback_result"] = callback_result + except (AttributeError, KeyError, TypeError): + pass