Skip to content
Snippets Groups Projects
cli.py 7.52 KiB
import json
import logging.config
import os
from datetime import datetime
from typing import Iterable, List

import click
from brian_polling_manager.interface_stats import vendors
from brian_polling_manager.interface_stats.click_helpers import (
    validate_config,
    validate_hostname,
)
from brian_polling_manager.interface_stats.services import (
    get_juniper_netconf,
    get_netconf_from_source_dir,
    write_points_to_influx,
    write_points_to_stdout,
)

from brian_polling_manager.interface_stats.vendors import Vendor, juniper
from brian_polling_manager.inventory import load_interfaces

logger = logging.getLogger(__file__)

LOGGING_DEFAULT_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "simple": {
            "format": "%(asctime)s - %(name)s "
            "(%(lineno)d) - %(levelname)s - %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "brian_polling_manager": {
            "level": "DEBUG",
            "handlers": ["console"],
            "propagate": False,
        }
    },
    "root": {"level": "INFO", "handlers": ["console"]},
}

_APP_CONFIG_PARAMS = {}


def set_app_params(params: dict):
    global _APP_CONFIG_PARAMS
    _APP_CONFIG_PARAMS = params


def setup_logging():
    """
    set up logging using the configured filename

    if LOGGING_CONFIG is defined in the environment, use this for
    the filename, otherwise use LOGGING_DEFAULT_CONFIG
    """
    logging_config = LOGGING_DEFAULT_CONFIG
    if "LOGGING_CONFIG" in os.environ:
        filename = os.environ["LOGGING_CONFIG"]
        with open(filename) as f:
            logging_config = json.loads(f.read())

    logging.config.dictConfig(logging_config)


def write_points(points: Iterable[dict], influx_params: dict, **kwargs):
    if _APP_CONFIG_PARAMS.get("testing", {}).get("no-out"):
        return

    if _APP_CONFIG_PARAMS.get("testing", {}).get("dry_run"):
        return write_points_to_stdout(points, influx_params=influx_params, **kwargs)

    return write_points_to_influx(points, influx_params=influx_params, **kwargs)


def get_netconf(router_name, vendor=Vendor.JUNIPER, **kwargs):
    source_dir = _APP_CONFIG_PARAMS.get("testing", {}).get("netconf-source-dir")
    if source_dir:
        return get_netconf_from_source_dir(router_name, source_dir)

    ssh_params = _APP_CONFIG_PARAMS[vendor.value]
    return get_juniper_netconf(router_name, ssh_params, **kwargs)


def validate_router_hosts(
    hostnames, vendor: Vendor, inprov_hosts=None, load_interfaces_=load_interfaces
):
    if inprov_hosts is None:
        return True

    logger.info(
        f"Validating hosts {' '.join(hostnames)} using providers {inprov_hosts}"
    )
    if vendor == Vendor.NOKIA:
        all_fqdns = []
    else:
        all_fqdns = {ifc["router"] for ifc in load_interfaces_(hostnames)}

    extra_fqdns = set(hostnames) - set(all_fqdns)
    if extra_fqdns:
        raise ValueError(
            f"Routers are not in inventory provider or not {vendor.value}: "
            f"{' '.join(extra_fqdns)}"
        )
    return True


def process_router(
    router_fqdn: str,
    vendor: Vendor,
    all_influx_params: dict,
):
    if vendor == Vendor.JUNIPER:
        return process_juniper_router(router_fqdn, all_influx_params)
    else:
        return process_nokia_router(router_fqdn, all_influx_params)


def process_juniper_router(
    router_fqdn: str,
    all_influx_params: dict,
):
    logger.info(f"processing Juniper router {router_fqdn}")

    document = get_netconf(router_fqdn, vendor=Vendor.JUNIPER)
    timestamp = datetime.now()

    influx_params = all_influx_params["brian-counters"]
    points = _juniper_brian_points(
        router_fqdn=router_fqdn,
        netconf_doc=document,
        timestamp=timestamp,
        measurement_name=influx_params["measurement"],
    )
    write_points(points, influx_params=influx_params)

    influx_params = all_influx_params["error-counters"]
    points = _juniper_error_points(
        router_fqdn=router_fqdn,
        netconf_doc=document,
        timestamp=timestamp,
        measurement_name=influx_params["measurement"],
    )

    write_points(points, influx_params=influx_params)


def _juniper_brian_points(router_fqdn, netconf_doc, timestamp, measurement_name):
    interfaces = juniper.physical_interface_counters(netconf_doc)
    yield from vendors.brian_points(
        router_fqdn, interfaces, timestamp, measurement_name
    )

    interfaces = juniper.logical_interface_counters(netconf_doc)
    yield from vendors.brian_points(
        router_fqdn, interfaces, timestamp, measurement_name
    )


def _juniper_error_points(router_fqdn, netconf_doc, timestamp, measurement_name):
    interfaces = juniper.physical_interface_counters(netconf_doc)
    yield from vendors.error_points(
        router_fqdn, interfaces, timestamp, measurement_name
    )

    # [2024-03-21] We currently have no definition for error points on logical
    # interfaces. This operation is essentially a no-op. Perhaps in the future we will
    # get a definition for errors on logical interfaces
    interfaces = juniper.logical_interface_counters(netconf_doc)
    yield from vendors.error_points(
        router_fqdn, interfaces, timestamp, measurement_name
    )


def process_nokia_router(
    router_fqdn: str,
    all_influx_params: dict,
):
    logger.warning(f"skipping Nokia router {router_fqdn}")


def main(
    app_config_params: dict,
    router_fqdns: List[str],
    vendor: Vendor,
    raise_errors=False,
):
    vendor_str = vendor.value
    inprov_hosts = app_config_params.get("inventory")

    validate_router_hosts(router_fqdns, vendor=vendor, inprov_hosts=inprov_hosts)

    ssh_params = app_config_params.get(vendor_str)
    if not ssh_params:
        raise ValueError(f"'{vendor_str}' ssh params are required")

    error_count = 0
    for router in router_fqdns:
        try:
            process_router(
                router_fqdn=router,
                vendor=vendor,
                all_influx_params=app_config_params["influx"],
            )
        except Exception as e:
            logger.exception(
                f"Error while processing {vendor_str} {router}", exc_info=e
            )
            if raise_errors:
                raise

    return error_count


@click.command()
@click.option(
    "--config",
    "app_config_params",
    required=True,
    type=click.File("r"),
    help="config filename",
    callback=validate_config,
)
@click.option(
    "--juniper",
    is_flag=True,
    help="The given router fqdns are juniper routers",
)
@click.option(
    "--nokia",
    is_flag=True,
    help="The given router fqdns are nokia routers",
)
@click.argument("router-fqdn", nargs=-1, callback=validate_hostname)
def cli(
    app_config_params: dict,
    juniper: bool,
    nokia: bool,
    router_fqdn,
):
    if not router_fqdn:
        # Do nothing if no routers are specified
        return
    if not (juniper ^ nokia):
        raise click.BadParameter("Set either '--juniper' or '--nokia', but not both")

    vendor = Vendor.JUNIPER if juniper else Vendor.NOKIA

    set_app_params(app_config_params)

    setup_logging()

    error_count = main(
        app_config_params=app_config_params,
        router_fqdns=router_fqdn,
        vendor=vendor,
    )
    if error_count:
        raise click.ClickException(
            "Errors were encountered while processing interface stats"
        )


if __name__ == "__main__":
    cli()