Skip to content
Snippets Groups Projects

Add a Kentik service to GSO

Merged Karel van Klink requested to merge feature/add-kentik-service into develop
All threads resolved!
1 file
+ 54
29
Compare changes
  • Side-by-side
  • Inline
+ 162
0
"""The Kentik service is used for external interactions with Kentik."""
import logging
from typing import Any, Literal
import requests
from pydantic import BaseModel
from requests import Response
from gso.products.product_blocks.site import SiteTier
from gso.settings import load_oss_params
logger = logging.getLogger(__name__)
class NewKentikDevice(BaseModel):
"""API model for creating a new device in Kentik."""
device_name: str
device_description: str
sending_ips: list[str]
site_id: int
site_tier: SiteTier
device_snmp_ip: str
device_bgp_flowspec: bool
device_bgp_neighbor_ip: str
device_bgp_neighbor_ip6: str
class KentikClient:
"""The client for Kentik that interacts with an external instance."""
def __init__(self) -> None:
"""Instantiate a new Kentik Client."""
self.config = load_oss_params().KENTIK
self.session = requests.Session()
self.session.headers.update({
"X-CH-Auth-Email": self.config.user_email,
"X-CH-Auth-API-Token": self.config.api_key,
"Content-Type": "application/json",
})
def _send_request(
self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None
) -> Response:
url = self.config.api_base + endpoint
logger.debug("Kentik - Sending request", extra={"method": method, "endpoint": url, "form_data": data})
result = self.session.request(method, url, json=data)
logger.debug("Kentik - Received response", extra=result.__dict__)
return result
def get_devices(self) -> list[dict[str, Any]]:
"""List all devices in Kentik."""
return [self._send_request("GET", "v5/devices").json()]
def get_device(self, device_id: str) -> dict[str, Any]:
"""Get a device by ID."""
return self._send_request("GET", f"v5/device/{device_id}").json()
def get_device_by_name(self, device_name: str) -> dict[str, Any]:
"""Fetch a device in Kentik by its :term:`FQDN`.
If the device is not found, returns an empty dict.
:param str device_name: The :term:`FQDN` of the sought device.
"""
devices = self.get_devices()
for device in devices:
if device["name"] == device_name:
return device
return {}
def get_sites(self) -> list[dict[str, Any]]:
"""Get a list of all available sites in Kentik."""
return self._send_request("GET", "v5/sites").json()["sites"]
def get_site(self, site_id: str) -> dict[str, Any]:
"""Get a site by ID."""
return self._send_request("GET", f"v5/site/{site_id}").json()
def get_site_by_name(self, site_slug: str) -> dict[str, Any]:
"""Get a Kentik site by its name.
If the site is not found, return an empty dict.
:param str site_slug: The name of the site, should be a three-letter slug like COR or POZ.
"""
sites = self.get_sites()
for site in sites:
if site["site_name"] == site_slug:
return site
return {}
def get_plans(self) -> list[dict[str, Any]]:
"""Get all Kentik plans available."""
return self._send_request("GET", "v5/plans").json()["plans"]
def get_plan_by_name(self, plan_name: str) -> dict[str, Any]:
"""Get a Kentik plan by its name.
If the plan is not found, returns an empty dict.
:param str plan_name: The name of the plan.
"""
plans = self.get_plans()
for plan in plans:
if plan["name"] == plan_name:
return plan
return {}
def create_device(self, device: NewKentikDevice) -> dict[str, Any]:
"""Add a new device to Kentik."""
plan_id = self.get_plan_by_name(self.config.billing_plans[device.site_tier])["id"]
request_body = {
"device": {
**device.model_dump(exclude=set("device_name" "site_tier")),
"device_name": device.device_description,
"device_type": self.config.device_type,
"device_subtype": self.config.device_type,
"minimize_snmp": self.config.minimize_snmp,
"device_sample_rate": self.config.sample_rate,
"plan_id": plan_id,
"device_snmp_community": self.config.snmp_community,
"device_bgp_type": self.config.bgp_type,
"bgp_lookup_strategy": self.config.bgp_lookup_strategy,
"device_bgp_neighbor_asn": str(self.config.ASN),
"device_bgp_password": self.config.md5_password,
}
}
new_device = self._send_request("POST", "v5/device", request_body).json()
# The name of the device has to be updated from the subscription ID to its FQDN.
# This is a limitation of the Kentik API that disallows settings device names containing a . symbol.
self.update_device(new_device["device"]["id"], {"device": {"device_name": device.device_name}})
new_device["device"]["device_name"] = device.device_name
return new_device
def update_device(self, device_id: str, updated_device: dict[str, Any]) -> dict[str, Any]:
"""Update an existing device in Kentik."""
return self._send_request("PUT", f"v5/device/{device_id}", updated_device).json()
def remove_device(self, device_id: str, *, archive: bool) -> None:
"""Remove a device from Kentik.
:param str device_id: The Kentik internal ID of the device that is to be removed.
:param bool archive: Archive the device instead of completely deleting it.
"""
if not archive:
self._send_request("DELETE", f"v5/device/{device_id}")
self._send_request("DELETE", f"v5/device/{device_id}")
def remove_device_by_fqdn(self, fqdn: str, *, archive: bool = True) -> None:
"""Remove a device from Kentik, by its :term:`FQDN`."""
device_id = self.get_device_by_name(fqdn)["id"]
self.remove_device(device_id, archive=archive)
Loading