Skip to content
Snippets Groups Projects
Commit 82825131 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

remove proccess api

parent 8cd2be13
Branches
Tags
No related merge requests found
Pipeline #87928 failed
"""Process related endpoints."""
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from orchestrator.db import ProcessStepTable
from orchestrator.schemas.base import OrchestratorBaseModel
from orchestrator.security import authorize
router = APIRouter(prefix="/processes", tags=["Processes"], dependencies=[Depends(authorize)])
class CallBackResultsBaseModel(OrchestratorBaseModel):
"""Base model for callback results."""
callback_results: dict
@router.get(
"/steps/{step_id}/callback-results", status_code=status.HTTP_200_OK, response_model=CallBackResultsBaseModel
)
def callback_results(step_id: UUID) -> dict[str, Any]:
"""Retrieve callback results for a specific process step.
:param step_id: The unique identifier of the process step.
:type step_id: UUID
:return: Dictionary containing callback results.
:rtype: dict[str, Any]
:raises HTTPException: 404 status code if the specified step_id is not found or if the 'callback_result' key
is not present in the state.
"""
step = ProcessStepTable.query.filter(ProcessStepTable.step_id == step_id).first()
if not (step and step.state.get("callback_result", None)):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Callback result not found.")
return {"callback_results": step.state["callback_result"]}
from uuid import uuid4
import pytest
from orchestrator.db import (
ProcessStepTable,
ProcessSubscriptionTable,
ProcessTable,
db,
)
from orchestrator.workflow import ProcessStatus
@pytest.fixture()
def create_process(test_workflow, nokia_router_subscription_factory):
process_id = uuid4()
process = ProcessTable(
process_id=process_id, workflow_id=test_workflow.workflow_id, last_status=ProcessStatus.SUSPENDED
)
subscription = nokia_router_subscription_factory()
process_subscription = ProcessSubscriptionTable(process_id=process_id, subscription_id=subscription)
db.session.add(process)
db.session.add(process_subscription)
db.session.commit()
return process_id
def test_callback_results_endpoint(test_client, create_process, faker):
expected_result = {"id": 1, "output": faker.sentence()}
step = ProcessStepTable(
process_id=create_process,
name="Modify",
status="suspend",
state={"subscription_id": uuid4(), "callback_result": expected_result},
)
db.session.add(step)
db.session.commit()
response = test_client.get(f"/api/v1/processes/steps/{step.step_id}/callback-results")
assert response.status_code == 200
assert response.json() == {"callback_results": expected_result}
def test_callback_results_endpoint_with_wrong_step_id(test_client):
response = test_client.get(f"/api/v1/processes/steps/{uuid4()}/callback-results")
assert response.status_code == 404
assert response.json() == {"detail": "Callback result not found."}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment