-
Mohammad Torkashvand authoredMohammad Torkashvand authored
processes.py 3.25 KiB
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful. These methods are related to operations regarding processes and workflows.
"""
from uuid import UUID
from orchestrator.db import ProcessStepTable, ProcessSubscriptionTable, ProcessTable, WorkflowTable, db
from orchestrator.workflow import ProcessStatus, StepStatus
from pydantic_forms.types import UUIDstr
from sqlalchemy import ScalarResult, and_, or_, select
from sqlalchemy.orm import Query, joinedload
def get_processes_by_workflow_name(workflow_name: str) -> Query:
"""Get all processes for a given workflow name."""
return ProcessTable.query.join(WorkflowTable).filter(WorkflowTable.name == workflow_name)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_geant_products processes.
Returns:
The count of incomplete 'validate_geant_products' processes.
"""
return (
get_processes_by_workflow_name("validate_geant_products")
.filter(ProcessTable.last_status != ProcessStatus.COMPLETED)
.count()
)
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
and_(ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED)
).all()
def get_suspended_tasks_by_workflow_name(workflow_name: str) -> list[ProcessTable]:
"""Get all tasks that have gone into a suspended state, for a specific workflow name."""
return (
get_processes_by_workflow_name(workflow_name)
.filter(and_(ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.SUSPENDED))
.all()
)
def get_all_cleanup_tasks() -> list[WorkflowTable]:
"""Get a list of all cleanup tasks that run on a schedule."""
return WorkflowTable.query.filter(
or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks")
).all()
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
"""Get all processes that are either created or completed, by workflow ID."""
return db.session.scalars(
select(ProcessTable)
.filter(ProcessTable.is_task.is_(True))
.filter(ProcessTable.workflow_id == workflow_id)
.filter(
or_(
ProcessTable.last_status == ProcessStatus.COMPLETED.value,
ProcessTable.last_status == ProcessStatus.CREATED.value,
)
)
)
def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
"""Get a stopped process by its ID."""
return (
db.session.query(ProcessTable)
.join(ProcessTable.steps)
.filter(
ProcessTable.process_id == process_id,
ProcessStepTable.status.in_({StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}),
)
.options(
joinedload(ProcessTable.steps),
joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription),
)
.order_by(ProcessTable.last_modified_at)
.one_or_none()
)