Skip to content
Snippets Groups Projects
Verified Commit 1e65d594 authored by Karel van Klink's avatar Karel van Klink :smiley_cat:
Browse files

rework the LibreNMS client

parent 1f137e1e
Branches
Tags
1 merge request!126Add iBGP workflow and LibreNMS client
...@@ -48,19 +48,14 @@ ...@@ -48,19 +48,14 @@
}, },
"MONITORING": { "MONITORING": {
"LIBRENMS": { "LIBRENMS": {
"endpoint": "https://librenms.test.gap.geant.org/", "base_url": "https://librenms/api/v0",
"token": "<token>", "token": "<token>"
"DEVICE_GROUPS": {
"routers_lab": "lab_routers",
"routers_prod": "prod_routers"
}
}, },
"SNMP": { "SNMP": {
"version": "v2c", "v2c": {
"V2": { "community": "secret-community"
"community": "librenms-community"
}, },
"V3": { "v3": {
"authlevel": "AuthPriv", "authlevel": "AuthPriv",
"authname": "librenms", "authname": "librenms",
"authpass": "<password1>", "authpass": "<password1>",
......
"""The LibreNMS module interacts with the LibreNMS instance when """The LibreNMS module interacts with the inventory management system of :term:`GAP`."""
- Creating a device.
- Validating the input of a device.
- Terminating a device.
"""
import json
import logging import logging
from http import HTTPStatus
from importlib import metadata
from typing import Any
import requests import requests
from requests import HTTPError
from gso import settings from gso.settings import load_oss_params
from gso.utils.helpers import SNMPVersion
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class CfgStruct: class LibreNMSClient:
pass """The client for LibreNMS that interacts with the inventory management system."""
def __init__(self) -> None:
def _get_cfg(): """Initialise a new LibreNMS client with an authentication token."""
"""Internal function to retrieve all needed configuration.""" config = load_oss_params().MONITORING
oss = settings.load_oss_params() token = config.LIBRENMS.token
cfg = CfgStruct()
# Hack for later ease: 1st setattr will fill in the inner's dict self.base_url = config.LIBRENMS.base_url
cfg._hack = "" self.snmp_config = config.SNMP
# Update inner dict
cfg.__dict__.update(oss.MONITORING) self.headers = {
assert cfg.__dict__ is not None "User-Agent": f"geant-service-orchestrator/{metadata.version('geant-service-orchestrator')}",
"Accept": "application/json",
# Add parameters on-the-fly "Content-Type": "application/json",
cfg.headers = {"X-Auth-Token": cfg.LIBRENMS.token} "X-Auth-Token": token,
}
sep = "/"
if cfg.LIBRENMS.endpoint.endswith("/"): def get_device(self, fqdn: str) -> dict[str, Any]:
sep = "" """Get an existing device from LibreNMS.
cfg.base_url = f"{cfg.LIBRENMS.endpoint}{sep}api/v0"
cfg.url_devices = f"{cfg.base_url}/devices" :param str fqdn: The :term:`FQDN` of a device that is retrieved.
cfg.url_switches = f"{cfg.base_url}/devicegroups/switches" :return dict[str, Any]: A :term:`JSON` formatted list of devices that match the queried :term:`FQDN`.
cfg.device_groups = cfg.LIBRENMS.DEVICE_GROUPS :raises HTTPError: Raises an HTTP error 404 when the device is not found
cfg.environment = oss.GENERAL.environment """
if cfg.environment.startswith("lab"): response = requests.get(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
cfg_dg_rtr_lab = cfg.device_groups.routers_lab response.raise_for_status()
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_lab}"
elif cfg.environment.startswith("prod"): return response.json()
cfg_dg_rtr_prod = cfg.device_groups.routers_prod
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_prod}" def device_exists(self, fqdn: str) -> bool:
"""Check whether a device exists in LibreNMS.
return cfg
:param str fqdn: The hostname that should be checked for.
:return bool: Whether the device exists or not.
def validate_device(fqdn: str): """
"""Function that validates the existence of a device in LibreNMS. try:
device = self.get_device(fqdn)
:param FQDN of the device to validate. except HTTPError as e:
""" if e.response.status_code == HTTPStatus.NOT_FOUND:
CFG = _get_cfg() return False
raise
# Validate existence
nms_result = requests.get( return device["status"] == "ok"
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None def add_device(self, fqdn: str, snmp_version: SNMPVersion) -> dict[str, Any]:
"""Add a new device to LibreNMS.
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))] :param str fqdn: The hostname of the newly added device.
:param SNMPVersion snmp_version: The SNMP version of the new device, which decides the authentication parameters
if len(device_id) != 1 or device_id[0] is None: that LibreNMS should use to poll the device.
error_msg = f"Device with FQDN={fqdn} is not registered in LibreNMS" """
print(error_msg) device_data = {
raise AssertionError(error_msg) "display": fqdn,
"hostname": fqdn,
# Validate correctness "sysName": fqdn,
device_id = device_id[0] "snmpver": snmp_version.value,
url_device = f"{CFG.url_devices}/{device_id}" }
logger.debug(f"Connecting to URL: {url_device}" device_data.update(getattr(self.snmp_config, snmp_version))
f"with headers: {CFG.headers}")
nms_result = requests.get( device = requests.post(f"{self.base_url}/devices", headers=self.headers, json=device_data, timeout=(0.5, 75))
url_device, headers=CFG.headers)
logger.debug(f"LibreNMS response={nms_result.content}") return device.json()
if nms_result.status_code != 200: def remove_device(self, fqdn: str) -> dict[str, Any]:
print(nms_result.content) """Remove a device from LibreNMS.
raise AssertionError(nms_result.content)
:param str fqdn: The :term:`FQDN` of the hostname that should get deleted.
# nms_dev_sysname = nms_result.json().get("sysName") :return dict[str, Any]: A JSON representation of the device that got removed.
nms_dev_hostname = nms_result.json().get("devices")[0].get("hostname") :raises HTTPError: Raises an exception if the request did not succeed.
if fqdn != nms_dev_hostname: """
error_msg = f"Device with FQDN={fqdn} may not be correctly "\ device = requests.delete(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
f"registered in LibreNMS (expected FQDN: {nms_dev_hostname})"
print(error_msg) device.raise_for_status()
raise AssertionError(error_msg) return device.json()
def validate_device(self, fqdn: str) -> list[str]:
def register_device(fqdn: str): """Validate a device in LibreNMS by fetching the record match the queried :term:`FQDN` against its hostname.
"""Function that registers a new device in LibreNMS.
:param str fqdn: The :term:`FQDN` of the host that is validated.
:param FQDN of the device to register. :return list[str]: A list of errors, if empty the device is successfully validated.
""" """
CFG = _get_cfg() errors = []
logger.debug(f"Registering FQDN={fqdn} in LibreNMS") device = self.get_device(fqdn)
device_data = { if device["status"] != "ok":
"display": fqdn, errors += ["Device does not exist in LibreNMS."]
"hostname": fqdn,
"sysName": fqdn, if device["hostname"] != fqdn:
# "override_icmp_disable": "true", errors += ["Device hostname in LibreNMS does not match FQDN."]
# IMPORTANT: uncomment if testing with FQDNs that are not reachable
# from LibreNMS (e.g. ContainerLab routers) return errors
# "force_add": "true"
}
if CFG.SNMP.version == "v2c":
device_data.update({
"community": CFG.SNMP.V2.community,
})
elif CFG.SNMP.version == "v3":
for key in [
"authlevel", "authname", "authpass", "authalgo",
"cryptopass", "cryptoalgo"]:
device_data.update({key: getattr(CFG.SNMP.V3, key)})
logger.debug(f"Connecting to URL: {CFG.url_devices}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.post(
CFG.url_devices, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
if nms_result.status_code != 200:
print(nms_result.content)
raise AssertionError(nms_result.content)
def deregister_device(fqdn: str):
"""Function that reregisters a device from LibreNMS.
:param FQDN of the device to deregister.
"""
CFG = _get_cfg()
logger.debug(f"Deregistering FQDN={fqdn} from LibreNMS")
nms_result = requests.get(
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))]
if len(device_id) != 1:
return
device_id = device_id[0]
# https://docs.librenms.org/API/Devices/#endpoint-categories
device_data = {
"field": "disabled",
"data": "1",
}
url_device = f"{CFG.url_devices}/{device_id}"
logger.debug(f"Connecting to URL: {url_device}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.patch(
url_device, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
# Fail silently if device was not registered
if nms_result.status_code != 200:
print(nms_result.content)
...@@ -94,13 +94,6 @@ class IPAMParams(BaseSettings): ...@@ -94,13 +94,6 @@ class IPAMParams(BaseSettings):
LT_IAS: ServiceNetworkParams LT_IAS: ServiceNetworkParams
class MonitoringLibreNMSDevGroupsParams(BaseSettings):
"""Parameters related to LibreNMS device groups."""
routers_lab: str
routers_prod: str
class MonitoringSNMPV2Params(BaseSettings): class MonitoringSNMPV2Params(BaseSettings):
"""Parameters related to SNMPv2.""" """Parameters related to SNMPv2."""
...@@ -121,16 +114,24 @@ class MonitoringSNMPV3Params(BaseSettings): ...@@ -121,16 +114,24 @@ class MonitoringSNMPV3Params(BaseSettings):
class MonitoringLibreNMSParams(BaseSettings): class MonitoringLibreNMSParams(BaseSettings):
"""Parameters related to LibreNMS.""" """Parameters related to LibreNMS."""
endpoint: str base_url: str
token: str token: str
DEVICE_GROUPS: MonitoringLibreNMSDevGroupsParams
class SNMPParams(BaseSettings):
"""Parameters for SNMP in LibreNMS."""
v2c: MonitoringSNMPV2Params
#: .. versionadded :: 2.0
#: Support for :term:`SNMP` v3 will get added in a later version of :term:`GSO`. Parameters are optional for now.
v3: MonitoringSNMPV3Params | None
class MonitoringParams(BaseSettings): class MonitoringParams(BaseSettings):
"""Parameters related to the monitoring.""" """Parameters related to the monitoring."""
LIBRENMS: MonitoringLibreNMSParams LIBRENMS: MonitoringLibreNMSParams
SNMP: MonitoringSNMPV2Params | MonitoringSNMPV3Params SNMP: SNMPParams
class ProvisioningProxyParams(BaseSettings): class ProvisioningProxyParams(BaseSettings):
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import ipaddress import ipaddress
import re import re
from enum import StrEnum
from ipaddress import IPv4Address from ipaddress import IPv4Address
from uuid import UUID from uuid import UUID
...@@ -30,6 +31,13 @@ class LAGMember(BaseModel): ...@@ -30,6 +31,13 @@ class LAGMember(BaseModel):
return hash((self.interface_name, self.interface_description)) return hash((self.interface_name, self.interface_description))
class SNMPVersion(StrEnum):
"""An enumerator for the two relevant versions of :term:`SNMP`: v2c and 3."""
V2C = "v2c"
V3 = "v3"
def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None: def available_interfaces_choices(router_id: UUID, speed: str) -> Choice | None:
"""Return a list of available interfaces for a given router and speed. """Return a list of available interfaces for a given router and speed.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment