From 5418d156a64c23afb4398cf95b92d4f84013ec2b Mon Sep 17 00:00:00 2001 From: Karel van Klink <karel.vanklink@geant.org> Date: Tue, 30 Jul 2024 13:17:18 +0200 Subject: [PATCH] Add task to send out email notifications for failed validation workflows --- ...111c27972af_add_email_notification_task.py | 44 +++++++++++++++ gso/oss-params-example.json | 2 +- gso/schedules/send_email_notifications.py | 13 +++++ gso/services/subscriptions.py | 16 ++++++ gso/translations/en-GB.json | 3 +- gso/workflows/__init__.py | 1 + .../tasks/send_email_notifications.py | 53 +++++++++++++++++++ 7 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 gso/migrations/versions/2024-07-29_3111c27972af_add_email_notification_task.py create mode 100644 gso/schedules/send_email_notifications.py create mode 100644 gso/workflows/tasks/send_email_notifications.py diff --git a/gso/migrations/versions/2024-07-29_3111c27972af_add_email_notification_task.py b/gso/migrations/versions/2024-07-29_3111c27972af_add_email_notification_task.py new file mode 100644 index 00000000..4b4b8061 --- /dev/null +++ b/gso/migrations/versions/2024-07-29_3111c27972af_add_email_notification_task.py @@ -0,0 +1,44 @@ +"""Add email notification task. + +Revision ID: 3111c27972af +Revises: 31fd1ae8d5bb +Create Date: 2024-07-29 17:38:37.786347 + +""" + +from uuid import uuid4 + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = '3111c27972af' +down_revision = '31fd1ae8d5bb' +branch_labels = None +depends_on = None + +workflows = [ + { + "name": "task_send_email_notifications", + "target": "SYSTEM", + "description": "Send email notifications for all failed tasks", + "workflow_id": uuid4(), + } +] + + +def upgrade() -> None: + conn = op.get_bind() + for workflow in workflows: + conn.execute( + sa.text( + "INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING" + ), + workflow, + ) + + +def downgrade() -> None: + conn = op.get_bind() + for workflow in workflows: + conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]}) diff --git a/gso/oss-params-example.json b/gso/oss-params-example.json index 902659e9..1eacbb1a 100644 --- a/gso/oss-params-example.json +++ b/gso/oss-params-example.json @@ -85,7 +85,7 @@ "Application_2": "another_REALY_random_AND_3cure_T0keN" }, "EMAIL": { - "from_address": "noreply@nren.local", + "from_address": "noreply@example.com", "smtp_host": "smtp.nren.local", "smtp_port": 487, "starttls_enabled": true, diff --git a/gso/schedules/send_email_notifications.py b/gso/schedules/send_email_notifications.py new file mode 100644 index 00000000..f554052a --- /dev/null +++ b/gso/schedules/send_email_notifications.py @@ -0,0 +1,13 @@ +"""Task that sends out email notifications for failed tasks.""" + +from orchestrator.services.processes import start_process + +from gso.schedules.scheduling import CronScheduleConfig, scheduler +from gso.worker import celery + + +@celery.task +@scheduler(CronScheduleConfig(name="Send email notifications", minute="30")) +def send_email_notifications() -> None: + """Run this task every hour on the half hour.""" + start_process("task_send_email_notifications") diff --git a/gso/services/subscriptions.py b/gso/services/subscriptions.py index ead378a7..b0b84c09 100644 --- a/gso/services/subscriptions.py +++ b/gso/services/subscriptions.py @@ -8,6 +8,7 @@ from typing import Any from uuid import UUID from orchestrator.db import ( + ProcessSubscriptionTable, ProcessTable, ProductTable, ResourceTypeTable, @@ -203,6 +204,21 @@ def count_incomplete_validate_products() -> int: ).count() +def get_failed_tasks() -> list[ProcessTable]: + """Get all tasks that have failed.""" + return ProcessTable.query.filter( + ProcessTable.is_task == "true", + ProcessTable.last_status == "failed", + ).all() + + +def get_subscription_id_by_process_id(process_id: str) -> UUIDstr: + """Get a subscription ID from a process ID.""" + return ( + ProcessSubscriptionTable.query.filter(ProcessSubscriptionTable.process_id == process_id).first().subscription_id + ) + + def get_insync_subscriptions() -> list[SubscriptionTable]: """Retrieve all subscriptions that are currently in sync.""" return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all() diff --git a/gso/translations/en-GB.json b/gso/translations/en-GB.json index 81af4e6b..34420d58 100644 --- a/gso/translations/en-GB.json +++ b/gso/translations/en-GB.json @@ -67,6 +67,7 @@ "import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear", "validate_iptrunk": "Validate IP Trunk configuration", "validate_router": "Validate router configuration", - "task_validate_geant_products": "Validation task for GEANT products" + "task_validate_geant_products": "Validation task for GEANT products", + "task_send_email_notifications": "Send email notifications for failed tasks" } } diff --git a/gso/workflows/__init__.py b/gso/workflows/__init__.py index b6072c1c..99231170 100644 --- a/gso/workflows/__init__.py +++ b/gso/workflows/__init__.py @@ -67,4 +67,5 @@ LazyWorkflowInstance("gso.workflows.opengear.create_imported_opengear", "create_ LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear") # Tasks +LazyWorkflowInstance("gso.workflows.tasks.send_email_notifications", "task_send_email_notifications") LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products") diff --git a/gso/workflows/tasks/send_email_notifications.py b/gso/workflows/tasks/send_email_notifications.py new file mode 100644 index 00000000..0974c438 --- /dev/null +++ b/gso/workflows/tasks/send_email_notifications.py @@ -0,0 +1,53 @@ +"""Send email notifications for all tasks that have failed in the last hour. + +This task must be scheduled to run every hour to prevent missing out on notifications. +""" + +from datetime import UTC, datetime + +from orchestrator.domain import SubscriptionModel +from orchestrator.targets import Target +from orchestrator.types import State +from orchestrator.workflow import StepList, done, init, step, workflow + +from gso.services.mailer import send_mail +from gso.services.subscriptions import get_failed_tasks, get_subscription_id_by_process_id +from gso.settings import load_oss_params + + +@step("Gather all tasks that recently failed") +def gather_failed_tasks() -> State: + """Gather all tasks from the last hour that have failed.""" + current_time = datetime.now(UTC) + + failed_tasks = list(filter(lambda task: (current_time - task.last_modified_at).hour <= 1, get_failed_tasks())) + + return {"failed_tasks": failed_tasks} + + +@step("Send notification emails for all failed tasks") +def send_email_notifications(state: State) -> None: + """Send out an email notification for every task that has failed in the last hour.""" + base_url = load_oss_params().GENERAL.public_hostname + for failure in state["failed_tasks"]: + subscription_id = get_subscription_id_by_process_id(failure["process_id"]) + failed_subscription = SubscriptionModel.from_subscription(subscription_id) + failed_task_url = f"{base_url}/workflows/{failure["process_id"]}" + + send_mail( + "GAP - A task has failed!", + f"""A {failed_subscription.product.name} validation task has failed to run to completion. + +Product name: {failed_subscription.product.name} +Description: {failed_subscription.description} +The step "{failure["last_step"]}" failed for the following reason: "{failure["failed_reason"]}". + +Please inspect the full workflow at the following link: {failed_task_url}. +""", + ) + + +@workflow("Send email notifications for all failed tasks", target=Target.SYSTEM) +def task_send_email_notifications() -> StepList: + """Gather all failed tasks from the last hour, and send email notifications.""" + return init >> gather_failed_tasks >> send_email_notifications >> done -- GitLab