Skip to content
Snippets Groups Projects
Commit eaea842c authored by Karel van Klink's avatar Karel van Klink :smiley_cat: Committed by Mohammad Torkashvand
Browse files

Update Kentik handling in router termination workflow

Expand error handling when failing to apply the archiving license, including sending emails to custom destinations.
parent 90d1209f
Branches
Tags
1 merge request!396Update Kentik handling in router validation and termination
......@@ -95,7 +95,8 @@
"starttls_enabled": true,
"smtp_username": "username",
"smtp_password": "password",
"notification_email_destinations": "oc@nren.local, neteng@nren.local, ceo@nren.local"
"notification_email_destinations": "oc@nren.local, neteng@nren.local, ceo@nren.local",
"kentik_email_destinations": "service-management-team@nren.local, operations-team@nren.local"
},
"SHAREPOINT": {
"client_id": "UUID",
......
......@@ -84,6 +84,8 @@ class KentikClient:
devices = self.get_devices()
for device in devices:
if device["device_name"] == device_name:
device.pop("custom_columns", None)
device.pop("custom_column_data", None)
return device
return {}
......
"""The mailer service sends notification emails, as part of workflows that require interaction with external parties."""
import logging
import smtplib
from email.message import EmailMessage
from ssl import create_default_context
from gso.settings import load_oss_params
logger = logging.getLogger(__name__)
def send_mail(subject: str, body: str) -> None:
def send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
"""Send an email message to the given addresses.
Only supports STARTTLS, not SSL.
......@@ -15,11 +18,12 @@ def send_mail(subject: str, body: str) -> None:
Args:
subject: The email subject.
body: The contents of the email message.
destination: The destination of the email, optional.
"""
email_params = load_oss_params().EMAIL
msg = EmailMessage()
msg["From"] = email_params.from_address
msg["To"] = email_params.notification_email_destinations
msg["To"] = destination or email_params.notification_email_destinations
msg["Subject"] = subject
msg.set_content(body)
......@@ -30,3 +34,11 @@ def send_mail(subject: str, body: str) -> None:
if email_params.smtp_username and email_params.smtp_password:
s.login(email_params.smtp_username, email_params.smtp_password)
s.send_message(msg)
logger.info({
"event": "Sent an email",
"from": msg["From"],
"to": msg["To"],
"subject": msg["Subject"],
"body": body,
})
......@@ -197,7 +197,7 @@ def get_active_l3_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[Su
.join(ProductTable)
.filter(
and_(
ProductTable.product_type == L3_CORE_SERVICE_PRODUCT_TYPE,
ProductTable.product_type.in_([L3_CORE_SERVICE_PRODUCT_TYPE]),
SubscriptionTable.status == SubscriptionLifecycle.ACTIVE,
)
)
......@@ -210,6 +210,8 @@ def get_active_l3_services_linked_to_edge_port(edge_port_id: UUIDstr) -> list[Su
def get_active_layer_3_services_on_router(subscription_id: UUID) -> list[SubscriptionModel]:
"""Get all active Layer 3 services that insist on a given router `subscription_id`.
TODO: Update this method when refactoring layer 3 services.
Args:
subscription_id: Subscription ID of a Router.
......@@ -230,7 +232,7 @@ def get_active_layer_3_services_on_router(subscription_id: UUID) -> list[Subscri
active_l3_services = []
for edge_port in active_edge_ports:
active_l3_services += get_active_l3_services_linked_to_edge_port(str(edge_port.subscription_id))
active_l3_services.extend(get_active_l3_services_linked_to_edge_port(str(edge_port.subscription_id)))
return active_l3_services
......
......@@ -171,6 +171,8 @@ class EmailParams(BaseSettings):
Attributes:
notification_email_destinations: List of email addresses that should receive notifications when validation of a
subscription fails. Can be a comma-separated list of multiple addresses.
kentik_email_destinations: A List of email addresses formatted similarly, but for notifications related to
Kentik.
"""
from_address: EmailStr
......@@ -180,6 +182,7 @@ class EmailParams(BaseSettings):
smtp_username: str | None = None
smtp_password: str | None = None
notification_email_destinations: str
kentik_email_destinations: str
class SharepointParams(BaseSettings):
......
......@@ -42,6 +42,7 @@ from gso.services import infoblox
from gso.services.kentik_client import KentikClient
from gso.services.librenms_client import LibreNMSClient
from gso.services.lso_client import LSOState, lso_interaction
from gso.services.mailer import send_mail
from gso.services.netbox_client import NetboxClient
from gso.settings import load_oss_params
from gso.utils.helpers import generate_inventory_for_routers
......@@ -252,26 +253,61 @@ def remove_device_from_librenms(subscription: Router) -> State:
@step("Apply the archiving license in Kentik")
def kentik_apply_archive_license(subscription: Router) -> State:
def kentik_apply_archive_license(subscription: Router, process_id: UUIDstr) -> State:
"""Apply the archiving license to a PE router in Kentik.
This includes setting the flow rate to one flow per second.
This includes setting the flow rate to one flow per second, and the BGP type to `none`. Service Management will also
be emailed to inform them of an archiving license being consumed. If this step is unsuccessful, an email is sent to
inform them as well. This could be caused by a device being missing in Kentik, or having no more licenses available.
"""
kentik_client = KentikClient()
kentik_archive_plan_id = kentik_client.get_plan_by_name(load_oss_params().KENTIK.archive_license_key)["id"]
oss_params = load_oss_params()
kentik_device = kentik_client.get_device_by_name(subscription.router.router_fqdn)
# Attempt fetching the device from Kentik.
if "id" not in kentik_device and subscription.router.vendor == Vendor.JUNIPER:
# If the device is a Juniper, there is a chance that the FQDN is written with underscores as delimiter.
# We try again when fetching the device was unsuccessful the first time.
kentik_device = kentik_client.get_device_by_name(subscription.router.router_fqdn.replace(".", "_"))
# If still unsuccessful after two attempts, we give up and alert externally by email.
if "id" not in kentik_device:
return {
"kentik_device": f"Device {subscription.router.router_fqdn} not found in Kentik, no license applied! "
f"Also when replacing periods with underscores in the FQDN."
}
updated_device = {"device": {"plan_id": kentik_archive_plan_id, "device_sample_rate": 1}}
send_mail(
"[GSO][Kentik] Failed to terminate router",
f"During the execution of a router termination workflow in GSO, we were unable to find the device "
f"{subscription.router.router_fqdn}.\nPlease update this device manually in Kentik.\n\n"
f"For reference, the workflow run can be found at: "
f"{oss_params.GENERAL.public_hostname}/workflows/{process_id}\n\nRegards, the GÉANT Automation Platform.",
destination=oss_params.EMAIL.kentik_email_destinations,
)
return {"kentik_device": f"Device {subscription.router.router_fqdn} not found in Kentik, no license applied!"}
# Send an email if we are out of archiving licenses.
kentik_archive_plan = kentik_client.get_plan_by_name(oss_params.KENTIK.archive_license_key)
if len(kentik_archive_plan["devices"]) >= kentik_archive_plan["max_devices"]:
send_mail(
"[GSO][Kentik] Failed to apply historical license",
f"During the execution of a router termination workflow on GSO, we were unable to apply a historical "
f"license to device {subscription.router.router_fqdn}.\nNo changes have been made, please update this "
f"device manually.\nIt appears we have run out of available historical licenses, all "
f"{kentik_archive_plan["max_devices"]} licenses are currently in use.\n\nFor reference, the workflow run "
f"can be found at: {oss_params.GENERAL.public_hostname}/workflows/{process_id}\n\nRegards, the GÉANT "
f"Automation Platform.",
destination=oss_params.EMAIL.kentik_email_destinations,
)
return {"kentik_device": "No more archiving licenses available. Nothing is updated in Kentik."}
updated_device = {
"device": {"plan_id": kentik_archive_plan["id"], "device_sample_rate": 1, "device_bgp_type": "none"}
}
kentik_device = kentik_client.update_device(kentik_device["id"], updated_device)
send_mail(
"[GSO][Kentik] Historical license has been applied",
f"A historical license has been applied to device {subscription.router.router_fqdn}.\n"
f"Currently, {len(kentik_archive_plan["devices"]) + 1} out of {kentik_archive_plan["max_devices"]} historical "
f"licenses are in use.\n\nFor reference, the workflow run can be found at: "
f"{oss_params.GENERAL.public_hostname}/workflows/{process_id}\n\nRegards, the GÉANT Automation Platform.",
destination=oss_params.EMAIL.kentik_email_destinations,
)
return {"kentik_device": kentik_device}
......
......@@ -163,10 +163,7 @@ def check_kentik_entry_exists(subscription: Router) -> None:
if bool(get_active_layer_3_services_on_router(subscription.subscription_id)):
kentik_params = load_oss_params().KENTIK
archive_plan = client.get_plan_by_name(kentik_params.archive_license_key)
if next(
(device for device in archive_plan["devices"] if device["device_name"] == subscription.router.router_fqdn),
None,
):
if any(device["device_name"] == subscription.router.router_fqdn for device in archive_plan["devices"]):
raise ProcessFailureError(
message="Device in Kentik incorrectly configured",
details=f"Kentik device {subscription.router.router_fqdn} has the archiving license "
......@@ -174,14 +171,7 @@ def check_kentik_entry_exists(subscription: Router) -> None:
)
placeholder_plan = client.get_plan_by_name(kentik_params.placeholder_license_key)
if next(
(
device
for device in placeholder_plan["devices"]
if device["device_name"] == subscription.router.router_fqdn
),
None,
):
if any(device["device_name"] == subscription.router.router_fqdn for device in placeholder_plan["devices"]):
raise ProcessFailureError(
message="Device in Kentik incorrectly configured",
details=f"Kentik device {subscription.router.router_fqdn} has the placeholder license "
......
......@@ -33,12 +33,14 @@ from sqlalchemy.orm import scoped_session, sessionmaker
from starlette.testclient import TestClient
from urllib3_mock import Responses
import gso.services.mailer
from gso.services.partners import PartnerSchema, create_partner
from gso.services.subscriptions import is_resource_type_value_unique
from gso.utils.types.interfaces import LAGMember, LAGMemberList
from test.fixtures import * # noqa: F403
logging.getLogger("faker.factory").setLevel(logging.WARNING)
logger = logging.getLogger("faker.factory")
logger.setLevel(logging.WARNING)
class UseJuniperSide(strEnum):
......@@ -595,4 +597,9 @@ def responses():
@pytest.fixture(autouse=True)
def _no_mail(monkeypatch):
"""Remove sending mails from all tests."""
monkeypatch.delattr("smtplib.SMTP")
def send_mail(subject: str, body: str, *, destination: str | None = None) -> None:
email = f"*** SENT AN EMAIL ***\nTO: {destination}\nSUBJECT: {subject}\nCONTENT:\n{body}"
logger.info(email)
monkeypatch.setattr(gso.services.mailer, "send_mail", send_mail)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment