# cli.py (7.52 KiB as retrieved)
# Authored by Pelle Koster
import json
import logging.config
import os
from datetime import datetime
from typing import Iterable, List
import click
from brian_polling_manager.interface_stats import vendors
from brian_polling_manager.interface_stats.click_helpers import (
validate_config,
validate_hostname,
)
from brian_polling_manager.interface_stats.services import (
get_juniper_netconf,
get_netconf_from_source_dir,
write_points_to_influx,
write_points_to_stdout,
)
from brian_polling_manager.interface_stats.vendors import Vendor, juniper
from brian_polling_manager.inventory import load_interfaces
# Name the logger after the module's dotted import name rather than its file
# path. A path such as ".../interface_stats/cli.py" is not part of any dotted
# logger hierarchy, so settings applied to a package-level logger (such as the
# "brian_polling_manager" entry in LOGGING_DEFAULT_CONFIG) could never reach it.
logger = logging.getLogger(__name__)
# Default ``logging.config.dictConfig`` dictionary, used by setup_logging() when
# the LOGGING_CONFIG environment variable is not set.
LOGGING_DEFAULT_CONFIG = {
    "version": 1,
    # Loggers created before dictConfig() runs are left enabled.
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {
            # Two adjacent literals, concatenated into a single format string.
            "format": "%(asctime)s - %(name)s "
            "(%(lineno)d) - %(levelname)s - %(message)s"
        }
    },
    "handlers": {
        # Single handler: everything from DEBUG upwards goes to stdout.
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # Package logger: DEBUG level, and not propagated to the root logger, so
        # records are emitted once even though root uses the same handler.
        "brian_polling_manager": {
            "level": "DEBUG",
            "handlers": ["console"],
            "propagate": False,
        }
    },
    # Every other logger is handled by root at INFO.
    "root": {"level": "INFO", "handlers": ["console"]},
}
# Application configuration shared by the helpers in this module. It starts out
# empty and is replaced wholesale through set_app_params().
_APP_CONFIG_PARAMS = {}


def set_app_params(params: dict) -> None:
    """
    Install ``params`` as the module-wide application configuration.

    The module-level ``_APP_CONFIG_PARAMS`` name is rebound to the given object
    itself; no copy is taken and the previous value is discarded.

    :param params: the application configuration dictionary
    """
    global _APP_CONFIG_PARAMS
    _APP_CONFIG_PARAMS = params
def setup_logging():
    """
    set up logging using ``logging.config.dictConfig``

    if LOGGING_CONFIG is defined in the environment, it is taken as the
    filename of a JSON document holding the logging configuration,
    otherwise LOGGING_DEFAULT_CONFIG is used

    :raises OSError: if the file named by LOGGING_CONFIG cannot be opened
    :raises json.JSONDecodeError: if that file does not contain valid JSON
    """
    filename = os.environ.get("LOGGING_CONFIG")
    if filename is None:
        logging_config = LOGGING_DEFAULT_CONFIG
    else:
        # Read the file as UTF-8 explicitly: without an encoding argument
        # open() falls back to a platform/locale dependent default, which can
        # mis-decode (or fail on) non-ASCII characters in the config file.
        with open(filename, encoding="utf-8") as f:
            logging_config = json.load(f)
    logging.config.dictConfig(logging_config)
def write_points(points: Iterable[dict], influx_params: dict, **kwargs):
    """
    Send ``points`` to the output selected by the "testing" section of the
    application configuration.

    * ``testing["no-out"]`` truthy: nothing is written and None is returned
    * ``testing["dry_run"]`` truthy: the points go to ``write_points_to_stdout``
    * otherwise: the points go to ``write_points_to_influx``

    :param points: the points to write
    :param influx_params: passed on to the writer as ``influx_params``
    :param kwargs: passed on to the writer unchanged
    :return: whatever the selected writer returns, or None for "no-out"
    """
    testing_opts = _APP_CONFIG_PARAMS.get("testing", {})
    if testing_opts.get("no-out"):
        return None

    if testing_opts.get("dry_run"):
        writer = write_points_to_stdout
    else:
        writer = write_points_to_influx
    return writer(points, influx_params=influx_params, **kwargs)
def get_netconf(router_name, vendor=Vendor.JUNIPER, **kwargs):
    """
    Obtain the netconf document for a router.

    When ``testing["netconf-source-dir"]`` is set in the application
    configuration the document is read through ``get_netconf_from_source_dir``
    and ``kwargs`` are not used. Otherwise it is fetched with
    ``get_juniper_netconf`` using the configuration section keyed by
    ``vendor.value`` as ssh parameters.

    :param router_name: the router to get the document for
    :param vendor: selects which configuration section holds the ssh parameters
    :param kwargs: passed on to ``get_juniper_netconf``
    :raises KeyError: if the configuration has no section for the vendor
    """
    testing_opts = _APP_CONFIG_PARAMS.get("testing", {})
    source_dir = testing_opts.get("netconf-source-dir")
    if not source_dir:
        # NOTE: get_juniper_netconf is used whatever the vendor is.
        return get_juniper_netconf(
            router_name, _APP_CONFIG_PARAMS[vendor.value], **kwargs
        )
    return get_netconf_from_source_dir(router_name, source_dir)
def validate_router_hosts(
    hostnames, vendor: Vendor, inprov_hosts=None, load_interfaces_=load_interfaces
):
    """
    Check that all given routers are known to the inventory provider.

    :param hostnames: the router hostnames to validate
    :param vendor: the vendor the routers are expected to belong to
    :param inprov_hosts: the inventory provider host(s) to query. If None, no
        validation is performed
    :param load_interfaces_: callable that takes the inventory provider hosts
        and returns an iterable of dicts that have a "router" key. Replaceable
        for testing
    :return: True if validation passed or was skipped
    :raises ValueError: if any hostname is not among the routers reported by
        the inventory provider
    """
    if inprov_hosts is None:
        return True

    logger.info(
        f"Validating hosts {' '.join(hostnames)} using providers {inprov_hosts}"
    )
    if vendor == Vendor.NOKIA:
        # NOTE(review): no inventory lookup is done for Nokia, so every Nokia
        # hostname is reported as missing whenever inprov_hosts is set --
        # confirm that this is intended.
        all_fqdns = set()
    else:
        # The inventory provider hosts must be queried here, not the router
        # hostnames that are being validated.
        all_fqdns = {ifc["router"] for ifc in load_interfaces_(inprov_hosts)}

    extra_fqdns = set(hostnames) - all_fqdns
    if extra_fqdns:
        # Sorted so that the message is deterministic.
        raise ValueError(
            f"Routers are not in inventory provider or not {vendor.value}: "
            f"{' '.join(sorted(extra_fqdns))}"
        )
    return True
def process_router(
    router_fqdn: str,
    vendor: Vendor,
    all_influx_params: dict,
):
    """
    Process a single router with the vendor specific implementation.

    ``Vendor.JUNIPER`` is handled by ``process_juniper_router``; any other
    vendor value is handled by ``process_nokia_router``.

    :param router_fqdn: the router to process
    :param vendor: the vendor of the router
    :param all_influx_params: passed on to the vendor specific function
    :return: whatever the vendor specific function returns
    """
    handler = (
        process_juniper_router if vendor == Vendor.JUNIPER else process_nokia_router
    )
    return handler(router_fqdn, all_influx_params)
def process_juniper_router(
    router_fqdn: str,
    all_influx_params: dict,
):
    """
    Fetch the netconf document of a Juniper router and write its points.

    The document is fetched once. A single timestamp (``datetime.now()``, taken
    right after the fetch) is shared by all points. First the points built by
    ``_juniper_brian_points`` are written using the "brian-counters" entry of
    ``all_influx_params``, then those built by ``_juniper_error_points`` using
    the "error-counters" entry.

    :param router_fqdn: the router to process
    :param all_influx_params: dict with a "brian-counters" and an
        "error-counters" entry, each of which has at least a "measurement" key
    """
    logger.info(f"processing Juniper router {router_fqdn}")

    netconf_doc = get_netconf(router_fqdn, vendor=Vendor.JUNIPER)
    collected_at = datetime.now()

    # (influx params key, point generator) pairs, in the order they are written
    outputs = (
        ("brian-counters", _juniper_brian_points),
        ("error-counters", _juniper_error_points),
    )
    for params_key, make_points in outputs:
        influx_params = all_influx_params[params_key]
        points = make_points(
            router_fqdn=router_fqdn,
            netconf_doc=netconf_doc,
            timestamp=collected_at,
            measurement_name=influx_params["measurement"],
        )
        write_points(points, influx_params=influx_params)
