Skip to content
Snippets Groups Projects
Verified Commit 4d7d8109 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

rework the LibreNMS client

parent 5b934aa9
Branches
Tags
No related merge requests found
This commit is part of merge request !126. Comments created here will be created in the context of that merge request.
......@@ -48,19 +48,14 @@
},
"MONITORING": {
"LIBRENMS": {
"endpoint": "https://librenms.test.gap.geant.org/",
"token": "<token>",
"DEVICE_GROUPS": {
"routers_lab": "lab_routers",
"routers_prod": "prod_routers"
}
"base_url": "https://librenms/api/v0",
"token": "<token>"
},
"SNMP": {
"version": "v2c",
"V2": {
"community": "librenms-community"
"v2c": {
"community": "secret-community"
},
"V3": {
"v3": {
"authlevel": "AuthPriv",
"authname": "librenms",
"authpass": "<password1>",
......
"""The LibreNMS module interacts with the LibreNMS instance when
- Creating a device.
- Validating the input of a device.
- Terminating a device.
"""
import json
"""The LibreNMS module interacts with the inventory management system of :term:`GAP`."""
import logging
from http import HTTPStatus
from importlib import metadata
from typing import Any
import requests
from requests import HTTPError
from gso import settings
from gso.settings import load_oss_params
from gso.utils.helpers import SNMPVersion
logger = logging.getLogger(__name__)
class CfgStruct:
pass
def _get_cfg():
"""Internal function to retrieve all needed configuration."""
oss = settings.load_oss_params()
cfg = CfgStruct()
# Hack for later ease: 1st setattr will fill in the inner's dict
cfg._hack = ""
# Update inner dict
cfg.__dict__.update(oss.MONITORING)
assert cfg.__dict__ is not None
# Add parameters on-the-fly
cfg.headers = {"X-Auth-Token": cfg.LIBRENMS.token}
sep = "/"
if cfg.LIBRENMS.endpoint.endswith("/"):
sep = ""
cfg.base_url = f"{cfg.LIBRENMS.endpoint}{sep}api/v0"
cfg.url_devices = f"{cfg.base_url}/devices"
cfg.url_switches = f"{cfg.base_url}/devicegroups/switches"
cfg.device_groups = cfg.LIBRENMS.DEVICE_GROUPS
cfg.environment = oss.GENERAL.environment
if cfg.environment.startswith("lab"):
cfg_dg_rtr_lab = cfg.device_groups.routers_lab
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_lab}"
elif cfg.environment.startswith("prod"):
cfg_dg_rtr_prod = cfg.device_groups.routers_prod
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_prod}"
return cfg
def validate_device(fqdn: str):
"""Function that validates the existence of a device in LibreNMS.
:param FQDN of the device to validate.
"""
CFG = _get_cfg()
# Validate existence
nms_result = requests.get(
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))]
if len(device_id) != 1 or device_id[0] is None:
error_msg = f"Device with FQDN={fqdn} is not registered in LibreNMS"
print(error_msg)
raise AssertionError(error_msg)
# Validate correctness
device_id = device_id[0]
url_device = f"{CFG.url_devices}/{device_id}"
logger.debug(f"Connecting to URL: {url_device}"
f"with headers: {CFG.headers}")
nms_result = requests.get(
url_device, headers=CFG.headers)
logger.debug(f"LibreNMS response={nms_result.content}")
if nms_result.status_code != 200:
print(nms_result.content)
raise AssertionError(nms_result.content)
# nms_dev_sysname = nms_result.json().get("sysName")
nms_dev_hostname = nms_result.json().get("devices")[0].get("hostname")
if fqdn != nms_dev_hostname:
error_msg = f"Device with FQDN={fqdn} may not be correctly "\
f"registered in LibreNMS (expected FQDN: {nms_dev_hostname})"
print(error_msg)
raise AssertionError(error_msg)
def register_device(fqdn: str):
"""Function that registers a new device in LibreNMS.
:param FQDN of the device to register.
"""
CFG = _get_cfg()
logger.debug(f"Registering FQDN={fqdn} in LibreNMS")
device_data = {
"display": fqdn,
"hostname": fqdn,
"sysName": fqdn,
# "override_icmp_disable": "true",
# IMPORTANT: uncomment if testing with FQDNs that are not reachable
# from LibreNMS (e.g. ContainerLab routers)
# "force_add": "true"
}
if CFG.SNMP.version == "v2c":
device_data.update({
"community": CFG.SNMP.V2.community,
})
elif CFG.SNMP.version == "v3":
for key in [
"authlevel", "authname", "authpass", "authalgo",
"cryptopass", "cryptoalgo"]:
device_data.update({key: getattr(CFG.SNMP.V3, key)})
logger.debug(f"Connecting to URL: {CFG.url_devices}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.post(
CFG.url_devices, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
if nms_result.status_code != 200:
print(nms_result.content)
raise AssertionError(nms_result.content)
def deregister_device(fqdn: str):
"""Function that reregisters a device from LibreNMS.
:param FQDN of the device to deregister.
"""
CFG = _get_cfg()
logger.debug(f"Deregistering FQDN={fqdn} from LibreNMS")
nms_result = requests.get(
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))]
if len(device_id) != 1:
return
device_id = device_id[0]
# https://docs.librenms.org/API/Devices/#endpoint-categories
device_data = {
"field": "disabled",
"data": "1",
}
url_device = f"{CFG.url_devices}/{device_id}"
logger.debug(f"Connecting to URL: {url_device}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.patch(
url_device, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
# Fail silently if device was not registered
if nms_result.status_code != 200:
print(nms_result.content)
class LibreNMSClient:
"""The client for LibreNMS that interacts with the inventory management system."""
def __init__(self) -> None:
"""Initialise a new LibreNMS client with an authentication token."""
config = load_oss_params().MONITORING
token = config.LIBRENMS.token
self.base_url = config.LIBRENMS.base_url
self.snmp_config = config.SNMP
self.headers = {
"User-Agent": f"geant-service-orchestrator/{metadata.version('geant-service-orchestrator')}",
"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": token,
}
def get_device(self, fqdn: str) -> dict[str, Any]:
"""Get an existing device from LibreNMS.
:param str fqdn: The :term:`FQDN` of a device that is retrieved.
:return dict[str, Any]: A :term:`JSON` formatted list of devices that match the queried :term:`FQDN`.
:raises HTTPError: Raises an HTTP error 404 when the device is not found
"""
response = requests.get(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
response.raise_for_status()
return response.json()
def device_exists(self, fqdn: str) -> bool:
"""Check whether a device exists in LibreNMS.
:param str fqdn: The hostname that should be checked for.
:return bool: Whether the device exists or not.
"""
try:
device = self.get_device(fqdn)
except HTTPError as e:
if e.response.status_code == HTTPStatus.NOT_FOUND:
return False
raise
return device["status"] == "ok"
def add_device(self, fqdn: str, snmp_version: SNMPVersion) -> dict[str, Any]:
"""Add a new device to LibreNMS.
:param str fqdn: The hostname of the newly added device.
:param SNMPVersion snmp_version: The SNMP version of the new device, which decides the authentication parameters
that LibreNMS should use to poll the device.
"""
device_data = {
"display": fqdn,
"hostname": fqdn,
"sysName": fqdn,
"snmpver": snmp_version.value,
}
device_data.update(getattr(self.snmp_config, snmp_version))
device = requests.post(f"{self.base_url}/devices", headers=self.headers, json=device_data, timeout=(0.5, 75))
return device.json()
def remove_device(self, fqdn: str) -> dict[str, Any]:
"""Remove a device from LibreNMS.
:param str fqdn: The :term:`FQDN` of the hostname that should get deleted.
:return dict[str, Any]: A JSON representation of the device that got removed.
:raises HTTPError: Raises an exception if the request did not succeed.
"""
device = requests.delete(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
device.raise_for_status()
return device.json()
def validate_device(self, fqdn: str) -> list[str]:
"""Validate a device in LibreNMS by fetching the record match the queried :term:`FQDN` against its hostname.
:param str fqdn: The :term:`FQDN` of the host that is validated.
:return list[str]: A list of errors, if empty the device is successfully validated.
"""
errors = []
device = self.get_device(fqdn)
if device["status"] != "ok":
errors += ["Device does not exist in LibreNMS."]
if device["hostname"] != fqdn:
errors += ["Device hostname in LibreNMS does not match FQDN."]
return errors
......@@ -94,13 +94,6 @@ class IPAMParams(BaseSettings):
LT_IAS: ServiceNetworkParams
class MonitoringLibreNMSDevGroupsParams(BaseSettings):
"""Parameters related to LibreNMS device groups."""
routers_lab: str
routers_prod: str
class MonitoringSNMPV2Params(BaseSettings):
"""Parameters related to SNMPv2."""
......@@ -121,16 +114,24 @@ class MonitoringSNMPV3Params(BaseSettings):
class MonitoringLibreNMSParams(BaseSettings):
"""Parameters related to LibreNMS."""
endpoint: str
base_url: str
token: str
DEVICE_GROUPS: MonitoringLibreNMSDevGroupsParams
class SNMPParams(BaseSettings):
"""Parameters for SNMP in LibreNMS."""
v2c: MonitoringSNMPV2Params
#: .. versionadded :: 2.0
#: Support for :term:`SNMP` v3 will get added in a later version of :term:`GSO`. Parameters are optional for now.
v3: MonitoringSNMPV3Params | None
class MonitoringParams(BaseSettings):
"""Parameters related to the monitoring."""
LIBRENMS: MonitoringLibreNMSParams
SNMP: MonitoringSNMPV2Params | MonitoringSNMPV3Params
SNMP: SNMPParams
class ProvisioningProxyParams(BaseSettings):
......
......@@ -2,6 +2,7 @@
import ipaddress
import re
from enum import StrEnum
from ipaddress import IPv4Address
from uuid import UUID
......@@ -36,6 +37,13 @@ class LAGMember(BaseModel):
return hash((self.interface_name, self.interface_description))
class SNMPVersion(StrEnum):
"""An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""
V2C = "v2c"
V3 = "v3"
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
"""Return a list of available interfaces for a given router and speed.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment