From d37b2699b55b71e184e3c768ce21e15f7056fd1e Mon Sep 17 00:00:00 2001 From: Karel van Klink <karel.vanklink@geant.org> Date: Tue, 13 Aug 2024 15:51:21 +0200 Subject: [PATCH] Update timing of cleanup tasks, move db queries to processes service --- gso/schedules/clean_old_tasks.py | 4 +- gso/schedules/task_vacuum.py | 4 +- gso/schedules/validate_products.py | 2 +- gso/services/processes.py | 51 +++++++++++++++++++ gso/services/subscriptions.py | 23 +-------- gso/workflows/tasks/clean_old_tasks.py | 23 +++------ .../tasks/send_email_notifications.py | 3 +- 7 files changed, 65 insertions(+), 45 deletions(-) create mode 100644 gso/services/processes.py diff --git a/gso/schedules/clean_old_tasks.py b/gso/schedules/clean_old_tasks.py index c37d4445..c05543d3 100644 --- a/gso/schedules/clean_old_tasks.py +++ b/gso/schedules/clean_old_tasks.py @@ -7,7 +7,7 @@ from gso.worker import celery @celery.task -@scheduler(CronScheduleConfig(name="Clean up tasks", hour="3")) +@scheduler(CronScheduleConfig(name="Clean up tasks", hour="23")) def clean_old_tasks() -> None: - """Run all cleanup tasks every 3 AM UTC.""" + """Run all cleanup tasks every 11 PM UTC.""" start_process("task_clean_old_tasks") diff --git a/gso/schedules/task_vacuum.py b/gso/schedules/task_vacuum.py index be04380f..de4d4484 100644 --- a/gso/schedules/task_vacuum.py +++ b/gso/schedules/task_vacuum.py @@ -7,7 +7,7 @@ from gso.worker import celery @celery.task -@scheduler(CronScheduleConfig(name="Clean up tasks", hour="*/6")) +@scheduler(CronScheduleConfig(name="Clean up tasks", hour="1")) def vacuum_tasks() -> None: - """Run all cleanup tasks every 6 hours.""" + """Run all cleanup tasks every 1 AM UTC.""" start_process("task_clean_up_tasks") diff --git a/gso/schedules/validate_products.py b/gso/schedules/validate_products.py index eeb689a2..9d8e6f18 100644 --- a/gso/schedules/validate_products.py +++ b/gso/schedules/validate_products.py @@ -3,7 +3,7 @@ from orchestrator.services.processes import start_process from gso.schedules.scheduling import CronScheduleConfig, scheduler -from gso.services.subscriptions import count_incomplete_validate_products +from gso.services.processes import count_incomplete_validate_products from gso.worker import celery diff --git a/gso/services/processes.py b/gso/services/processes.py new file mode 100644 index 00000000..3a146da9 --- /dev/null +++ b/gso/services/processes.py @@ -0,0 +1,51 @@ +"""A collection of methods that make interaction with coreDB more straight-forward. + +This prevents someone from having to re-write database statements many times, that might turn out to be erroneous +or inconsistent when not careful. These methods are related to operations regarding processes and workflows. +""" + +from orchestrator.db import ProcessTable, WorkflowTable, db +from orchestrator.devtools.populator import UUIDstr +from orchestrator.workflow import ProcessStatus +from sqlalchemy import ScalarResult, or_, select + + +def count_incomplete_validate_products() -> int: + """Count the number of incomplete validate_geant_products processes. + + :return: The count of incomplete 'validate_geant_products' processes. + :rtype: int + """ + return ProcessTable.query.filter( + ProcessTable.workflow_name == "validate_geant_products", + ProcessTable.last_status != ProcessStatus.COMPLETED.value, + ).count() + + +def get_failed_tasks() -> list[ProcessTable]: + """Get all tasks that have failed.""" + return ProcessTable.query.filter( + ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value + ).all() + + +def get_all_cleanup_tasks() -> list[WorkflowTable]: + """Get a list of all cleanup tasks that run on a schedule.""" + return WorkflowTable.query.filter( + or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks") + ).all() + + +def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult: + """Get all processes that are either created or completed, by workflow ID.""" + return db.session.scalars( + select(ProcessTable) + .filter(ProcessTable.is_task.is_(True)) + .filter(ProcessTable.workflow_id == workflow_id) + .filter( + or_( + ProcessTable.last_status == ProcessStatus.COMPLETED.value, + ProcessTable.last_status == ProcessStatus.CREATED.value, + ) + ) + ) diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index 85d39726..e0a6211c 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -1,7 +1,7 @@ """A collection of methods that make interaction with coreDB more straight-forward. This prevents someone from having to re-write database statements many times, that might turn out to be erroneous -or inconsistent when not careful. +or inconsistent when not careful. These methods relate to operations on subscriptions. """ from typing import Any @@ -9,7 +9,6 @@ from uuid import UUID from orchestrator.db import ( ProcessSubscriptionTable, - ProcessTable, ProductTable, ResourceTypeTable, SubscriptionInstanceTable, @@ -20,7 +19,6 @@ from orchestrator.db import ( from orchestrator.domain import SubscriptionModel from orchestrator.services.subscriptions import query_in_use_by_subscriptions from orchestrator.types import SubscriptionLifecycle -from orchestrator.workflow import ProcessStatus from pydantic_forms.types import UUIDstr from gso.products import ProductName, ProductType @@ -200,25 +198,6 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st ) -def count_incomplete_validate_products() -> int: - """Count the number of incomplete validate_geant_products processes. - - :return: The count of incomplete 'validate_geant_products' processes. - :rtype: int - """ - return ProcessTable.query.filter( - ProcessTable.workflow_name == "validate_geant_products", - ProcessTable.last_status != ProcessStatus.COMPLETED.value, - ).count() - - -def get_failed_tasks() -> list[ProcessTable]: - """Get all tasks that have failed.""" - return ProcessTable.query.filter( - ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value - ).all() - - def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None: """Get a subscription from a process ID.""" subscription_table = ProcessSubscriptionTable.query.filter( diff --git a/gso/workflows/tasks/clean_old_tasks.py b/gso/workflows/tasks/clean_old_tasks.py index 963e108f..cdc9a0de 100644 --- a/gso/workflows/tasks/clean_old_tasks.py +++ b/gso/workflows/tasks/clean_old_tasks.py @@ -1,30 +1,19 @@ """A cleanup task for removing past runs.""" -from orchestrator.db import ProcessTable, WorkflowTable, db +from orchestrator.db import db from orchestrator.targets import Target -from orchestrator.workflow import ProcessStatus, StepList, begin, done, step, workflow -from sqlalchemy import or_, select +from orchestrator.workflow import StepList, begin, done, step, workflow + +from gso.services.processes import get_all_cleanup_tasks, get_created_and_completed_processes_by_id @step("Clean up all cleanup tasks") def remove_meta_tasks() -> None: """Find and remove runs of old cleanup tasks.""" - cleanup_tasks = WorkflowTable.query.filter( - or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks") - ).all() + cleanup_tasks = get_all_cleanup_tasks() for cleanup_task in cleanup_tasks: - tasks = db.session.scalars( - select(ProcessTable) - .filter(ProcessTable.is_task.is_(True)) - .filter(ProcessTable.workflow_id == cleanup_task.workflow_id) - .filter( - or_( - ProcessTable.last_status == ProcessStatus.COMPLETED.value, - ProcessTable.last_status == ProcessStatus.CREATED.value, - ) - ) - ) + tasks = get_created_and_completed_processes_by_id(cleanup_task.workflow_id) for task in tasks: db.session.delete(task) diff --git a/gso/workflows/tasks/send_email_notifications.py b/gso/workflows/tasks/send_email_notifications.py index f346cf1f..42b94bd4 100644 --- a/gso/workflows/tasks/send_email_notifications.py +++ b/gso/workflows/tasks/send_email_notifications.py @@ -5,7 +5,8 @@ from orchestrator.types import State from orchestrator.workflow import StepList, conditional, done, init, step, workflow from gso.services.mailer import send_mail -from gso.services.subscriptions import get_failed_tasks, get_subscription_by_process_id +from gso.services.processes import get_failed_tasks +from gso.services.subscriptions import get_subscription_by_process_id from gso.settings import load_oss_params -- GitLab