def _juniper_brian_points(router_fqdn, netconf_doc, timestamp, measurement_name):
    """
    Generate the brian points for a Juniper netconf document.

    Yields everything ``vendors.brian_points`` produces for the physical
    interface counters, followed by what it produces for the logical interface
    counters.
    """
    counter_extractors = (
        juniper.physical_interface_counters,
        juniper.logical_interface_counters,
    )
    for extract_counters in counter_extractors:
        interfaces = extract_counters(netconf_doc)
        yield from vendors.brian_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
def _juniper_error_points(router_fqdn, netconf_doc, timestamp, measurement_name):
    """
    Generate the error points for a Juniper netconf document.

    Yields everything ``vendors.error_points`` produces for the physical
    interface counters, followed by what it produces for the logical interface
    counters.
    """
    # [2024-03-21] We currently have no definition for error points on logical
    # interfaces, so the second pass is essentially a no-op. Perhaps in the future
    # we will get a definition for errors on logical interfaces
    counter_extractors = (
        juniper.physical_interface_counters,
        juniper.logical_interface_counters,
    )
    for extract_counters in counter_extractors:
        interfaces = extract_counters(netconf_doc)
        yield from vendors.error_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
def process_nokia_router(router_fqdn: str, all_influx_params: dict):
    """
    Placeholder for Nokia routers: only logs a warning that the router is skipped.

    :param router_fqdn: the router that is skipped
    :param all_influx_params: unused
    """
    message = f"skipping Nokia router {router_fqdn}"
    logger.warning(message)
def main(
    app_config_params: dict,
    router_fqdns: List[str],
    vendor: Vendor,
    raise_errors=False,
):
    """
    Validate and process the given routers one by one.

    A failure while processing one router is logged and does not prevent the
    remaining routers from being processed, unless ``raise_errors`` is set.

    :param app_config_params: the application configuration. Must contain ssh
        params under the key ``vendor.value`` and an "influx" entry; may
        contain an "inventory" entry that is used for validating the routers
    :param router_fqdns: the routers to process
    :param vendor: the vendor of all given routers
    :param raise_errors: if True, re-raise the first processing error (after
        logging it) instead of continuing with the next router
    :return: the number of routers for which processing failed
    :raises ValueError: if router validation fails or if the ssh params for the
        vendor are missing
    """
    vendor_str = vendor.value
    inprov_hosts = app_config_params.get("inventory")

    validate_router_hosts(router_fqdns, vendor=vendor, inprov_hosts=inprov_hosts)
    ssh_params = app_config_params.get(vendor_str)
    if not ssh_params:
        raise ValueError(f"'{vendor_str}' ssh params are required")

    error_count = 0
    for router in router_fqdns:
        try:
            process_router(
                router_fqdn=router,
                vendor=vendor,
                all_influx_params=app_config_params["influx"],
            )
        except Exception as e:
            logger.exception(
                f"Error while processing {vendor_str} {router}", exc_info=e
            )
            # Count the failure so that callers can tell that something went
            # wrong; without this the function would always return 0.
            error_count += 1
            if raise_errors:
                raise

    return error_count
@click.command()
@click.option(
    "--config",
    "app_config_params",
    required=True,
    type=click.File("r"),
    help="config filename",
    callback=validate_config,
)
@click.option(
    "--juniper",
    is_flag=True,
    help="The given router fqdns are juniper routers",
)
@click.option(
    "--nokia",
    is_flag=True,
    help="The given router fqdns are nokia routers",
)
@click.argument("router-fqdn", nargs=-1, callback=validate_hostname)
def cli(
    app_config_params: dict,
    juniper: bool,
    nokia: bool,
    router_fqdn,
):
    # Command line entry point: processes the given routers, all of which must
    # be of the single vendor selected by --juniper or --nokia.
    #
    # This is documented with comments on purpose: click would use a docstring
    # as the help text of the command.
    if not router_fqdn:
        # Do nothing if no routers are specified
        return

    # Exactly one of the two vendor flags must be given.
    if juniper == nokia:
        raise click.BadParameter("Set either '--juniper' or '--nokia', but not both")

    set_app_params(app_config_params)
    setup_logging()

    failures = main(
        app_config_params=app_config_params,
        router_fqdns=router_fqdn,
        vendor=Vendor.JUNIPER if juniper else Vendor.NOKIA,
    )
    if failures:
        raise click.ClickException(
            "Errors were encountered while processing interface stats"
        )
# Run the click command when this file is executed as a script.
if __name__ == "__main__":
    cli()