Skip to content
Snippets Groups Projects
Commit 5dcb98a6 authored by Neda Moeini's avatar Neda Moeini
Browse files

Added middleware to modify porecess details response and replace the callback...

Added middleware to modify porecess details response and replace the callback result with a URl, and Implemented get callback result by step-id endpoint.
parent 93936ae1
No related branches found
No related tags found
1 merge request!142Added middleware to modify porecess details response and replace the callback...
......@@ -10,12 +10,14 @@ from orchestrator.cli.main import app as cli_app
import gso.products
import gso.workflows # noqa: F401
from gso.api import router as api_router
from gso.middlewares import ModifyProcessEndpointResponse
def init_gso_app() -> OrchestratorCore:
"""Initialise the :term:`GSO` app."""
app = OrchestratorCore(base_settings=app_settings)
app.include_router(api_router, prefix="/api")
app.add_middleware(ModifyProcessEndpointResponse)
return app
......
......@@ -3,9 +3,11 @@
from fastapi import APIRouter
from gso.api.v1.imports import router as imports_router
from gso.api.v1.processes import router as processes_router
from gso.api.v1.subscriptions import router as subscriptions_router
router = APIRouter()
router.include_router(imports_router)
router.include_router(subscriptions_router)
router.include_router(processes_router)
"""Process related endpoints."""
from typing import Any
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from orchestrator.db import ProcessStepTable
from orchestrator.schemas.base import OrchestratorBaseModel
router = APIRouter(prefix="/processes", tags=["Processes"])
class CallBackResultsBaseModel(OrchestratorBaseModel):
"""Base model for callback results."""
callback_results: dict
@router.get(
"/steps/{step_id}/callback-results", status_code=status.HTTP_200_OK, response_model=CallBackResultsBaseModel
)
def callback_results(step_id: UUID) -> dict[str, Any]:
"""Retrieve callback results for a specific process step.
Args:
----
step_id (UUID): The unique identifier of the process step.
Returns:
-------
dict: Dictionary containing callback results.
Raises:
------
HTTPException: 404 status code if the specified step_id is not found or if
the 'callback_result' key is not present in the state.
"""
step = ProcessStepTable.query.filter(ProcessStepTable.step_id == step_id).first()
if not (step and step.state.get("callback_result", None)):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Callback result not found.")
return {"callback_results": step.state["callback_result"]}
"""Custom middlewares for the GSO API."""
import json
import re
from collections.abc import Callable
from typing import Any
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from starlette.status import HTTP_200_OK
class ModifyProcessEndpointResponse(BaseHTTPMiddleware):
"""Middleware to modify the response for Process details endpoint."""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""Middleware to modify the response for Process details endpoint.
Args:
----
request (Request): The incoming HTTP request.
call_next (Callable): The next middleware or endpoint in the stack.
Returns:
-------
Response: The modified HTTP response.
"""
response = await call_next(request)
path_pattern = r"/api/processes/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
match = re.match(path_pattern, request.url.path)
if match and response.status_code == HTTP_200_OK:
# Modify the response body as needed
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
try:
json_content = json.loads(response_body)
self.modify_response_body(json_content, request)
modified_response_body = json.dumps(json_content).encode()
headers = dict(response.headers)
headers["content-length"] = str(len(modified_response_body))
return Response(
content=modified_response_body,
status_code=response.status_code,
headers=headers,
media_type=response.media_type,
)
except json.JSONDecodeError:
pass
return response
@staticmethod
def modify_response_body(response_body: dict[str, Any], request: Request) -> None:
"""Modify the response body as needed.
Args:
----
response_body (Dict[str, Any]): The response body in dictionary format.
request (Request): The incoming HTTP request.
"""
max_output_length = 1000
try:
for step in response_body["steps"]:
if step["state"].get("callback_result", None):
callback_result = step["state"]["callback_result"]
if callback_result and isinstance(callback_result, str):
callback_result = json.loads(callback_result)
if callback_result.get("output") and len(callback_result["output"]) > max_output_length:
callback_result[
"output"
] = f'{request.base_url}api/v1/processes/steps/{step["step_id"]}/callback-results/'
step["state"]["callback_result"] = callback_result
except (AttributeError, KeyError, TypeError):
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment