"""The Kentik service is used for external interactions with Kentik.""" import logging from typing import Any, Literal import requests from pydantic import BaseModel from requests import Response from gso.settings import load_oss_params logger = logging.getLogger(__name__) class NewKentikDevice(BaseModel): """API model for creating a new device in Kentik.""" device_name: str device_description: str sending_ips: list[str] site_id: int device_snmp_ip: str device_bgp_flowspec: bool device_bgp_neighbor_ip: str device_bgp_neighbor_ip6: str class KentikClient: """The client for Kentik that interacts with an external instance.""" def __init__(self) -> None: """Instantiate a new Kentik Client.""" self.config = load_oss_params().KENTIK self.session = requests.Session() self.session.headers.update({ "X-CH-Auth-Email": self.config.user_email, "X-CH-Auth-API-Token": self.config.api_key, "Content-Type": "application/json", }) def _send_request( self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None ) -> Response: url = self.config.api_base + endpoint logger.debug("Kentik - Sending request", extra={"method": method, "endpoint": url, "form_data": data}) result = self.session.request(method, url, json=data) logger.debug("Kentik - Received response", extra=result.__dict__) return result def get_devices(self) -> list[dict[str, Any]]: """List all devices in Kentik.""" return [self._send_request("GET", "v5/devices").json()] def get_device(self, device_id: str) -> dict[str, Any]: """Get a device by ID.""" return self._send_request("GET", f"v5/device/{device_id}").json() def get_device_by_name(self, device_name: str) -> dict[str, Any]: """Fetch a device in Kentik by its :term:`FQDN`. If the device is not found, returns an empty dict. :param str device_name: The :term:`FQDN` of the sought device. """ devices = self.get_devices() for device in devices: if device["name"] == device_name: return device return {} def get_sites(self) -> list[dict[str, Any]]: """Get a list of all available sites in Kentik.""" return self._send_request("GET", "v5/sites").json()["sites"] def get_site(self, site_id: str) -> dict[str, Any]: """Get a site by ID.""" return self._send_request("GET", f"v5/site/{site_id}").json() def get_site_by_name(self, site_slug: str) -> dict[str, Any]: """Get a Kentik site by its name. If the site is not found, return an empty dict. :param str site_slug: The name of the site, should be a three-letter slug like COR or POZ. """ sites = self.get_sites() for site in sites: if site["site_name"] == site_slug: return site return {} def get_plans(self) -> list[dict[str, Any]]: """Get all Kentik plans available.""" return self._send_request("GET", "v5/plans").json()["plans"] def get_plan_by_name(self, plan_name: str) -> dict[str, Any]: """Get a Kentik plan by its name. If the plan is not found, returns an empty dict. :param str plan_name: The name of the plan. """ plans = self.get_plans() for plan in plans: if plan["name"] == plan_name: return plan return {} def create_device(self, device: NewKentikDevice) -> dict[str, Any]: """Add a new device to Kentik.""" plan_id = self.get_plan_by_name(self.config.placeholder_license_key)["id"] request_body = { "device": { **device.model_dump(exclude=set("device_name")), "device_name": device.device_description, "device_type": self.config.device_type, "device_subtype": self.config.device_type, "minimize_snmp": self.config.minimize_snmp, "device_sample_rate": self.config.sample_rate, "plan_id": plan_id, "device_snmp_community": self.config.snmp_community, "device_bgp_type": self.config.bgp_type, "bgp_lookup_strategy": self.config.bgp_lookup_strategy, "device_bgp_neighbor_asn": str(self.config.ASN), "device_bgp_password": self.config.md5_password, } } new_device = self._send_request("POST", "v5/device", request_body).json() # The name of the device has to be updated from the subscription ID to its FQDN. # This is a limitation of the Kentik API that disallows settings device names containing a . symbol. self.update_device(new_device["device"]["id"], {"device": {"device_name": device.device_name}}) new_device["device"]["device_name"] = device.device_name return new_device def update_device(self, device_id: str, updated_device: dict[str, Any]) -> dict[str, Any]: """Update an existing device in Kentik.""" return self._send_request("PUT", f"v5/device/{device_id}", updated_device).json() def remove_device(self, device_id: str, *, archive: bool) -> None: """Remove a device from Kentik. :param str device_id: The Kentik internal ID of the device that is to be removed. :param bool archive: Archive the device instead of completely deleting it. """ if not archive: self._send_request("DELETE", f"v5/device/{device_id}") self._send_request("DELETE", f"v5/device/{device_id}") def remove_device_by_fqdn(self, fqdn: str, *, archive: bool = True) -> None: """Remove a device from Kentik, by its :term:`FQDN`.""" device_id = self.get_device_by_name(fqdn)["id"] self.remove_device(device_id, archive=archive)