import enum
import json
import logging
import sys
import threading
import traceback
from multiprocessing import Queue
from datetime import datetime
from logging import LogRecord
from typing import Any, Collection, Dict, Iterable, List, Optional

import click
import jsonschema

from brian_polling_manager.influx import influx_client
from brian_polling_manager.inventory import (
    GWS_INDIRECT_SCHEMA,
    INVENTORY_INTERFACES_SCHEMA,
    load_inventory_json,
)
from brian_polling_manager.interface_stats.common import PointGroup, RouterProcessor
from brian_polling_manager.interface_stats.juniper import JuniperRouterProcessor
from brian_polling_manager.interface_stats.nokia import NokiaRouterProcessor

logger = logging.getLogger()

DEFAULT_INTERFACES_URL = "/poller/interfaces/"


def write_points_to_influx(
    points: Iterable[dict],
    influx_params: dict,
    timeout=5,
    batch_size=50,
):
    client = influx_client({"timeout": timeout, **influx_params})
    with client:
        client.write_points(points, batch_size=batch_size)


def write_points_to_stdout(points, influx_params, stream=sys.stdout, **_):
    for point in points:
        stream.write(f"{influx_params['measurement']} - {json.dumps(point)}\n")
    stream.flush()


class OutputMethod(enum.Enum):
    INFLUX = ("influx", write_points_to_influx)
    STDOUT = ("stdout", write_points_to_stdout)
    NO_OUT = ("no-out", lambda *_, **__: None)

    def write_points(self, points: Iterable[dict], influx_params: dict, **kwargs):
        return self.value[1](points, influx_params=influx_params, **kwargs)

    @classmethod
    def from_string(cls, method: str):
        return {m.value[0]: m for m in cls}[method]

    def __str__(self):
        return self.value[0]


class MessageCounter(logging.NullHandler):
    def __init__(self, level=logging.NOTSET) -> None:
        super().__init__(level)
        self.count = 0

    def handle(self, record: LogRecord) -> None:
        self.count += 1


def setup_logging(debug=False) -> MessageCounter:
    """
    :param debug: set log level to DEBUG, or INFO otherwise
    :returns: a MessageCounter object that tracks error log messages
    """

    # demote ncclient logs
    def changeLevel(record):
        if record.levelno == logging.INFO:
            record.levelno = logging.DEBUG
            record.levelname = "DEBUG"
        return record

    def drop(record):
        pass

    logging.getLogger("ncclient.operations.rpc").addFilter(changeLevel)
    logging.getLogger("ncclient.transport.tls").addFilter(changeLevel)
    logging.getLogger("ncclient.transport.ssh").addFilter(drop)
    logging.getLogger("ncclient.transport.parser").addFilter(drop)

    level = logging.DEBUG if debug else logging.INFO
    counter = MessageCounter(level=logging.ERROR)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(level)
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=level,
        handlers=[counter, stream_handler],
    )
    return counter


def load_interfaces(
    router_fqdn: str,
    interfaces: Any,
    point_group: PointGroup,
    config: dict,
) -> Dict[str, dict]:
    # if we choose to write points for all interfaces and we have provided inventory
    # provider hosts, we make a selection based on the interfaces.
    # Otherwise we write points for all interfaces we find on the router
    if interfaces is not ALL_:
        return {ifc: {} for ifc in interfaces}

    inprov_hosts = config["inventory"]
    params = point_group.get_config(config)
    return _get_interfaces_for_router(
        router_fqdn,
        inprov_hosts=inprov_hosts,
        url=params.get("inventory-url", DEFAULT_INTERFACES_URL),
        point_group=point_group,
    )


def _get_interfaces_for_router(
    router: str, inprov_hosts: List[str], url: str, point_group: PointGroup
) -> Dict[str, dict]:
    logger.info(
        f"Fetching interfaces from inventory provider: {inprov_hosts} using url '{url}'"
    )
    if point_group == PointGroup.GWS_INDIRECT:
        all_interfaces = {
            ifc["interface"]: ifc
            for ifc in load_inventory_json(url, inprov_hosts, GWS_INDIRECT_SCHEMA)
            if ifc["hostname"] == router
        }
    else:
        all_interfaces = {
            ifc["name"]: ifc
            for ifc in load_inventory_json(
                url, inprov_hosts, INVENTORY_INTERFACES_SCHEMA
            )
            if ifc["router"] == router
        }
    return all_interfaces


def process_router(
    processor: RouterProcessor,
    point_group: PointGroup,
    interfaces: Dict[str, dict],
    timestamp: datetime,
    output: OutputMethod,
):
    influx_params = processor.group_config(point_group)["influx"]
    points = list(
        processor.points(
            point_group=point_group, timestamp=timestamp, interfaces=interfaces
        )
    )
    _log_interface_points_sorted(points, point_group=point_group)
    output.write_points(points, influx_params=influx_params)


def _log_interface_points_sorted(points: Collection[dict], point_group: PointGroup):
    N_COLUMNS = 5
    num_points = len(points)
    colon = ":" if num_points else ""
    logger.info(f"Found {point_group} points for {num_points} interfaces{colon}")
    if not points:
        return

    interfaces = sorted(p["tags"]["interface_name"] for p in points)
    longest_ifc = max(len(i) for i in interfaces)
    ifc_count = len(interfaces)
    for n in range(ifc_count // N_COLUMNS + (ifc_count % N_COLUMNS > 0)):
        ifc_slice = interfaces[n * N_COLUMNS: (n + 1) * N_COLUMNS]
        logger.info(" ".join(i.ljust(longest_ifc) for i in ifc_slice))


ALL_ = object()


def main(
    exception_queue: Optional[Queue],
    processor: RouterProcessor,
    interfaces=ALL_,
    output: OutputMethod = OutputMethod.INFLUX,
):
    try:
        logger.info(
            f"Processing {processor.name.capitalize()} router {processor.router_fqdn}"
        )
        timestamp = datetime.now()
        for point_group in processor.supported_point_groups:
            logger.info(f"Processing {str(point_group).upper()} points...")
            inventory = processor.config.get("inventory")
            check_interfaces = None
            if inventory is not None:
                check_interfaces = load_interfaces(
                    router_fqdn=processor.router_fqdn,
                    interfaces=interfaces,
                    point_group=point_group,
                    config=processor.config,
                )
            if not check_interfaces:
                logger.info(f"No {str(point_group).upper()} interfaces found")
                continue

            process_router(
                processor=processor,
                point_group=point_group,
                interfaces=check_interfaces,
                timestamp=timestamp,
                output=output,
            )
    except Exception:
        if not exception_queue:
            # no queue means we don't run in a thread, so we can just raise the exception
            raise
        exc_info = sys.exc_info()
        formatted = "".join(traceback.format_exception(*exc_info))
        exception_queue.put(formatted)


def validate_config(_unused_ctx, _unused_param, file):
    # import here because this is the only place we use the config module, and we want
    # to reuse the name `config` for other purposes elsewhere
    from brian_polling_manager.interface_stats import config

    try:
        return config.load(file)
    except json.JSONDecodeError:
        raise click.BadParameter("config file is not valid json")
    except jsonschema.ValidationError as e:
        raise click.BadParameter(e)

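
# NOTE (informal summary, not authoritative): the full config schema is defined in
# brian_polling_manager.interface_stats.config and enforced by validate_config above.
# The keys actually read in this module are:
#   - config["inventory"]: a list of inventory provider hosts, used by
#     load_interfaces() to look up interfaces for a router
#   - per point group (via PointGroup.get_config): an optional "inventory-url"
#     (falling back to DEFAULT_INTERFACES_URL)
#   - per point group (via RouterProcessor.group_config): an "influx" section that is
#     passed to influx_client(); its "measurement" key is also used by the stdout
#     writer
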
"--config", "config", required=True, type=click.File("r"), help="config filename", callback=validate_config, ) @click.option("--juniper", help="A Juniper router fqdn") @click.option("--nokia", help="A Nokia router fqdn") @click.option( "-o", "--output", type=click.Choice(["influx", "stdout", "no-out"], case_sensitive=False), default="influx", help="Choose an output method. Default: influx", ) @click.option( "--all", "all_", is_flag=True, default=False, help=( "Write points for all interfaces found in inventory provider for this router." " Do not use this flag when supplying a list of interfaces" ), ) @click.option( "-v", "--verbose", is_flag=True, default=False, help="Run with verbose output" ) @click.argument("interfaces", nargs=-1) def cli( config: dict, juniper: bool, nokia: bool, output: str, all_: bool, verbose: bool, interfaces: List[str], ): if not (interfaces or all_): # Do nothing if no interfaces are specified return if interfaces and all_: raise click.BadParameter("Do not supply both 'interfaces' and '--all'") if not (juniper or nokia) or (juniper and nokia): raise click.BadParameter( "Supply either a '--juniper' or '--nokia' router, but not both" ) router_fqdn = juniper or nokia if juniper: processor = JuniperRouterProcessor(router_fqdn, config) else: processor = NokiaRouterProcessor(router_fqdn, config) error_counter = setup_logging(debug=verbose) exception_queue = Queue() thread = threading.Thread( target=main, args=(exception_queue, processor, interfaces if interfaces else ALL_, OutputMethod.from_string(output.lower())), ) thread.daemon = True thread.start() thread.join(timeout=120) if thread.is_alive() or not exception_queue.empty(): if thread.is_alive(): logger.error("Thread timed out") frames = sys._current_frames() for thread_id, frame in frames.items(): if thread_id == thread.ident: logger.error("Thread stack trace:") for line in traceback.format_stack(frame): logger.error(line.strip()) logger.error("-- Thread stack trace end --") else: logger.error(f"Error while processing {processor.name.capitalize()} router {router_fqdn}:") exception = exception_queue.get() logger.error(exception) raise click.exceptions.Exit(2) if error_counter.count: # Exit code 1 indicates WARNING in Sensu raise click.ClickException( "Errors were encountered while processing interface stats" ) if __name__ == "__main__": cli()