-
Karel van Klink authoredKarel van Klink authored
kentik_client.py 5.89 KiB
"""The Kentik service is used for external interactions with Kentik."""
import logging
from typing import Any, Literal
import requests
from pydantic import BaseModel
from requests import Response
from gso.settings import load_oss_params
logger = logging.getLogger(__name__)
class NewKentikDevice(BaseModel):
"""API model for creating a new device in Kentik."""
device_name: str
device_description: str
sending_ips: list[str]
site_id: int
device_snmp_ip: str
device_bgp_flowspec: bool
device_bgp_neighbor_ip: str
device_bgp_neighbor_ip6: str
class KentikClient:
"""The client for Kentik that interacts with an external instance."""
def __init__(self) -> None:
"""Instantiate a new Kentik Client."""
self.config = load_oss_params().KENTIK
self.session = requests.Session()
self.session.headers.update({
"X-CH-Auth-Email": self.config.user_email,
"X-CH-Auth-API-Token": self.config.api_key,
"Content-Type": "application/json",
})
def _send_request(
self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None
) -> Response:
url = self.config.api_base + endpoint
logger.debug("Kentik - Sending request", extra={"method": method, "endpoint": url, "form_data": data})
result = self.session.request(method, url, json=data)
logger.debug("Kentik - Received response", extra=result.__dict__)
return result
def get_devices(self) -> list[dict[str, Any]]:
"""List all devices in Kentik."""
return [self._send_request("GET", "v5/devices").json()]
def get_device(self, device_id: str) -> dict[str, Any]:
"""Get a device by ID."""
return self._send_request("GET", f"v5/device/{device_id}").json()
def get_device_by_name(self, device_name: str) -> dict[str, Any]:
"""Fetch a device in Kentik by its :term:`FQDN`.
If the device is not found, returns an empty dict.
:param str device_name: The :term:`FQDN` of the sought device.
"""
devices = self.get_devices()
for device in devices:
if device["name"] == device_name:
return device
return {}
def get_sites(self) -> list[dict[str, Any]]:
"""Get a list of all available sites in Kentik."""
return self._send_request("GET", "v5/sites").json()["sites"]
def get_site(self, site_id: str) -> dict[str, Any]:
"""Get a site by ID."""
return self._send_request("GET", f"v5/site/{site_id}").json()
def get_site_by_name(self, site_slug: str) -> dict[str, Any]:
"""Get a Kentik site by its name.
If the site is not found, return an empty dict.
:param str site_slug: The name of the site, should be a three-letter slug like COR or POZ.
"""
sites = self.get_sites()
for site in sites:
if site["site_name"] == site_slug:
return site
return {}
def get_plans(self) -> list[dict[str, Any]]:
"""Get all Kentik plans available."""
return self._send_request("GET", "v5/plans").json()["plans"]
def get_plan_by_name(self, plan_name: str) -> dict[str, Any]:
"""Get a Kentik plan by its name.
If the plan is not found, returns an empty dict.
:param str plan_name: The name of the plan.
"""
plans = self.get_plans()
for plan in plans:
if plan["name"] == plan_name:
return plan
return {}
def create_device(self, device: NewKentikDevice) -> dict[str, Any]:
"""Add a new device to Kentik."""
plan_id = self.get_plan_by_name(self.config.placeholder_license_key)["id"]
request_body = {
"device": {
**device.model_dump(exclude=set("device_name")),
"device_name": device.device_description,
"device_type": self.config.device_type,
"device_subtype": self.config.device_type,
"minimize_snmp": self.config.minimize_snmp,
"device_sample_rate": self.config.sample_rate,
"plan_id": plan_id,
"device_snmp_community": self.config.snmp_community,
"device_bgp_type": self.config.bgp_type,
"bgp_lookup_strategy": self.config.bgp_lookup_strategy,
"device_bgp_neighbor_asn": str(self.config.ASN),
"device_bgp_password": self.config.md5_password,
}
}
new_device = self._send_request("POST", "v5/device", request_body).json()
# The name of the device has to be updated from the subscription ID to its FQDN.
# This is a limitation of the Kentik API that disallows settings device names containing a . symbol.
self.update_device(new_device["device"]["id"], {"device": {"device_name": device.device_name}})
new_device["device"]["device_name"] = device.device_name
return new_device
def update_device(self, device_id: str, updated_device: dict[str, Any]) -> dict[str, Any]:
"""Update an existing device in Kentik."""
return self._send_request("PUT", f"v5/device/{device_id}", updated_device).json()
def remove_device(self, device_id: str, *, archive: bool) -> None:
"""Remove a device from Kentik.
:param str device_id: The Kentik internal ID of the device that is to be removed.
:param bool archive: Archive the device instead of completely deleting it.
"""
if not archive:
self._send_request("DELETE", f"v5/device/{device_id}")
self._send_request("DELETE", f"v5/device/{device_id}")
def remove_device_by_fqdn(self, fqdn: str, *, archive: bool = True) -> None:
"""Remove a device from Kentik, by its :term:`FQDN`."""
device_id = self.get_device_by_name(fqdn)["id"]
self.remove_device(device_id, archive=archive)