cli.py 7.19 KiB
import contextlib
from datetime import datetime
from functools import partial
import json
import socket
import click
import jsonschema
from lxml import etree
from brian_polling_manager.interface_stats import config, brian, errors
from brian_polling_manager.interface_stats.vendors import juniper
from brian_polling_manager import influx, inventory
import logging.config
import os
LOGGING_DEFAULT_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '%(asctime)s - %(name)s '
'(%(lineno)d) - %(levelname)s - %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
},
'loggers': {
'brian_polling_manager': {
'level': 'DEBUG',
'handlers': ['console'],
'propagate': False
}
},
'root': {
'level': 'INFO',
'handlers': ['console']
}
}
def setup_logging():
"""
set up logging using the configured filename
if LOGGING_CONFIG is defined in the environment, use this for
the filename, otherwise use LOGGING_DEFAULT_CONFIG
"""
logging_config = LOGGING_DEFAULT_CONFIG
if 'LOGGING_CONFIG' in os.environ:
filename = os.environ['LOGGING_CONFIG']
with open(filename) as f:
logging_config = json.loads(f.read())
# # TODO: this mac workaround should be removed ...
# import platform
# if platform.system() == 'Darwin':
# logging_config['handlers']['syslog_handler']['address'] \
# = '/var/run/syslog'
logging.config.dictConfig(logging_config)
def ctr2point(measurement, timestamp, counters):
"""
:param measurement: the measurement where the point will be written
:param counters: either a BRIAN_COUNTER_DICT_SCHEMA or
ERROR_COUNTER_DICT_SCHEMA object
:return: a brian_polling_manager.influx.INFLUX_POINT object
"""
def _is_tag(key_name):
return key_name == 'hostname' or key_name == 'interface_name'
fields = dict([_k, _v] for _k, _v in counters.items() if not _is_tag(_k))
tags = dict([_k, _v] for _k, _v in counters.items() if _is_tag(_k))
return {
'time': timestamp.strftime('%Y-%m-%dT%H:%M:%SZ'),
'measurement': measurement,
'tags': tags,
'fields': fields,
}
def _validate_config(_unused_ctx, _unused_param, file):
try:
return config.load(file)
except json.JSONDecodeError:
raise click.BadParameter('config file is not valid json')
except jsonschema.ValidationError as e:
raise click.BadParameter(e)
def _validate_hostname(_unused_ctx, _unused_param, hostname):
try:
socket.gethostbyname(hostname)
except socket.error:
raise click.BadParameter(f'{hostname} is not resolveable')
return hostname
def _brian_points(router_fqdn, netconf_doc, timestamp, measurement_name):
"""
returns an interable of points that can be written to influxdb
:param router_fqdn: 'hostname' tag to be set in the generated points
:param netconf_doc: a valid loaded netconf doc
:param timestamp: timestamp to put in the generated points
:param measurement_name: measurement name to put in the generated points
:return:
"""
_ctr2point = partial(ctr2point, measurement_name, timestamp)
interfaces = juniper.physical_interface_counters(netconf_doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
interfaces = juniper.logical_interface_counters(netconf_doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
def _error_points(router_fqdn, netconf_doc, timestamp, measurement_name):
"""
returns an interable of points that can be written to influxdb
:param router_fqdn: 'hostname' tag to be set in the generated points
:param netconf_doc: a valid loaded netconf doc
:param timestamp: timestamp to put in the generated points
:param measurement_name: measurement name to put in the generated points
:return:
"""
_ctr2point = partial(ctr2point, measurement_name, timestamp)
interfaces = juniper.physical_interface_counters(netconf_doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
interfaces = juniper.logical_interface_counters(netconf_doc)
counters = errors.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
yield from map(_ctr2point, counters)
def _main(router_fqdn: str, app_config_params: dict):
"""
callable entry point, without click - for testing
loads the netconf 'show interfaces' info for the router fqdn,
then uses this to generate brian and error points and writes them to influxdb
note: the optional inventory param is only used to validate the hostname
:param router_fqdn:
:param app_config_params:
:return:
"""
if 'inventory' in app_config_params:
all_routers = {_ifc['router'] for _ifc in inventory.load_interfaces()}
if router_fqdn not in all_routers:
raise ValueError(f'{router_fqdn} must be one of: {all_routers}')
setup_logging()
netconf_doc = juniper.get_interface_info_ncrpc(
router_fqdn,
ssh_config=app_config_params['ssh-config'])
netconf_timestamp = datetime.now()
influx_params = app_config_params['influx']['brian-counters']
points = _brian_points(
router_fqdn=router_fqdn,
netconf_doc=netconf_doc,
timestamp=netconf_timestamp,
measurement_name=influx_params['measurement'])
with contextlib.closing(influx.influx_client(influx_params)) as client:
client.write_points(points)
influx_params = app_config_params['influx']['error-counters']
points = _error_points(
router_fqdn=router_fqdn,
netconf_doc=netconf_doc,
timestamp=netconf_timestamp,
measurement_name=influx_params['measurement'])
with contextlib.closing(influx.influx_client(influx_params)) as client:
client.write_points(points)
@click.command()
@click.option(
'--config', 'app_config_params',
required=True,
type=click.File('r'),
help='config filename',
callback=_validate_config)
@click.option(
'--router', 'router_fqdn',
required=True,
type=click.STRING,
help='router fqdn',
callback=_validate_hostname)
@click.option(
'--netconf-only/--no-netconf-only', 'netconf_only',
default=False,
type=click.BOOL,
help='just run the netconf query and output the simplified xml, then exit')
def main(router_fqdn: str, app_config_params: dict, netconf_only: bool):
# this is expected to be called only when debugging config/setup
if netconf_only:
_doc = juniper.get_interface_info_ncrpc(
router_fqdn,
ssh_config=app_config_params['ssh-config'])
print(etree.tostring(_doc, pretty_print=True).decode('utf-8'))
return
# normal flow ...
_main(router_fqdn, app_config_params)
if __name__ == '__main__':
main()