Skip to content
Snippets Groups Projects

Add a Kentik service to GSO

Merged Karel van Klink requested to merge feature/add-kentik-service into develop
All threads resolved!
1 file
+ 4
3
Compare changes
  • Side-by-side
  • Inline
@@ -33,18 +33,19 @@ class KentikClient:
def __init__(self) -> None:
"""Instantiate a new Kentik Client."""
self.config = load_oss_params().KENTIK
self.headers = {
self.session = requests.Session()
self.session.headers.update({
"X-CH-Auth-Email": self.config.user_email,
"X-CH-Auth-API-Token": self.config.api_key,
"Content-Type": "application/json",
}
})
def _send_request(
self, method: Literal["GET", "POST", "PUT", "DELETE"], endpoint: str, data: dict[str, Any] | None = None
) -> Response:
url = self.config.api_base + endpoint
logger.debug("Kentik - Sending request", extra={"method": method, "endpoint": url, "form_data": data})
result = requests.request(method, url, json=data, headers=self.headers)
result = self.session.request(method, url, json=data)
logger.debug("Kentik - Received response", extra=result.__dict__)
return result
Loading