Skip to content
Snippets Groups Projects
Select Git revision
  • 2a2ea58f34aba11dc4c2685fa0d25d7e5c2e98d0
  • python3 default protected
  • feature/exabgp_support2
  • feature/exabgp_support2.bgpextcommunity
  • feature/exabgp_support2.django4.2
  • fix/existingcheck_honor_fragtype
  • feature/python3-authz_netmask
  • feature/authz_netmask
  • fix/wrong_ratelimit_stats
  • feature/requirements_version_update2024-01
  • feature/split_celery
  • feature/improved-warning-mails
  • fix/reenable_expireset_via_restapi
  • feature/admin_user_delete_with_owned_rule_reassigning1
  • feature/admin_user_delete_with_owned_rule_reassigning
  • feature/branded_doc
  • fix/forked_snmp_polling_worker_exit_issue
  • fix/false_user_activation_error
  • feature/exabgp_with_docker-compose
  • fix/prefix_overlap_handling
  • fix/js_security_issues-a
  • save1
  • rpm-1.5-7
  • working1
  • myv1.6
  • t12b1
  • v1.5_newnew2
  • merged_final
  • v1.5_newnew
  • startstop_old
  • myadd2
  • tomas3
  • merge_jra2t6_and_RESTAPI
  • mytomas2
  • mynew1
  • new_jra2t6
  • v1.5_final
  • fod16_ruleroutes-merged_old
  • merged_new
  • v1.6_new_old
  • v1.5_new_old_follower
41 results

models.py

Blame
  • cli.py 8.08 KiB
    import json
    import logging.config
    import os
    import pathlib
    from datetime import datetime
    from functools import partial
    from typing import Callable, Iterable, List
    
    import click
    
    from brian_polling_manager.interface_stats.click_helpers import (
        validate_config,
        validate_hostname,
    )
    from brian_polling_manager.interface_stats.services.netconf import (
        NetconfProvider,
        get_netconf_provider,
    )
    from brian_polling_manager.interface_stats.services.writers import (
        PointWriter,
        get_point_writer,
    )
    from brian_polling_manager.interface_stats.vendors import Vendor, juniper
    from brian_polling_manager.interface_stats import vendors
    
    logger = logging.getLogger(__file__)
    
    LOGGING_DEFAULT_CONFIG = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "simple": {
                "format": "%(asctime)s - %(name)s "
                "(%(lineno)d) - %(levelname)s - %(message)s"
            }
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
                "formatter": "simple",
                "stream": "ext://sys.stdout",
            },
        },
        "loggers": {
            "brian_polling_manager": {
                "level": "DEBUG",
                "handlers": ["console"],
                "propagate": False,
            }
        },
        "root": {"level": "INFO", "handlers": ["console"]},
    }
    
    
    def setup_logging():
        """
        set up logging using the configured filename
    
        if LOGGING_CONFIG is defined in the environment, use this for
        the filename, otherwise use LOGGING_DEFAULT_CONFIG
        """
        logging_config = LOGGING_DEFAULT_CONFIG
        if "LOGGING_CONFIG" in os.environ:
            filename = os.environ["LOGGING_CONFIG"]
            with open(filename) as f:
                logging_config = json.loads(f.read())
    
        # # TODO: this mac workaround should be removed ...
        # import platform
        # if platform.system() == 'Darwin':
        #     logging_config['handlers']['syslog_handler']['address'] \
        #         = '/var/run/syslog'
    
        logging.config.dictConfig(logging_config)
    
    
    def process_juniper_router(
        router_fqdn: str,
        all_influx_params: dict,
        netconf_provider: NetconfProvider,
        get_point_writer_: Callable[[dict], PointWriter],
    ):
        logger.info(f"processing Juniper router {router_fqdn}")
    
        document = netconf_provider.get(router_fqdn)
        timestamp = datetime.now()
    
        influx_params = all_influx_params["brian-counters"]
        points = _juniper_brian_points(
            router_fqdn=router_fqdn,
            netconf_doc=document,
            timestamp=timestamp,
            measurement_name=influx_params["measurement"],
        )
    
        writer = get_point_writer_(influx_params)
        writer.write_points(points)
    
        influx_params = all_influx_params["error-counters"]
        points = _juniper_error_points(
            router_fqdn=router_fqdn,
            netconf_doc=document,
            timestamp=timestamp,
            measurement_name=influx_params["measurement"],
        )
    
        writer = get_point_writer_(influx_params)
        writer.write_points(points)
    
    
    def _juniper_brian_points(router_fqdn, netconf_doc, timestamp, measurement_name):
        interfaces = juniper.physical_interface_counters(netconf_doc)
        yield from vendors.brian_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
    
        interfaces = juniper.logical_interface_counters(netconf_doc)
        yield from vendors.brian_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
    
    
    def _juniper_error_points(router_fqdn, netconf_doc, timestamp, measurement_name):
        interfaces = juniper.physical_interface_counters(netconf_doc)
        yield from vendors.error_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
    
        # [2024-03-21] We currently have no definition for error points on logical
        # interfaces. This operation is essentially a no-op. Perhaps in the future we will
        # get a definition for errors on logical interfaces
        interfaces = juniper.logical_interface_counters(netconf_doc)
        yield from vendors.error_points(
            router_fqdn, interfaces, timestamp, measurement_name
        )
    
    
    def process_nokia_router(
        router_fqdn: str,
        all_influx_params: dict,
        get_netconf: NetconfProvider,
        writer_factory: Callable[[dict], PointWriter] = get_point_writer,
    ):
        logger.warning(f"skipping Nokia router {router_fqdn}")
    
    
    def get_routers_from_inventory_provider(vendor: Vendor, hosts: Iterable[str]):
        return []
    
    
    def main(
        app_config_params: dict,
        juniper_fqdns: List[str],
        nokia_fqdns: List[str],
        get_netconf_provider_: Callable[[Vendor, dict], NetconfProvider],
        get_point_writer_: Callable[[dict], PointWriter],
        raise_errors=False,
    ):
    
        if not juniper_fqdns and not nokia_fqdns:
            inprov_hosts = app_config_params.get("inventory", [])
            if not inprov_hosts:
                raise ValueError("Must supply at least one inventory provider in config")
            juniper_fqdns = get_routers_from_inventory_provider(
                Vendor.JUNIPER, inprov_hosts
            )
            nokia_fqdns = get_routers_from_inventory_provider(Vendor.NOKIA, inprov_hosts)
    
        if juniper_fqdns:
            juniper_ssh_params = app_config_params.get("juniper")
            if not juniper_ssh_params:
                raise ValueError("'juniper' ssh params are required")
    
            juniper_netconf_provider = get_netconf_provider_(
                Vendor.JUNIPER, juniper_ssh_params
            )
    
        if nokia_fqdns:
            nokia_ssh_params = app_config_params.get("nokia")
            if not nokia_ssh_params:
                raise ValueError("'nokia' ssh params are required")
    
            nokia_netconf_provider = get_netconf_provider_(Vendor.NOKIA, nokia_ssh_params)
    
        error_count = 0
        for router in juniper_fqdns:
            try:
                process_juniper_router(
                    router,
                    app_config_params["influx"],
                    netconf_provider=juniper_netconf_provider,
                    get_point_writer_=get_point_writer_,
                )
            except Exception as e:
                logger.exception(f"Error while processing juniper {router}", exc_info=e)
                if raise_errors:
                    raise
        for router in nokia_fqdns:
            try:
                process_nokia_router(
                    router,
                    app_config_params["influx"],
                    get_netconf=nokia_netconf_provider,
                    get_point_writer_=get_point_writer_,
                )
            except Exception as e:
                logger.exception(f"Error while processing nokia {router}", exc_info=e)
                if raise_errors:
                    raise
        return error_count
    
    
    @click.command()
    @click.option(
        "--config",
        "app_config_params",
        required=True,
        type=click.File("r"),
        help="config filename",
        callback=validate_config,
    )
    @click.option(
        "--juniper",
        "juniper_fqdns",
        multiple=True,
        type=click.STRING,
        help="juniper router fqdn(s)",
        callback=validate_hostname,
    )
    @click.option(
        "--nokia",
        "nokia_fqdns",
        multiple=True,
        type=click.STRING,
        help="nuniper router fqdn(s)",
        callback=validate_hostname,
    )
    @click.option(
        "--source-dir",
        type=click.Path(
            exists=True,
            file_okay=False,
            dir_okay=True,
            readable=True,
            path_type=pathlib.Path,
        ),
        default=None,
        help="Read from a snapshot directory instead of querying the routers",
    )
    @click.option(
        "--dry-run",
        is_flag=True,
        help="Perform a dry run, do not write to influx but dump to stdout",
    )
    def cli(
        app_config_params: dict,
        juniper_fqdns: List[str],
        nokia_fqdns: List[str],
        source_dir: pathlib.Path,
        dry_run: bool,
    ):
    
        writer_factory = partial(get_point_writer, kind="stdout" if dry_run else "influx")
        netconf_provider_factory = partial(get_netconf_provider, source_dir=source_dir)
    
        setup_logging()
    
        error_count = main(
            app_config_params=app_config_params,
            juniper_fqdns=juniper_fqdns,
            nokia_fqdns=nokia_fqdns,
            get_netconf_provider_=netconf_provider_factory,
            get_point_writer_=writer_factory,
        )
        if error_count:
            raise click.ClickException(
                "Errors were encountered while processing interface stats"
            )
    
    
    if __name__ == "__main__":
        cli()