Skip to content
Snippets Groups Projects
Select Git revision
  • e1152b8ae4b93d7ee0025e3762826a5b5d206581
  • develop default
  • master protected
  • feature/frontend-tests
  • 0.99
  • 0.98
  • 0.97
  • 0.96
  • 0.95
  • 0.94
  • 0.93
  • 0.92
  • 0.91
  • 0.90
  • 0.89
  • 0.88
  • 0.87
  • 0.86
  • 0.85
  • 0.84
  • 0.83
  • 0.82
  • 0.81
  • 0.80
24 results

DownloadDataButton.tsx

Blame
  • cli.py 10.18 KiB
    import enum
    import json
    from logging import LogRecord
    import logging.config
    import socket
    import sys
    from datetime import datetime
    from typing import Iterable, List, Optional, Collection
    
    import click
    import jsonschema
    from brian_polling_manager.influx import influx_client
    from brian_polling_manager.interface_stats import vendors, config
    from brian_polling_manager.interface_stats.vendors import Vendor, juniper, nokia
    from brian_polling_manager.inventory import load_interfaces
    from lxml import etree
    
    # getLogger() without a name returns the root logger, so messages logged here go
    # through whatever handlers are installed on the root logger.
    logger = logging.getLogger()
    
    
    class MessageCounter(logging.NullHandler):
        """Handler that emits nothing and only tallies the records handed to it."""

        def __init__(self, level=logging.NOTSET) -> None:
            super().__init__(level=level)
            # number of records passed to handle() so far
            self.count = 0

        def handle(self, record: LogRecord) -> None:
            """Count ``record``; it is neither formatted nor emitted."""
            self.count = self.count + 1
    
    
    class OutputMethod(enum.Enum):
        """Destinations that ``write_points`` can dispatch points to.

        The values are the strings accepted by the ``--output`` command line option.
        """

        # points are written with write_points_to_influx
        INFLUX = "influx"
        # points are written with write_points_to_stdout
        STDOUT = "stdout"
        # points are computed but not written anywhere
        NO_OUT = "no-out"
    
    
    def setup_logging(debug=False) -> MessageCounter:
        """
        Install filters on the ncclient loggers and configure logging to stdout.

        NOTE: ``logging.basicConfig`` does nothing when the root logger already has
        handlers; in that case the returned counter is not attached to anything.

        :param debug: set log level to DEBUG, or INFO otherwise
        :returns: a MessageCounter object that tracks error log messages
        """

        def demote_info(record):
            # relabel INFO records as DEBUG; the record is still passed on
            if record.levelno == logging.INFO:
                record.levelno, record.levelname = logging.DEBUG, "DEBUG"
            return record

        def discard(record):
            # a falsy result makes the logger reject the record
            return None

        for noisy_logger in ("ncclient.operations.rpc", "ncclient.transport.tls"):
            logging.getLogger(noisy_logger).addFilter(demote_info)
        for muted_logger in ("ncclient.transport.ssh", "ncclient.transport.parser"):
            logging.getLogger(muted_logger).addFilter(discard)

        level = logging.INFO
        if debug:
            level = logging.DEBUG

        # the counter only receives records of level ERROR and above
        error_counter = MessageCounter(level=logging.ERROR)

        stdout_handler = logging.StreamHandler(sys.stdout)
        stdout_handler.setLevel(level)

        logging.basicConfig(
            format="%(asctime)s - %(levelname)s - %(message)s",
            level=level,
            handlers=[error_counter, stdout_handler],
        )
        return error_counter
    
    
    def write_points(
        points: Iterable[dict], influx_params: dict, output: OutputMethod, **kwargs
    ):
        """
        Dispatch ``points`` to the writer selected by ``output``.

        :param points: the points to write
        :param influx_params: influx settings, passed on to the selected writer
        :param output: the OutputMethod to use; NO_OUT writes nothing
        :param kwargs: extra keyword arguments passed on to the selected writer
        :returns: whatever the selected writer returns, or None for NO_OUT
        :raises ValueError: if ``output`` is not a supported output method
        """
        if output == OutputMethod.INFLUX:
            return write_points_to_influx(points, influx_params=influx_params, **kwargs)
        if output == OutputMethod.STDOUT:
            return write_points_to_stdout(points, influx_params=influx_params, **kwargs)
        if output == OutputMethod.NO_OUT:
            return
        # ``value`` is an attribute, not a method; getattr also copes with callers that
        # pass something that is not an enum member at all (e.g. a plain string)
        raise ValueError(
            f"Unsupported output method: {getattr(output, 'value', output)}"
        )
    
    
    def write_points_to_influx(
        points: Iterable[dict],
        influx_params: dict,
        timeout=5,
        batch_size=50,
    ):
        """
        Write ``points`` using a client created by ``influx_client``.

        :param points: the points to write
        :param influx_params: client settings; a ``timeout`` key in here takes
            precedence over the ``timeout`` argument
        :param timeout: timeout handed to the client unless overridden
        :param batch_size: passed as ``batch_size`` to the client's ``write_points``
        """
        client_params = {"timeout": timeout}
        client_params.update(influx_params)

        client = influx_client(client_params)
        with client:
            client.write_points(points, batch_size=batch_size)
    
    
    def write_points_to_stdout(points, influx_params, stream=None, **_):
        """
        Write one ``<measurement> - <json point>`` line per point to ``stream``.

        :param points: iterable of json-serializable points
        :param influx_params: dict providing the ``measurement`` name for the prefix
        :param stream: writable text stream; defaults to the current ``sys.stdout``
        """
        # Resolve the default at call time: a ``stream=sys.stdout`` default is bound
        # once at import and would ignore any later redirection of sys.stdout.
        if stream is None:
            stream = sys.stdout
        for point in points:
            stream.write(f"{influx_params['measurement']} - {json.dumps(point)}\n")
        stream.flush()
    
    
    def get_netconf(router_name, vendor: Vendor, ssh_params: dict, **kwargs):
        """
        Fetch the netconf interface info for a router from its vendor module.

        :param router_name: the router to query
        :param vendor: Vendor.JUNIPER selects the juniper module, anything else nokia
        :param ssh_params: passed on as ``ssh_params`` to the vendor module
        :param kwargs: extra keyword arguments passed on to the vendor module
        :returns: the result of the vendor's ``get_netconf_interface_info``
        """
        if vendor == Vendor.JUNIPER:
            vendor_module = juniper
        else:
            vendor_module = nokia
        return vendor_module.get_netconf_interface_info(
            router_name, ssh_params=ssh_params, **kwargs
        )
    
    
    def get_interfaces_for_router(
        router: str,
        inprov_hosts: List[str],
    ) -> List[str]:
        """
        List the names of the interfaces that ``load_interfaces`` reports for a router.

        :param router: only interfaces whose ``router`` field equals this are kept
        :param inprov_hosts: inventory provider hosts handed to ``load_interfaces``
        :returns: the interface names, in the order they were loaded
        :raises click.ClickException: if no interface matches ``router``
        """
        logger.info(f"Fetching interfaces from inventory provider: {inprov_hosts}")

        names = []
        for entry in load_interfaces(inprov_hosts):
            if entry["router"] == router:
                names.append(entry["name"])

        if names:
            return names

        raise click.ClickException(f"No interfaces found for router {router}")
    
    
    def process_router(
        router_fqdn: str,
        vendor: Vendor,
        interfaces: Optional[List[str]],
        app_config_params: dict,
        output: OutputMethod,
    ):
        """
        Fetch one netconf document from a router and write both kinds of points.

        The document is retrieved once and stamped with a single timestamp; the
        brian points are generated, logged and written first, then the error points.

        :param router_fqdn: the router to process
        :param vendor: the router's vendor; its value is also the key of the ssh
            params inside ``app_config_params``
        :param interfaces: passed on unchanged to the point generators
        :param app_config_params: the app config; read for the vendor ssh params and
            for ``influx.brian-counters`` / ``influx.error-counters``
        :param output: the output method handed to ``write_points``
        """
        logger.info(f"Processing {vendor.value.capitalize()} router {router_fqdn}")

        document = get_netconf(
            router_fqdn, vendor=vendor, ssh_params=app_config_params[vendor.value]
        )
        timestamp = datetime.now()

        # (influx config key, log line, point generator, extra kwargs for the summary)
        passes = (
            ("brian-counters", "Processing Brian points...", _brian_points, {}),
            (
                "error-counters",
                "Processing Error points...",
                _error_points,
                {"point_kind": "error"},
            ),
        )

        for config_key, banner, generate_points, summary_kwargs in passes:
            influx_params = app_config_params["influx"][config_key]
            logger.info(banner)
            points = list(
                generate_points(
                    router_fqdn=router_fqdn,
                    netconf_doc=document,
                    interfaces=interfaces,
                    timestamp=timestamp,
                    measurement_name=influx_params["measurement"],
                    vendor=vendor,
                )
            )
            _log_interface_points_sorted(points, **summary_kwargs)
            write_points(points, influx_params=influx_params, output=output)
    
    
    def _brian_points(
        router_fqdn: str,
        netconf_doc: etree.Element,
        interfaces: Optional[List[str]],
        timestamp: datetime,
        measurement_name: str,
        vendor: Vendor,
    ):
        """
        Yield the points ``vendors.brian_points`` builds from a netconf document.

        The interface counters are extracted by the juniper module for
        Vendor.JUNIPER and by the nokia module otherwise.
        """
        if vendor == Vendor.JUNIPER:
            vendor_module = juniper
        else:
            vendor_module = nokia
        counters = vendor_module.interface_counters(netconf_doc, interfaces=interfaces)
        yield from vendors.brian_points(
            router_fqdn, counters, timestamp, measurement_name
        )
    
    
    def _error_points(
        router_fqdn: str,
        netconf_doc: etree.Element,
        interfaces: Optional[List[str]],
        timestamp: datetime,
        measurement_name: str,
        vendor: Vendor,
    ):
        """
        Yield the points ``vendors.error_points`` builds from a netconf document.

        The interface counters are extracted by the juniper module for
        Vendor.JUNIPER and by the nokia module otherwise.
        """
        if vendor == Vendor.JUNIPER:
            vendor_module = juniper
        else:
            vendor_module = nokia
        counters = vendor_module.interface_counters(netconf_doc, interfaces=interfaces)
        yield from vendors.error_points(
            router_fqdn, counters, timestamp, measurement_name
        )
    
    
    def _log_interface_points_sorted(points: Collection[dict], point_kind=""):
        """
        Log how many points were found, then their interface names in sorted columns.

        :param points: points carrying their interface name at
            ``["tags"]["interface_name"]``
        :param point_kind: optional word inserted before "points" in the summary line
        """
        columns = 5
        total = len(points)
        kind_prefix = point_kind + " " if point_kind else ""
        colon = ":" if total else ""
        logger.info(f"Found {kind_prefix}points for {total} interfaces{colon}")

        if not points:
            return

        names = sorted(point["tags"]["interface_name"] for point in points)
        # pad every name to the longest one so the columns line up
        width = max(map(len, names))
        for start in range(0, len(names), columns):
            row = names[start : start + columns]
            logger.info("    ".join(name.ljust(width) for name in row))
    
    
    # Sentinel default for ``main(interfaces=...)``: it is compared by identity, which
    # keeps "caller did not choose interfaces" distinct from an explicit None or list.
    ALL_ = object()
    
    
    def main(
        app_config_params: dict,
        router_fqdn: str,
        vendor: Vendor,
        output: OutputMethod = OutputMethod.INFLUX,
        interfaces=ALL_,
    ):
        """
        Decide which interfaces to process for a router, then process it.

        :param app_config_params: the app config; must hold a non-empty section under
            the vendor's value, and may hold an ``inventory`` entry
        :param router_fqdn: the router to process
        :param vendor: the router's vendor
        :param output: the output method for the generated points
        :param interfaces: explicit interfaces to process, or ``ALL_`` (the default)
        :raises ValueError: if the config has no ssh params for the vendor
        """
        vendor_key = vendor.value
        if not app_config_params.get(vendor_key):
            raise ValueError(f"'{vendor_key}' ssh params are required")

        inventory_hosts = app_config_params.get("inventory")

        # With ALL_ the selection comes from the inventory provider when hosts are
        # configured; without hosts, None is passed so that every interface found on
        # the router gets points. An explicit selection is passed through untouched.
        if interfaces is ALL_:
            interfaces = (
                get_interfaces_for_router(router_fqdn, inprov_hosts=inventory_hosts)
                if inventory_hosts is not None
                else None
            )

        process_router(
            router_fqdn=router_fqdn,
            vendor=vendor,
            interfaces=interfaces,
            app_config_params=app_config_params,
            output=output,
        )
    
    
    def validate_config(_unused_ctx, _unused_param, file):
        """
        Click option callback that loads the config from ``file``.

        :param _unused_ctx: the click context (unused)
        :param _unused_param: the click parameter (unused)
        :param file: the opened config file, handed to ``config.load``
        :returns: the result of ``config.load``
        :raises click.BadParameter: if the file is not valid json or fails validation
        """
        try:
            return config.load(file)
        except json.JSONDecodeError as e:
            # keep the original error as __cause__ so the position is not lost
            raise click.BadParameter("config file is not valid json") from e
        except jsonschema.ValidationError as e:
            # BadParameter expects a message string, not an exception object
            raise click.BadParameter(str(e)) from e
    
    
    def validate_hostname(_unused_ctx, _unused_param, hostname_or_names):
        """
        Click callback that checks that one or more hostnames can be resolved.

        :param _unused_ctx: the click context (unused)
        :param _unused_param: the click parameter (unused)
        :param hostname_or_names: a single hostname, or a list/tuple of hostnames
        :returns: ``hostname_or_names`` unchanged
        :raises click.BadParameter: for the first hostname that cannot be resolved
        """
        hostnames = (
            hostname_or_names
            if isinstance(hostname_or_names, (list, tuple))
            else [hostname_or_names]
        )
        for _h in hostnames:
            try:
                socket.gethostbyname(_h)
            except (OSError, UnicodeError) as e:
                # socket.error is an alias of OSError. UnicodeError is raised when a
                # label cannot be IDNA-encoded (empty or too long), which would
                # otherwise escape as an unhandled traceback.
                raise click.BadParameter(f"{_h} is not resolveable") from e
        return hostname_or_names
    
    
    @click.command()
    @click.option(
        "--config",
        "app_config_params",
        required=True,
        type=click.File("r"),
        help="config filename",
        callback=validate_config,
    )
    @click.option(
        "--juniper",
        help="A Juniper router fqdn",
    )
    @click.option(
        "--nokia",
        help="A Nokia router fqdn",
    )
    @click.option(
        "-o",
        "--output",
        type=click.Choice(["influx", "stdout", "no-out"], case_sensitive=False),
        default="influx",
        help="Choose an output method. Default: influx",
    )
    @click.option(
        "--all",
        "all_",
        is_flag=True,
        default=False,
        help=(
            "Write points for all interfaces found in inventory provider for this router."
            " Do not use this flag when supplying a list of interfaces"
        ),
    )
    @click.option(
        "-v",
        "--verbose",
        is_flag=True,
        default=False,
        help="Run with verbose output",
    )
    @click.argument("interfaces", nargs=-1)
    def cli(
        app_config_params: dict,
        juniper: Optional[str],
        nokia: Optional[str],
        output: str,
        all_: bool,
        verbose: bool,
        interfaces: Collection[str],
    ):
        # Command line entry point: validate the option combination, set up logging,
        # then run main() for exactly one router.
        #
        # This is documented with comments rather than a docstring on purpose: click
        # uses a command's docstring as its --help text, so adding one would change
        # the program's output.
        #
        # app_config_params: the loaded config (already processed by validate_config)
        # juniper / nokia:   router fqdn strings (None when the option is not given);
        #                    inside this function they shadow the vendor modules of
        #                    the same name imported at the top of the file
        # output:            one of the --output choices, in any letter case
        # all_:              the --all flag
        # verbose:           the --verbose flag, enables DEBUG logging
        # interfaces:        the positional interface names, empty when none given
        if not (interfaces or all_):
            # Do nothing if no interfaces are specified
            return

        if interfaces and all_:
            raise click.BadParameter("Do not supply both 'interfaces' and '--all'")

        if not (juniper or nokia) or (juniper and nokia):
            raise click.BadParameter(
                "Supply either a '--juniper' or '--nokia' router, but not both"
            )
        router_fqdn = juniper or nokia
        vendor = Vendor.JUNIPER if juniper else Vendor.NOKIA

        error_counter = setup_logging(debug=verbose)

        try:
            main(
                app_config_params=app_config_params,
                router_fqdn=router_fqdn,
                vendor=vendor,
                output=OutputMethod(output.lower()),
                interfaces=interfaces if interfaces else ALL_,
            )
        except Exception:
            # Top-level boundary: the failure is logged at ERROR level, which is what
            # the error counter installed by setup_logging is meant to pick up.
            logger.exception(
                f"Error while processing {vendor.value.capitalize()} router {router_fqdn}"
            )

        # any error logged during the run turns into a failing command
        if error_counter.count:
            raise click.ClickException(
                "Errors were encountered while processing interface stats"
            )
    
    
    # Run the click command when this file is executed as a script.
    if __name__ == "__main__":
        cli()