Skip to content
Snippets Groups Projects

Feature/send validation emails

Merged Karel van Klink requested to merge feature/send-validation-emails into develop
All threads resolved!
1 file
+ 3
3
Compare changes
  • Side-by-side
  • Inline
@@ -8,6 +8,7 @@ from typing import Any
from uuid import UUID
from orchestrator.db import (
ProcessSubscriptionTable,
ProcessTable,
ProductTable,
ResourceTypeTable,
@@ -15,8 +16,10 @@ from orchestrator.db import (
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
from orchestrator.workflow import ProcessStatus
from pydantic_forms.types import UUIDstr
from gso.products import ProductName, ProductType
@@ -192,17 +195,32 @@ def get_active_subscriptions_by_field_and_value(field_name: str, field_value: st
def count_incomplete_validate_products() -> int:
"""Count the number of incomplete validate_products processes.
"""Count the number of incomplete validate_geant_products processes.
:return: The count of incomplete 'validate_products' processes.
:return: The count of incomplete 'validate_geant_products' processes.
:rtype: int
"""
return ProcessTable.query.filter(
ProcessTable.workflow_name == "validate_products",
ProcessTable.last_status != "completed",
ProcessTable.workflow_name == "validate_geant_products",
ProcessTable.last_status != ProcessStatus.COMPLETED.value,
).count()
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value
).all()
def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None:
"""Get a subscription from a process ID."""
subscription_table = ProcessSubscriptionTable.query.filter(
ProcessSubscriptionTable.process_id == process_id
).first()
return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
Loading