Skip to content
Snippets Groups Projects
kentik_client.py 5.89 KiB
"""The Kentik service is used for external interactions with Kentik."""

import logging
from typing import Any, Literal

import requests
from pydantic import BaseModel
from requests import Response

from gso.settings import load_oss_params

logger = logging.getLogger(__name__)


class NewKentikDevice(BaseModel):
    """API model for creating a new device in Kentik.

    The fields of this model are dumped into the ``device`` object of the request body that
    :meth:`KentikClient.create_device` sends to Kentik. During creation, ``device_description`` is first used as
    the device name, and ``device_name`` is only applied in a follow-up update.
    """

    # Final name of the device; applied through an update after the device has been created.
    device_name: str
    # Description of the device; also used as the temporary device name in the creation request.
    device_description: str
    # NOTE(review): presumably the IP addresses the device sends flow data from - confirm against the Kentik API.
    sending_ips: list[str]
    # Kentik ID of the site the device belongs to.
    site_id: int
    # NOTE(review): presumably the address Kentik polls over SNMP - confirm against the Kentik API.
    device_snmp_ip: str
    # NOTE(review): presumably toggles BGP Flowspec for the device - confirm against the Kentik API.
    device_bgp_flowspec: bool
    # NOTE(review): presumably the IPv4 and IPv6 addresses of the BGP neighbor - confirm against the Kentik API.
    device_bgp_neighbor_ip: str
    device_bgp_neighbor_ip6: str


class KentikClient:
    """The client for Kentik that interacts with an external instance."""

    def __init__(self) -> None:
        """Set up the client from the Kentik section of the OSS parameters.

        One :class:`requests.Session` is created and reused for every request. It carries the authentication
        headers (user email and API token) and declares JSON as the content type.
        """
        self.config = load_oss_params().KENTIK
        self.session = requests.Session()

        default_headers = {
            "X-CH-Auth-Email": self.config.user_email,
            "X-CH-Auth-API-Token": self.config.api_key,
            "Content-Type": "application/json",
        }
        self.session.headers.update(default_headers)

    def _send_request(
        self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None
    ) -> Response:
        """Perform an HTTP call against the Kentik API and hand back the raw response.

        :param method: The HTTP verb to use.
        :param str endpoint: Path that is appended as-is to the configured ``api_base``.
        :param data: Optional payload, sent as the JSON body of the request.
        :return: The response object; its status code is not inspected here.
        """
        target_url = self.config.api_base + endpoint
        request_details = {"method": method, "endpoint": target_url, "form_data": data}
        logger.debug("Kentik - Sending request", extra=request_details)

        response = self.session.request(method, target_url, json=data)
        # Every attribute of the response object is attached to the log record.
        logger.debug("Kentik - Received response", extra=response.__dict__)

        return response

    def get_devices(self) -> list[dict[str, Any]]:
        """List all devices in Kentik."""
        return [self._send_request("GET", "v5/devices").json()]

    def get_device(self, device_id: str) -> dict[str, Any]:
        """Get a device by ID."""
        return self._send_request("GET", f"v5/device/{device_id}").json()

    def get_device_by_name(self, device_name: str) -> dict[str, Any]:
        """Fetch a device in Kentik by its :term:`FQDN`.

        If the device is not found, returns an empty dict.

        :param str device_name: The :term:`FQDN` of the sought device.
        """
        devices = self.get_devices()
        for device in devices:
            if device["name"] == device_name:
                return device

        return {}

    def get_sites(self) -> list[dict[str, Any]]:
        """Get a list of all available sites in Kentik."""
        return self._send_request("GET", "v5/sites").json()["sites"]

    def get_site(self, site_id: str) -> dict[str, Any]:
        """Get a site by ID."""
        return self._send_request("GET", f"v5/site/{site_id}").json()

    def get_site_by_name(self, site_slug: str) -> dict[str, Any]:
        """Get a Kentik site by its name.

        If the site is not found, return an empty dict.

        :param str site_slug: The name of the site, should be a three-letter slug like COR or POZ.
        """
        sites = self.get_sites()
        for site in sites:
            if site["site_name"] == site_slug:
                return site

        return {}

    def get_plans(self) -> list[dict[str, Any]]:
        """Get all Kentik plans available."""
        return self._send_request("GET", "v5/plans").json()["plans"]

    def get_plan_by_name(self, plan_name: str) -> dict[str, Any]:
        """Get a Kentik plan by its name.

        If the plan is not found, returns an empty dict.

        :param str plan_name: The name of the plan.
        """
        plans = self.get_plans()
        for plan in plans:
            if plan["name"] == plan_name:
                return plan

        return {}

    def create_device(self, device: NewKentikDevice) -> dict[str, Any]:
        """Add a new device to Kentik.

        The device is created in two steps. It is first created with its description as a temporary name, and is
        then renamed to ``device.device_name`` with a follow-up update request.

        :param NewKentikDevice device: The device that is to be added.
        :return: The response body of the creation request, with ``device_name`` set to the final name.
        :raises KeyError: If the placeholder plan from the configuration is not found in Kentik, or if the
            creation response does not contain the ID of the new device.
        """
        plan_id = self.get_plan_by_name(self.config.placeholder_license_key)["id"]
        request_body = {
            "device": {
                # The real device name is left out here on purpose: it is set by the update request further down.
                **device.model_dump(exclude={"device_name"}),
                "device_name": device.device_description,
                "device_type": self.config.device_type,
                # NOTE(review): the subtype is filled from the ``device_type`` setting - confirm this is intended.
                "device_subtype": self.config.device_type,
                "minimize_snmp": self.config.minimize_snmp,
                "device_sample_rate": self.config.sample_rate,
                "plan_id": plan_id,
                "device_snmp_community": self.config.snmp_community,
                "device_bgp_type": self.config.bgp_type,
                "bgp_lookup_strategy": self.config.bgp_lookup_strategy,
                "device_bgp_neighbor_asn": str(self.config.ASN),
                "device_bgp_password": self.config.md5_password,
            }
        }

        new_device = self._send_request("POST", "v5/device", request_body).json()

        # The name of the device has to be updated from the subscription ID to its FQDN.
        # This is a limitation of the Kentik API that disallows setting device names containing a . symbol.
        self.update_device(new_device["device"]["id"], {"device": {"device_name": device.device_name}})
        new_device["device"]["device_name"] = device.device_name

        return new_device

    def update_device(self, device_id: str, updated_device: dict[str, Any]) -> dict[str, Any]:
        """Update an existing device in Kentik."""
        return self._send_request("PUT", f"v5/device/{device_id}", updated_device).json()

    def remove_device(self, device_id: str, *, archive: bool) -> None:
        """Remove a device from Kentik.

        :param str device_id: The Kentik internal ID of the device that is to be removed.
        :param bool archive: Archive the device instead of completely deleting it.
        """
        if not archive:
            self._send_request("DELETE", f"v5/device/{device_id}")

        self._send_request("DELETE", f"v5/device/{device_id}")

    def remove_device_by_fqdn(self, fqdn: str, *, archive: bool = True) -> None:
        """Remove a device from Kentik, by its :term:`FQDN`."""
        device_id = self.get_device_by_name(fqdn)["id"]
        self.remove_device(device_id, archive=archive)