Skip to content
Snippets Groups Projects
Verified Commit 8d0e9017 authored by Karel van Klink's avatar Karel van Klink :otter:
Browse files

Fix serialization problem in email notification task

parent 55923663
No related branches found
No related tags found
1 merge request!379Fix serialization problem in email notification task
Pipeline #92496 passed
......@@ -5,8 +5,6 @@ of subscription validation workflows. The second email contains an overview of t
failed.
"""
from typing import Any
from orchestrator.targets import Target
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from pydantic_forms.types import State
......@@ -23,15 +21,24 @@ def gather_failed_tasks() -> State:
failed_prefix_list_tasks = get_failed_tasks_by_workflow_name("validate_prefix_list")
all_other_tasks = list(set(get_failed_tasks()) - set(failed_prefix_list_tasks))
return {"failed_tasks": all_other_tasks, "failed_prefix_list_checks": failed_prefix_list_tasks}
return {
"failed_tasks": [
{"process_id": failure.process_id, "last_step": failure.last_step, "failed_reason": failure.failed_reason}
for failure in all_other_tasks
],
"failed_prefix_list_checks": [
{"process_id": failure.process_id, "last_step": failure.last_step, "failed_reason": failure.failed_reason}
for failure in failed_prefix_list_tasks
],
}
@step("Send notification email for failed tasks")
def send_email_notification(failed_tasks: list[dict[str, Any]]) -> None:
def send_email_notification(state: State) -> None:
"""Send out an email notification for all tasks that have failed."""
general_settings = load_oss_params().GENERAL
all_alerts = ""
for failure in failed_tasks:
for failure in state["failed_tasks"]:
failed_task_url = f"{general_settings.public_hostname}/workflows/{failure["process_id"]}"
failed_subscription = get_subscription_by_process_id(failure["process_id"])
all_alerts = f"{all_alerts}------\n\n"
......@@ -56,11 +63,11 @@ def send_email_notification(failed_tasks: list[dict[str, Any]]) -> None:
@step("Send notification emails for failed prefix list tasks")
def send_prefix_list_email_notification(failed_prefix_list_checks: list[dict[str, Any]]) -> None:
def send_prefix_list_email_notification(state: State) -> None:
"""Send out an email notification for all prefix list validation tasks that have failed."""
general_settings = load_oss_params().GENERAL
all_alerts = ""
for failure in failed_prefix_list_checks:
for failure in state["failed_prefix_list_checks"]:
failed_task_url = f"{general_settings.public_hostname}/workflows/{failure["process_id"]}"
failed_subscription = get_subscription_by_process_id(failure["process_id"])
all_alerts = f"{all_alerts}------\n\n"
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment