Skip to content
Snippets Groups Projects
Commit 9c21a52b authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

Rework Kentik client

parent ce8af675
No related branches found
No related tags found
1 merge request!257Feature/add kentik workflow
......@@ -4,6 +4,7 @@ import logging
from typing import Any, Literal
import requests
from orchestrator.utils.errors import ProcessFailureError
from pydantic import BaseModel
from requests import Response
......@@ -39,22 +40,37 @@ class KentikClient:
})
def _send_request(
self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None
) -> Response:
self, method: Literal["GET", "POST", "PUT"], endpoint: str, data: dict[str, Any] | None = None
) -> dict[str, Any]:
url = self.config.api_base + endpoint
logger.debug("Kentik - Sending request", extra={"method": method, "endpoint": url, "form_data": data})
result = self.session.request(method, url, json=data)
logger.debug("Kentik - Received response", extra=result.__dict__)
logger.debug("Kentik - Sending %s request to %s with headers %s", method, url, data)
result = self.session.request(method, url, json=data).json()
if "error" in result or "kentik_error" in result:
msg = "Failed to process request in Kentik"
raise ProcessFailureError(msg, details=result)
logger.debug("Kentik - Received response %s", result)
return result
def _send_delete(self, endpoint: str, data: dict[str, Any] | None = None) -> Response:
url = self.config.api_base + endpoint
logger.debug("Kentik - Sending delete request to %s with headers %s", url, data)
return self.session.delete(url, json=data)
def get_devices(self) -> list[dict[str, Any]]:
"""List all devices in Kentik."""
return [self._send_request("GET", "v5/devices").json()]
"""List all devices in Kentik.
Returns a list of shape ``[{**device_1}, {**device_2}, ..., {**device_n}]}``.
"""
return self._send_request("GET", "v5/devices")["devices"]
def get_device(self, device_id: str) -> dict[str, Any]:
"""Get a device by ID."""
return self._send_request("GET", f"v5/device/{device_id}").json()
device = self._send_request("GET", f"v5/device/{device_id}")
device.pop("custom_column_data", None)
device.pop("custom_columns", None)
return device
def get_device_by_name(self, device_name: str) -> dict[str, Any]:
"""Fetch a device in Kentik by its :term:`FQDN`.
......@@ -65,18 +81,18 @@ class KentikClient:
"""
devices = self.get_devices()
for device in devices:
if device["name"] == device_name:
if device["device_name"] == device_name:
return device
return {}
def get_sites(self) -> list[dict[str, Any]]:
"""Get a list of all available sites in Kentik."""
return self._send_request("GET", "v5/sites").json()["sites"]
return self._send_request("GET", "v5/sites")["sites"]
def get_site(self, site_id: str) -> dict[str, Any]:
"""Get a site by ID."""
return self._send_request("GET", f"v5/site/{site_id}").json()
return self._send_request("GET", f"v5/site/{site_id}")
def get_site_by_name(self, site_slug: str) -> dict[str, Any]:
"""Get a Kentik site by its name.
......@@ -94,7 +110,11 @@ class KentikClient:
def get_plans(self) -> list[dict[str, Any]]:
"""Get all Kentik plans available."""
return self._send_request("GET", "v5/plans").json()["plans"]
return self._send_request("GET", "v5/plans")["plans"]
def get_plan(self, plan_id: int) -> dict[str, Any]:
"""Get a Kentik plan by ID."""
return self._send_request("GET", f"v5/plan/{plan_id}")
def get_plan_by_name(self, plan_name: str) -> dict[str, Any]:
"""Get a Kentik plan by its name.
......@@ -130,18 +150,23 @@ class KentikClient:
}
}
new_device = self._send_request("POST", "v5/device", request_body).json()
new_device = self._send_request("POST", "v5/device", request_body)["device"]
# The name of the device has to be updated from the subscription ID to its FQDN.
# This is a limitation of the Kentik API that disallows settings device names containing a . symbol.
self.update_device(new_device["device"]["id"], {"device": {"device_name": device.device_name}})
new_device["device"]["device_name"] = device.device_name
self.update_device(new_device["id"], {"device": {"device_name": device.device_name}})
new_device["device_name"] = device.device_name
new_device.pop("custom_column_data", None)
new_device.pop("custom_columns", None)
return new_device
def update_device(self, device_id: str, updated_device: dict[str, Any]) -> dict[str, Any]:
"""Update an existing device in Kentik."""
return self._send_request("PUT", f"v5/device/{device_id}", updated_device).json()
device = self._send_request("PUT", f"v5/device/{device_id}", updated_device)["device"]
device.pop("custom_column_data", None)
device.pop("custom_columns", None)
return device
def remove_device(self, device_id: str, *, archive: bool) -> None:
"""Remove a device from Kentik.
......@@ -150,9 +175,9 @@ class KentikClient:
:param bool archive: Archive the device instead of completely deleting it.
"""
if not archive:
self._send_request("DELETE", f"v5/device/{device_id}")
self._send_delete(f"v5/device/{device_id}")
self._send_request("DELETE", f"v5/device/{device_id}")
self._send_delete(f"v5/device/{device_id}")
def remove_device_by_fqdn(self, fqdn: str, *, archive: bool = True) -> None:
"""Remove a device from Kentik, by its :term:`FQDN`."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment