Skip to content
Snippets Groups Projects

Add iBGP workflow and LibreNMS client

Merged Karel van Klink requested to merge feature/add-ibgp-workflow into develop
All threads resolved!
4 files
+ 123
179
Compare changes
  • Side-by-side
  • Inline
Files
4
"""The LibreNMS module interacts with the LibreNMS instance when
- Creating a device.
- Validating the input of a device.
- Terminating a device.
"""
import json
"""The LibreNMS module interacts with the inventory management system of :term:`GAP`."""
import logging
from http import HTTPStatus
from importlib import metadata
from typing import Any
import requests
from requests import HTTPError
from gso import settings
from gso.settings import load_oss_params
from gso.utils.helpers import SNMPVersion
logger = logging.getLogger(__name__)
class CfgStruct:
pass
def _get_cfg():
"""Internal function to retrieve all needed configuration."""
oss = settings.load_oss_params()
cfg = CfgStruct()
# Hack for later ease: 1st setattr will fill in the inner's dict
cfg._hack = ""
# Update inner dict
cfg.__dict__.update(oss.MONITORING)
assert cfg.__dict__ is not None
# Add parameters on-the-fly
cfg.headers = {"X-Auth-Token": cfg.LIBRENMS.token}
sep = "/"
if cfg.LIBRENMS.endpoint.endswith("/"):
sep = ""
cfg.base_url = f"{cfg.LIBRENMS.endpoint}{sep}api/v0"
cfg.url_devices = f"{cfg.base_url}/devices"
cfg.url_switches = f"{cfg.base_url}/devicegroups/switches"
cfg.device_groups = cfg.LIBRENMS.DEVICE_GROUPS
cfg.environment = oss.GENERAL.environment
if cfg.environment.startswith("lab"):
cfg_dg_rtr_lab = cfg.device_groups.routers_lab
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_lab}"
elif cfg.environment.startswith("prod"):
cfg_dg_rtr_prod = cfg.device_groups.routers_prod
cfg.url_routers = f"{cfg.base_url}/devicegroups/{cfg_dg_rtr_prod}"
return cfg
def validate_device(fqdn: str):
"""Function that validates the existence of a device in LibreNMS.
:param FQDN of the device to validate.
"""
CFG = _get_cfg()
# Validate existence
nms_result = requests.get(
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))]
if len(device_id) != 1 or device_id[0] is None:
error_msg = f"Device with FQDN={fqdn} is not registered in LibreNMS"
print(error_msg)
raise AssertionError(error_msg)
# Validate correctness
device_id = device_id[0]
url_device = f"{CFG.url_devices}/{device_id}"
logger.debug(f"Connecting to URL: {url_device}"
f"with headers: {CFG.headers}")
nms_result = requests.get(
url_device, headers=CFG.headers)
logger.debug(f"LibreNMS response={nms_result.content}")
if nms_result.status_code != 200:
print(nms_result.content)
raise AssertionError(nms_result.content)
# nms_dev_sysname = nms_result.json().get("sysName")
nms_dev_hostname = nms_result.json().get("devices")[0].get("hostname")
if fqdn != nms_dev_hostname:
error_msg = f"Device with FQDN={fqdn} may not be correctly "\
f"registered in LibreNMS (expected FQDN: {nms_dev_hostname})"
print(error_msg)
raise AssertionError(error_msg)
def register_device(fqdn: str):
"""Function that registers a new device in LibreNMS.
:param FQDN of the device to register.
"""
CFG = _get_cfg()
logger.debug(f"Registering FQDN={fqdn} in LibreNMS")
device_data = {
"display": fqdn,
"hostname": fqdn,
"sysName": fqdn,
# "override_icmp_disable": "true",
# IMPORTANT: uncomment if testing with FQDNs that are not reachable
# from LibreNMS (e.g. ContainerLab routers)
# "force_add": "true"
}
if CFG.SNMP.version == "v2c":
device_data.update({
"community": CFG.SNMP.V2.community,
})
elif CFG.SNMP.version == "v3":
for key in [
"authlevel", "authname", "authpass", "authalgo",
"cryptopass", "cryptoalgo"]:
device_data.update({key: getattr(CFG.SNMP.V3, key)})
logger.debug(f"Connecting to URL: {CFG.url_devices}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.post(
CFG.url_devices, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
if nms_result.status_code != 200:
print(nms_result.content)
raise AssertionError(nms_result.content)
def deregister_device(fqdn: str):
"""Function that reregisters a device from LibreNMS.
:param FQDN of the device to deregister.
"""
CFG = _get_cfg()
logger.debug(f"Deregistering FQDN={fqdn} from LibreNMS")
nms_result = requests.get(
CFG.url_devices, headers=CFG.headers)
assert nms_result is not None
device_id = [x.get("device_id") for x in filter(lambda x: x.get("hostname") == fqdn,
nms_result.json().get("devices"))]
if len(device_id) != 1:
return
device_id = device_id[0]
# https://docs.librenms.org/API/Devices/#endpoint-categories
device_data = {
"field": "disabled",
"data": "1",
}
url_device = f"{CFG.url_devices}/{device_id}"
logger.debug(f"Connecting to URL: {url_device}"
f"with headers: {CFG.headers} and"
f"payload: {device_data}")
nms_result = requests.patch(
url_device, headers=CFG.headers,
data=json.dumps(device_data))
logger.debug(f"LibreNMS response={nms_result.content}")
# Fail silently if device was not registered
if nms_result.status_code != 200:
print(nms_result.content)
class LibreNMSClient:
"""The client for LibreNMS that interacts with the inventory management system."""
def __init__(self) -> None:
"""Initialise a new LibreNMS client with an authentication token."""
config = load_oss_params().MONITORING
token = config.LIBRENMS.token
self.base_url = config.LIBRENMS.base_url
self.snmp_config = config.SNMP
self.headers = {
"User-Agent": f"geant-service-orchestrator/{metadata.version('geant-service-orchestrator')}",
"Accept": "application/json",
"Content-Type": "application/json",
"X-Auth-Token": token,
}
def get_device(self, fqdn: str) -> dict[str, Any]:
"""Get an existing device from LibreNMS.
:param str fqdn: The :term:`FQDN` of a device that is retrieved.
:return dict[str, Any]: A :term:`JSON` formatted list of devices that match the queried :term:`FQDN`.
:raises HTTPError: Raises an HTTP error 404 when the device is not found
"""
response = requests.get(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
response.raise_for_status()
return response.json()
def device_exists(self, fqdn: str) -> bool:
"""Check whether a device exists in LibreNMS.
:param str fqdn: The hostname that should be checked for.
:return bool: Whether the device exists or not.
"""
try:
device = self.get_device(fqdn)
except HTTPError as e:
if e.response.status_code == HTTPStatus.NOT_FOUND:
return False
raise
return device["status"] == "ok"
def add_device(self, fqdn: str, snmp_version: SNMPVersion) -> dict[str, Any]:
"""Add a new device to LibreNMS.
:param str fqdn: The hostname of the newly added device.
:param SNMPVersion snmp_version: The SNMP version of the new device, which decides the authentication parameters
that LibreNMS should use to poll the device.
"""
device_data = {
"display": fqdn,
"hostname": fqdn,
"sysName": fqdn,
"snmpver": snmp_version.value,
}
device_data.update(getattr(self.snmp_config, snmp_version))
device = requests.post(f"{self.base_url}/devices", headers=self.headers, json=device_data, timeout=(0.5, 75))
return device.json()
def remove_device(self, fqdn: str) -> dict[str, Any]:
"""Remove a device from LibreNMS.
:param str fqdn: The :term:`FQDN` of the hostname that should get deleted.
:return dict[str, Any]: A JSON representation of the device that got removed.
:raises HTTPError: Raises an exception if the request did not succeed.
"""
device = requests.delete(f"{self.base_url}/devices/{fqdn}", headers=self.headers, timeout=(0.5, 75))
device.raise_for_status()
return device.json()
def validate_device(self, fqdn: str) -> list[str]:
"""Validate a device in LibreNMS by fetching the record match the queried :term:`FQDN` against its hostname.
:param str fqdn: The :term:`FQDN` of the host that is validated.
:return list[str]: A list of errors, if empty the device is successfully validated.
"""
errors = []
device = self.get_device(fqdn)
if device["status"] != "ok":
errors += ["Device does not exist in LibreNMS."]
if device["hostname"] != fqdn:
errors += ["Device hostname in LibreNMS does not match FQDN."]
return errors
Loading