Skip to content
Snippets Groups Projects
Commit 05f70d48 authored by Mohammad Torkashvand's avatar Mohammad Torkashvand
Browse files

remove proccess api

parent 8cd2be13
No related branches found
No related tags found
1 merge request!215Feature/nat 468 refactor auth
Pipeline #87929 failed
...@@ -34,7 +34,7 @@ def init_worker_app() -> OrchestratorCore: ...@@ -34,7 +34,7 @@ def init_worker_app() -> OrchestratorCore:
def init_cli_app() -> typer.Typer: def init_cli_app() -> typer.Typer:
"""Initialise :term:`GSO` as a CLI application.""" """Initialise :term:`GSO` as a CLI application."""
from gso.cli import imports, netbox from gso.cli import imports, netbox # noqa: PLC0415
cli_app.add_typer(imports.app, name="import-cli") cli_app.add_typer(imports.app, name="import-cli")
cli_app.add_typer(netbox.app, name="netbox-cli") cli_app.add_typer(netbox.app, name="netbox-cli")
......
"""Process related endpoints."""
from typing import Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from orchestrator.db import ProcessStepTable
from orchestrator.schemas.base import OrchestratorBaseModel
from orchestrator.security import authorize
router = APIRouter(prefix="/processes", tags=["Processes"], dependencies=[Depends(authorize)])
class CallBackResultsBaseModel(OrchestratorBaseModel):
"""Base model for callback results."""
callback_results: dict
@router.get(
"/steps/{step_id}/callback-results", status_code=status.HTTP_200_OK, response_model=CallBackResultsBaseModel
)
def callback_results(step_id: UUID) -> dict[str, Any]:
"""Retrieve callback results for a specific process step.
:param step_id: The unique identifier of the process step.
:type step_id: UUID
:return: Dictionary containing callback results.
:rtype: dict[str, Any]
:raises HTTPException: 404 status code if the specified step_id is not found or if the 'callback_result' key
is not present in the state.
"""
step = ProcessStepTable.query.filter(ProcessStepTable.step_id == step_id).first()
if not (step and step.state.get("callback_result", None)):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Callback result not found.")
return {"callback_results": step.state["callback_result"]}
...@@ -37,7 +37,7 @@ def _setup_connection() -> tuple[connector.Connector, IPAMParams]: ...@@ -37,7 +37,7 @@ def _setup_connection() -> tuple[connector.Connector, IPAMParams]:
return connector.Connector(options), oss return connector.Connector(options), oss
def _allocate_network( def _allocate_network( # noqa: PLR0917
conn: connector.Connector, conn: connector.Connector,
dns_view: str, dns_view: str,
network_view: str, network_view: str,
......
...@@ -9,7 +9,7 @@ from gso.settings import load_oss_params ...@@ -9,7 +9,7 @@ from gso.settings import load_oss_params
class OrchestratorCelery(Celery): class OrchestratorCelery(Celery):
"""A :term:`GSO` instance that functions as a Celery worker.""" """A :term:`GSO` instance that functions as a Celery worker."""
def on_init(self) -> None: def on_init(self) -> None: # noqa: PLR6301
"""Initialise a new Celery worker.""" """Initialise a new Celery worker."""
init_worker_app() init_worker_app()
......
from uuid import uuid4
import pytest
from orchestrator.db import (
ProcessStepTable,
ProcessSubscriptionTable,
ProcessTable,
db,
)
from orchestrator.workflow import ProcessStatus
@pytest.fixture()
def create_process(test_workflow, nokia_router_subscription_factory):
process_id = uuid4()
process = ProcessTable(
process_id=process_id, workflow_id=test_workflow.workflow_id, last_status=ProcessStatus.SUSPENDED
)
subscription = nokia_router_subscription_factory()
process_subscription = ProcessSubscriptionTable(process_id=process_id, subscription_id=subscription)
db.session.add(process)
db.session.add(process_subscription)
db.session.commit()
return process_id
def test_callback_results_endpoint(test_client, create_process, faker):
expected_result = {"id": 1, "output": faker.sentence()}
step = ProcessStepTable(
process_id=create_process,
name="Modify",
status="suspend",
state={"subscription_id": uuid4(), "callback_result": expected_result},
)
db.session.add(step)
db.session.commit()
response = test_client.get(f"/api/v1/processes/steps/{step.step_id}/callback-results")
assert response.status_code == 200
assert response.json() == {"callback_results": expected_result}
def test_callback_results_endpoint_with_wrong_step_id(test_client):
response = test_client.get(f"/api/v1/processes/steps/{uuid4()}/callback-results")
assert response.status_code == 404
assert response.json() == {"detail": "Callback result not found."}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment