Skip to content
Snippets Groups Projects
Commit 0b944476 authored by geant-release-service's avatar geant-release-service
Browse files

Finished release 2.43.

parents 7c8a484f 4d9ff6f9
No related branches found
No related tags found
No related merge requests found
Pipeline #92504 passed
# Changelog
## [2.43] - 2025-03-14
- fix send email notification for failed prefix list checks.
## [2.42] - 2025-03-13
- Send a separate notification email for failed prefix list checks.
- Upgrade `orchestrator-core` to 3.1.1.
......
......@@ -5,8 +5,6 @@ of subscription validation workflows. The second email contains an overview of t
failed.
"""
from typing import Any
from orchestrator.targets import Target
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
from pydantic_forms.types import State
......@@ -23,11 +21,20 @@ def gather_failed_tasks() -> State:
failed_prefix_list_tasks = get_failed_tasks_by_workflow_name("validate_prefix_list")
all_other_tasks = list(set(get_failed_tasks()) - set(failed_prefix_list_tasks))
return {"failed_tasks": all_other_tasks, "failed_prefix_list_checks": failed_prefix_list_tasks}
return {
"failed_tasks": [
{"process_id": failure.process_id, "last_step": failure.last_step, "failed_reason": failure.failed_reason}
for failure in all_other_tasks
],
"failed_prefix_list_checks": [
{"process_id": failure.process_id, "last_step": failure.last_step, "failed_reason": failure.failed_reason}
for failure in failed_prefix_list_tasks
],
}
@step("Send notification email for failed tasks")
def send_email_notification(failed_tasks: list[dict[str, Any]]) -> None:
def send_email_notification(failed_tasks: list[dict]) -> None:
"""Send out an email notification for all tasks that have failed."""
general_settings = load_oss_params().GENERAL
all_alerts = ""
......@@ -56,7 +63,7 @@ def send_email_notification(failed_tasks: list[dict[str, Any]]) -> None:
@step("Send notification emails for failed prefix list tasks")
def send_prefix_list_email_notification(failed_prefix_list_checks: list[dict[str, Any]]) -> None:
def send_prefix_list_email_notification(failed_prefix_list_checks: list[dict]) -> None:
"""Send out an email notification for all prefix list validation tasks that have failed."""
general_settings = load_oss_params().GENERAL
all_alerts = ""
......
......@@ -4,7 +4,7 @@ from setuptools import find_packages, setup
setup(
name="geant-service-orchestrator",
version="2.42",
version="2.43",
author="GÉANT Orchestration and Automation Team",
author_email="goat@geant.org",
description="GÉANT Service Orchestrator",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment