diff --git a/gso/schedules/clean_old_tasks.py b/gso/schedules/clean_old_tasks.py index c37d44450209a90033f66b36094e91b6f5403442..c05543d355ac359f579f4517956bf85283da3113 100644 --- a/gso/schedules/clean_old_tasks.py +++ b/gso/schedules/clean_old_tasks.py @@ -7,7 +7,7 @@ from gso.worker import celery @celery.task -@scheduler(CronScheduleConfig(name="Clean up tasks", hour="3")) +@scheduler(CronScheduleConfig(name="Clean up tasks", hour="23")) def clean_old_tasks() -> None: - """Run all cleanup tasks every 3 AM UTC.""" + """Run all cleanup tasks every 11 PM UTC.""" start_process("task_clean_old_tasks") diff --git a/gso/schedules/task_vacuum.py b/gso/schedules/task_vacuum.py index be04380f8caaf53152f77ca129ba74b9d63bd164..de4d44842f485cc8df41f9c01eed3ead2d651a30 100644 --- a/gso/schedules/task_vacuum.py +++ b/gso/schedules/task_vacuum.py @@ -7,7 +7,7 @@ from gso.worker import celery @celery.task -@scheduler(CronScheduleConfig(name="Clean up tasks", hour="*/6")) +@scheduler(CronScheduleConfig(name="Clean up tasks", hour="1")) def vacuum_tasks() -> None: - """Run all cleanup tasks every 6 hours.""" + """Run all cleanup tasks every 1 AM UTC.""" start_process("task_clean_up_tasks") diff --git a/gso/schedules/validate_products.py b/gso/schedules/validate_products.py index eeb689a247e337272a5f96eb5992a87e26227dc7..9d8e6f18a2ab6dcbf684f16f0a100b530df02b6c 100644 --- a/gso/schedules/validate_products.py +++ b/gso/schedules/validate_products.py @@ -3,7 +3,7 @@ from orchestrator.services.processes import start_process from gso.schedules.scheduling import CronScheduleConfig, scheduler -from gso.services.subscriptions import count_incomplete_validate_products +from gso.services.processes import count_incomplete_validate_products from gso.worker import celery diff --git a/gso/services/processes.py b/gso/services/processes.py new file mode 100644 index 0000000000000000000000000000000000000000..3a146da90780a9d47647f88f7245f27793f198a6 --- /dev/null +++ b/gso/services/processes.py @@ -0,0 +1,51 @@ +"""A collection of methods that make interaction with coreDB more straight-forward. + +This prevents someone from having to re-write database statements many times, that might turn out to be erroneous +or inconsistent when not careful. These methods are related to operations regarding processes and workflows. +""" + +from orchestrator.db import ProcessTable, WorkflowTable, db +from orchestrator.devtools.populator import UUIDstr +from orchestrator.workflow import ProcessStatus +from sqlalchemy import ScalarResult, or_, select + + +def count_incomplete_validate_products() -> int: + """Count the number of incomplete validate_geant_products processes. + + :return: The count of incomplete 'validate_geant_products' processes. + :rtype: int + """ + return ProcessTable.query.filter( + ProcessTable.workflow_name == "validate_geant_products", + ProcessTable.last_status != ProcessStatus.COMPLETED.value, + ).count() + + +def get_failed_tasks() -> list[ProcessTable]: + """Get all tasks that have failed.""" + return ProcessTable.query.filter( + ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value + ).all() + + +def get_all_cleanup_tasks() -> list[WorkflowTable]: + """Get a list of all cleanup tasks that run on a schedule.""" + return WorkflowTable.query.filter( + or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks") + ).all() + + +def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult: + """Get all processes that are either created or completed, by workflow ID.""" + return db.session.scalars( + select(ProcessTable) + .filter(ProcessTable.is_task.is_(True)) + .filter(ProcessTable.workflow_id == workflow_id) + .filter( + or_( + ProcessTable.last_status == ProcessStatus.COMPLETED.value, + ProcessTable.last_status == ProcessStatus.CREATED.value, + ) + ) + ) diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 85d3972630333ec4295c7bb624b04ac050978e97..e0a6211cccda945d4eff7a88ad3920e9dd543104 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -1,7 +1,7 @@ """A collection of methods that make interaction with coreDB more straight-forward. This prevents someone from having to re-write database statements many times, that might turn out to be erroneous -or inconsistent when not careful. +or inconsistent when not careful. These methods relate to operations on subscriptions. """ from typing import Any @@ -9,7 +9,6 @@ from uuid import UUID from orchestrator.db import ( ProcessSubscriptionTable, - ProcessTable, ProductTable, ResourceTypeTable, SubscriptionInstanceTable, @@ -20,7 +19,6 @@ from orchestrator.db import ( from orchestrator.domain import SubscriptionModel from orchestrator.services.subscriptions import query_in_use_by_subscriptions from orchestrator.types import SubscriptionLifecycle -from orchestrator.workflow import ProcessStatus from pydantic_forms.types import UUIDstr from gso.products import ProductName, ProductType @@ -200,25 +198,6 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st ) -def count_incomplete_validate_products() -> int: - """Count the number of incomplete validate_geant_products processes. - - :return: The count of incomplete 'validate_geant_products' processes. - :rtype: int - """ - return ProcessTable.query.filter( - ProcessTable.workflow_name == "validate_geant_products", - ProcessTable.last_status != ProcessStatus.COMPLETED.value, - ).count() - - -def get_failed_tasks() -> list[ProcessTable]: - """Get all tasks that have failed.""" - return ProcessTable.query.filter( - ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value - ).all() - - def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None: """Get a subscription from a process ID.""" subscription_table = ProcessSubscriptionTable.query.filter( diff --git a/gso/workflows/tasks/clean_old_tasks.py b/gso/workflows/tasks/clean_old_tasks.py index 963e108f7c959bb273a772d9d37e50be1b239d2d..cdc9a0decf57fda5de80b3ad14092ad326d4a054 100644 --- a/gso/workflows/tasks/clean_old_tasks.py +++ b/gso/workflows/tasks/clean_old_tasks.py @@ -1,30 +1,19 @@ """A cleanup task for removing past runs.""" -from orchestrator.db import ProcessTable, WorkflowTable, db +from orchestrator.db import db from orchestrator.targets import Target -from orchestrator.workflow import ProcessStatus, StepList, begin, done, step, workflow -from sqlalchemy import or_, select +from orchestrator.workflow import StepList, begin, done, step, workflow + +from gso.services.processes import get_all_cleanup_tasks, get_created_and_completed_processes_by_id @step("Clean up all cleanup tasks") def remove_meta_tasks() -> None: """Find and remove runs of old cleanup tasks.""" - cleanup_tasks = WorkflowTable.query.filter( - or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks") - ).all() + cleanup_tasks = get_all_cleanup_tasks() for cleanup_task in cleanup_tasks: - tasks = db.session.scalars( - select(ProcessTable) - .filter(ProcessTable.is_task.is_(True)) - .filter(ProcessTable.workflow_id == cleanup_task.workflow_id) - .filter( - or_( - ProcessTable.last_status == ProcessStatus.COMPLETED.value, - ProcessTable.last_status == ProcessStatus.CREATED.value, - ) - ) - ) + tasks = get_created_and_completed_processes_by_id(cleanup_task.workflow_id) for task in tasks: db.session.delete(task) diff --git a/gso/workflows/tasks/send_email_notifications.py b/gso/workflows/tasks/send_email_notifications.py index f346cf1f5d14db2f1b00e9307ae22b83b609066d..42b94bd421f4d2e45af354efa00205f975530c07 100644 --- a/gso/workflows/tasks/send_email_notifications.py +++ b/gso/workflows/tasks/send_email_notifications.py @@ -5,7 +5,8 @@ from orchestrator.types import State from orchestrator.workflow import StepList, conditional, done, init, step, workflow from gso.services.mailer import send_mail -from gso.services.subscriptions import get_failed_tasks, get_subscription_by_process_id +from gso.services.processes import get_failed_tasks +from gso.services.subscriptions import get_subscription_by_process_id from gso.settings import load_oss_params