Skip to content
Snippets Groups Projects
Verified Commit d37b2699 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Update timing of cleanup tasks, move db queries to processes service

parent bc683c5c
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ from gso.worker import celery
@celery.task
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="3"))
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="23"))
def clean_old_tasks() -> None:
"""Run all cleanup tasks every 3 AM UTC."""
"""Run all cleanup tasks every 11 PM UTC."""
start_process("task_clean_old_tasks")
......@@ -7,7 +7,7 @@ from gso.worker import celery
@celery.task
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="*/6"))
@scheduler(CronScheduleConfig(name="Clean up tasks", hour="1"))
def vacuum_tasks() -> None:
"""Run all cleanup tasks every 6 hours."""
"""Run all cleanup tasks every 1 AM UTC."""
start_process("task_clean_up_tasks")
......@@ -3,7 +3,7 @@
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.services.subscriptions import count_incomplete_validate_products
from gso.services.processes import count_incomplete_validate_products
from gso.worker import celery
......
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful. These methods are related to operations regarding processes and workflows.
"""
from orchestrator.db import ProcessTable, WorkflowTable, db
from orchestrator.devtools.populator import UUIDstr
from orchestrator.workflow import ProcessStatus
from sqlalchemy import ScalarResult, or_, select
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_geant_products processes.
:return: The count of incomplete 'validate_geant_products' processes.
:rtype: int
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_geant_products",
ProcessTable.last_status != ProcessStatus.COMPLETED.value,
).count()
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value
).all()
def get_all_cleanup_tasks() -> list[WorkflowTable]:
"""Get a list of all cleanup tasks that run on a schedule."""
return WorkflowTable.query.filter(
or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks")
).all()
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
"""Get all processes that are either created or completed, by workflow ID."""
return db.session.scalars(
select(ProcessTable)
.filter(ProcessTable.is_task.is_(True))
.filter(ProcessTable.workflow_id == workflow_id)
.filter(
or_(
ProcessTable.last_status == ProcessStatus.COMPLETED.value,
ProcessTable.last_status == ProcessStatus.CREATED.value,
)
)
)
"""A collection of methods that make interaction with coreDB more straight-forward.
This prevents someone from having to re-write database statements many times, that might turn out to be erroneous
or inconsistent when not careful.
or inconsistent when not careful. These methods relate to operations on subscriptions.
"""
from typing import Any
......@@ -9,7 +9,6 @@ from uuid import UUID
from orchestrator.db import (
ProcessSubscriptionTable,
ProcessTable,
ProductTable,
ResourceTypeTable,
SubscriptionInstanceTable,
......@@ -20,7 +19,6 @@ from orchestrator.db import (
from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import ProcessStatus
from pydantic_forms.types import UUIDstr
from gso.products import ProductName, ProductType
......@@ -200,25 +198,6 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
)
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_geant_products processes.
:return: The count of incomplete 'validate_geant_products' processes.
:rtype: int
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_geant_products",
ProcessTable.last_status != ProcessStatus.COMPLETED.value,
).count()
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value
).all()
def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None:
"""Get a subscription from a process ID."""
subscription_table = ProcessSubscriptionTable.query.filter(
......
"""A cleanup task for removing past runs."""
from orchestrator.db import ProcessTable, WorkflowTable, db
from orchestrator.db import db
from orchestrator.targets import Target
from orchestrator.workflow import ProcessStatus, StepList, begin, done, step, workflow
from sqlalchemy import or_, select
from orchestrator.workflow import StepList, begin, done, step, workflow
from gso.services.processes import get_all_cleanup_tasks, get_created_and_completed_processes_by_id
@step("Clean up all cleanup tasks")
def remove_meta_tasks() -> None:
"""Find and remove runs of old cleanup tasks."""
cleanup_tasks = WorkflowTable.query.filter(
or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks")
).all()
cleanup_tasks = get_all_cleanup_tasks()
for cleanup_task in cleanup_tasks:
tasks = db.session.scalars(
select(ProcessTable)
.filter(ProcessTable.is_task.is_(True))
.filter(ProcessTable.workflow_id == cleanup_task.workflow_id)
.filter(
or_(
ProcessTable.last_status == ProcessStatus.COMPLETED.value,
ProcessTable.last_status == ProcessStatus.CREATED.value,
)
)
)
tasks = get_created_and_completed_processes_by_id(cleanup_task.workflow_id)
for task in tasks:
db.session.delete(task)
......
......@@ -5,7 +5,8 @@ from orchestrator.types import State
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from gso.services.mailer import send_mail
from gso.services.subscriptions import get_failed_tasks, get_subscription_by_process_id
from gso.services.processes import get_failed_tasks
from gso.services.subscriptions import get_subscription_by_process_id
from gso.settings import load_oss_params
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment