Skip to content
Snippets Groups Projects
Verified Commit 5418d156 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Add task to send out email notifications for failed validation workflows

parent a66741e7
No related branches found
No related tags found
1 merge request!239Feature/send validation emails
"""Add email notification task.
Revision ID: 3111c27972af
Revises: 31fd1ae8d5bb
Create Date: 2024-07-29 17:38:37.786347
"""
from uuid import uuid4
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = '3111c27972af'
down_revision = '31fd1ae8d5bb'
branch_labels = None
depends_on = None
workflows = [
{
"name": "task_send_email_notifications",
"target": "SYSTEM",
"description": "Send email notifications for all failed tasks",
"workflow_id": uuid4(),
}
]
def upgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(
sa.text(
"INSERT INTO workflows VALUES (:workflow_id, :name, :target, :description, now()) ON CONFLICT DO NOTHING"
),
workflow,
)
def downgrade() -> None:
conn = op.get_bind()
for workflow in workflows:
conn.execute(sa.text("DELETE FROM workflows WHERE name = :name"), {"name": workflow["name"]})
......@@ -85,7 +85,7 @@
"Application_2": "another_REALY_random_AND_3cure_T0keN"
},
"EMAIL": {
"from_address": "noreply@nren.local",
"from_address": "noreply@example.com",
"smtp_host": "smtp.nren.local",
"smtp_port": 487,
"starttls_enabled": true,
......
"""Task that sends out email notifications for failed tasks."""
from orchestrator.services.processes import start_process
from gso.schedules.scheduling import CronScheduleConfig, scheduler
from gso.worker import celery
@celery.task
@scheduler(CronScheduleConfig(name="Send email notifications", minute="30"))
def send_email_notifications() -> None:
"""Run this task every hour on the half hour."""
start_process("task_send_email_notifications")
......@@ -8,6 +8,7 @@ from typing import Any
from uuid import UUID
from orchestrator.db import (
ProcessSubscriptionTable,
ProcessTable,
ProductTable,
ResourceTypeTable,
......@@ -203,6 +204,21 @@ def count_incomplete_validate_products() -> int:
).count()
def get_failed_tasks() -> list[ProcessTable]:
"""Get all tasks that have failed."""
return ProcessTable.query.filter(
ProcessTable.is_task == "true",
ProcessTable.last_status == "failed",
).all()
def get_subscription_id_by_process_id(process_id: str) -> UUIDstr:
"""Get a subscription ID from a process ID."""
return (
ProcessSubscriptionTable.query.filter(ProcessSubscriptionTable.process_id == process_id).first().subscription_id
)
def get_insync_subscriptions() -> list[SubscriptionTable]:
"""Retrieve all subscriptions that are currently in sync."""
return SubscriptionTable.query.join(ProductTable).filter(SubscriptionTable.insync.is_(True)).all()
......
......@@ -67,6 +67,7 @@
"import_opengear": "NOT FOR HUMANS -- Finalize import into an OpenGear",
"validate_iptrunk": "Validate IP Trunk configuration",
"validate_router": "Validate router configuration",
"task_validate_geant_products": "Validation task for GEANT products"
"task_validate_geant_products": "Validation task for GEANT products",
"task_send_email_notifications": "Send email notifications for failed tasks"
}
}
......@@ -67,4 +67,5 @@ LazyWorkflowInstance("gso.workflows.opengear.create_imported_opengear", "create_
LazyWorkflowInstance("gso.workflows.opengear.import_opengear", "import_opengear")
# Tasks
LazyWorkflowInstance("gso.workflows.tasks.send_email_notifications", "task_send_email_notifications")
LazyWorkflowInstance("gso.workflows.tasks.validate_geant_products", "task_validate_geant_products")
"""Send email notifications for all tasks that have failed in the last hour.
This task must be scheduled to run every hour to prevent missing out on notifications.
"""
from datetime import UTC, datetime
from orchestrator.domain import SubscriptionModel
from orchestrator.targets import Target
from orchestrator.types import State
from orchestrator.workflow import StepList, done, init, step, workflow
from gso.services.mailer import send_mail
from gso.services.subscriptions import get_failed_tasks, get_subscription_id_by_process_id
from gso.settings import load_oss_params
@step("Gather all tasks that recently failed")
def gather_failed_tasks() -> State:
"""Gather all tasks from the last hour that have failed."""
current_time = datetime.now(UTC)
failed_tasks = list(filter(lambda task: (current_time - task.last_modified_at).hour <= 1, get_failed_tasks()))
return {"failed_tasks": failed_tasks}
@step("Send notification emails for all failed tasks")
def send_email_notifications(state: State) -> None:
"""Send out an email notification for every task that has failed in the last hour."""
base_url = load_oss_params().GENERAL.public_hostname
for failure in state["failed_tasks"]:
subscription_id = get_subscription_id_by_process_id(failure["process_id"])
failed_subscription = SubscriptionModel.from_subscription(subscription_id)
failed_task_url = f"{base_url}/workflows/{failure["process_id"]}"
send_mail(
"GAP - A task has failed!",
f"""A {failed_subscription.product.name} validation task has failed to run to completion.
Product name: {failed_subscription.product.name}
Description: {failed_subscription.description}
The step "{failure["last_step"]}" failed for the following reason: "{failure["failed_reason"]}".
Please inspect the full workflow at the following link: {failed_task_url}.
""",
)
@workflow("Send email notifications for all failed tasks", target=Target.SYSTEM)
def task_send_email_notifications() -> StepList:
"""Gather all failed tasks from the last hour, and send email notifications."""
return init >> gather_failed_tasks >> send_email_notifications >> done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment