Skip to content
Snippets Groups Projects
Verified Commit 04353eb8 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Send one email per day, with all failed tasks

Prevent unit tests from sending emails
parent ae90aae5
No related branches found
No related tags found
1 merge request!239Feature/send validation emails
Pipeline #88150 passed
......@@ -7,7 +7,7 @@ from gso.worker import celery
@celery.task
@scheduler(CronScheduleConfig(name="Send email notifications", minute="30"))
@scheduler(CronScheduleConfig(name="Send email notifications", hour="2", minute="30"))
def send_email_notifications() -> None:
"""Run this task every hour on the half hour."""
"""Run this task every night at 2:30 AM."""
start_process("task_send_email_notifications")
......@@ -16,6 +16,7 @@ from orchestrator.db import (
SubscriptionInstanceValueTable,
SubscriptionTable,
)
from orchestrator.domain import SubscriptionModel
from orchestrator.services.subscriptions import query_in_use_by_subscriptions
from orchestrator.types import SubscriptionLifecycle
from pydantic_forms.types import UUIDstr
......@@ -206,17 +207,15 @@ def count_incomplete_validate_products() -> int:
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task == "true",
ProcessTable.last_status == "failed",
).all()
return ProcessTable.query.filter(ProcessTable.is_task.is_(True), ProcessTable.last_status == "failed").all()
def get_subscription_id_by_process_id(process_id: str) -> UUIDstr:
"""Get a subscription ID from a process ID."""
return (
ProcessSubscriptionTable.query.filter(ProcessSubscriptionTable.process_id == process_id).first().subscription_id
)
def get_subscription_by_process_id(process_id: str) -> SubscriptionModel | None:
"""Get a subscription from a process ID."""
subscription_table = ProcessSubscriptionTable.query.filter(
ProcessSubscriptionTable.process_id == process_id
).first()
return SubscriptionModel.from_subscription(subscription_table.subscription_id) if subscription_table else None
def get_insync_subscriptions() -> list[SubscriptionTable]:
......
"""Send email notifications for all tasks that have failed in the last hour.
"""Send email notifications for all tasks that have failed."""
This task must be scheduled to run every hour to prevent missing out on notifications.
"""
from datetime import UTC, datetime
from orchestrator.domain import SubscriptionModel
from orchestrator.targets import Target
from orchestrator.types import State
from orchestrator.workflow import StepList, done, init, step, workflow
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from gso.services.mailer import send_mail
from gso.services.subscriptions import get_failed_tasks, get_subscription_id_by_process_id
from gso.services.subscriptions import get_failed_tasks, get_subscription_by_process_id
from gso.settings import load_oss_params
@step("Gather all tasks that recently failed")
def gather_failed_tasks() -> State:
"""Gather all tasks from the last hour that have failed."""
current_time = datetime.now(UTC)
failed_tasks = list(filter(lambda task: (current_time - task.last_modified_at).hour <= 1, get_failed_tasks()))
return {"failed_tasks": failed_tasks}
"""Gather all tasks that have failed."""
return {"failed_tasks": get_failed_tasks()}
@step("Send notification emails for all failed tasks")
def send_email_notifications(state: State) -> None:
"""Send out an email notification for every task that has failed in the last hour."""
"""Send out an email notification for all tasks that have failed."""
base_url = load_oss_params().GENERAL.public_hostname
all_alerts = ""
for failure in state["failed_tasks"]:
subscription_id = get_subscription_id_by_process_id(failure["process_id"])
failed_subscription = SubscriptionModel.from_subscription(subscription_id)
failed_task_url = f"{base_url}/workflows/{failure["process_id"]}"
send_mail(
"GAP - A task has failed!",
f"""A {failed_subscription.product.name} validation task has failed to run to completion.
Product name: {failed_subscription.product.name}
Description: {failed_subscription.description}
The step "{failure["last_step"]}" failed for the following reason: "{failure["failed_reason"]}".
Please inspect the full workflow at the following link: {failed_task_url}.
""",
failed_subscription = get_subscription_by_process_id(failure["process_id"])
all_alerts += "------\n\n"
if failed_subscription:
all_alerts += (
f"Product name: {failed_subscription.product.name}\nDescription: {failed_subscription.description}\n"
)
all_alerts += (
f'The step "{failure["last_step"]}" failed for the following reason: "{failure["failed_reason"]}".\n\n'
f'Please inspect the full workflow at the following link: {failed_task_url}.\n\n'
)
send_mail(
"GAP - One or more tasks have failed!",
(
f"Please check the following tasks in GAP which have failed.\n\n{all_alerts}------"
f"\n\nRegards, the GÉANT Automation Platform.\n\n"
),
)
@workflow("Send email notifications for all failed tasks", target=Target.SYSTEM)
def task_send_email_notifications() -> StepList:
"""Gather all failed tasks from the last hour, and send email notifications."""
return init >> gather_failed_tasks >> send_email_notifications >> done
"""Gather all failed tasks, and send an email notification if needed."""
tasks_have_failed = conditional(lambda state: len(state["failed_tasks"]) > 0)
return init >> gather_failed_tasks >> tasks_have_failed(send_email_notifications) >> done
......@@ -540,3 +540,9 @@ def responses():
not_used = set(mocked_urls) - set(used_urls)
if not_used:
pytest.fail(f"Found unused responses mocks: {not_used}", pytrace=False)
@pytest.fixture(autouse=True)
def _no_mail(monkeypatch):
"""Remove sending mails from all tests."""
monkeypatch.delattr("smtplib.SMTP")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment