Skip to content
Snippets Groups Projects
Commit f614e056 authored by Erik Reid's avatar Erik Reid
Browse files

added influx output support

parent a0819099
No related branches found
No related tags found
No related merge requests found
"""
influx i/o
==============
.. contents:: :local:
.. autofunction:: infinera_usage_stats.influx.save_counters
"""
import contextlib
import functools
import logging
from typing import Callable, Iterable, Optional
from influxdb import InfluxDBClient
logger = logging.getLogger(__name__)
BATCH_SIZE = 100
INFLUX_POINT = {
'$schema': 'http://json-schema.org/draft-07/schema#',
'type': 'object',
'properties': {
'measurement': {'type': 'string'},
'time': {'type': 'string'},
'tags': {'type': 'object'},
'fields': {'type': 'object'}
},
'required': ['measurement', 'time', 'tags', 'fields'],
'additionalProperties': False
}
def influx_client(influx_params):
"""
build the influx connection
:param influx_params: the `influx` element from `CONFIG_DATA`_
:return: an InfluxDBClient instance
"""
return InfluxDBClient(
host=influx_params['hostname'],
port=influx_params['port'],
database=influx_params['database'],
username=influx_params['username'],
password=influx_params['password'],
ssl=influx_params['ssl'],
verify_ssl=influx_params['ssl'])
def save_counters(
influx_params: dict,
measurement: str,
data: Iterable[dict],
to_point: Optional[Callable[[dict, str], dict]]) -> int:
"""
Write the volume counters to influxdb.
:param influx_params: the `influx` element from `CONFIG_SCHEMA`_
:param measurement: measurement name
:param data: iterable
of objects to save
:param to_point: callable converting elements from data
returning iterable of `INFLUX_POINT` objects
:return: number of points generated
"""
_convert_point = functools.partial(
to_point, measurement=measurement)
with contextlib.closing(influx_client(influx_params)) as influx:
_current_batch = []
for _p in map(_convert_point, data):
_current_batch.append(_p)
if len(_current_batch) >= BATCH_SIZE:
influx.write_points(_current_batch)
_current_batch = []
if _current_batch:
influx.write_points(_current_batch)
from datetime import datetime
BRIAN_POINT_SCHEMA = { BRIAN_COUNTER_DICT_SCHEMA = {
'$schema': 'https://json-schema.org/draft/2020-12/schema', '$schema': 'https://json-schema.org/draft/2020-12/schema',
'type': 'object', 'type': 'object',
...@@ -29,7 +30,8 @@ BRIAN_POINT_SCHEMA = { ...@@ -29,7 +30,8 @@ BRIAN_POINT_SCHEMA = {
'additionalProperties': False 'additionalProperties': False
} }
def points(router_fqdn, interface_counters):
def counters(router_fqdn, interface_counters):
""" """
:param router_fqdn: hostname :param router_fqdn: hostname
:param interface_counters: either PHYSICAL_INTERFACE_COUNTER_SCHEMA :param interface_counters: either PHYSICAL_INTERFACE_COUNTER_SCHEMA
...@@ -66,3 +68,25 @@ def points(router_fqdn, interface_counters): ...@@ -66,3 +68,25 @@ def points(router_fqdn, interface_counters):
return _p return _p
return map(_counters2point, interface_counters) return map(_counters2point, interface_counters)
def ctr2point(measurement, counters):
"""
:param measurement: the measurement where the point will be written
:param counters: a BRIAN_COUNTER_DICT_SCHEMA object
:return: a brian_polling_manager.influx.INFLUX_POINT object
"""
def _is_tag(key_name):
return key_name == 'hostname' or key_name == 'interface_name'
fields = dict([_k, _v] for _k, _v in counters.items() if not _is_tag(_k))
tags = dict([_k, _v] for _k, _v in counters.items() if _is_tag(_k))
return {
'measurement': measurement,
'tags': tags,
'fields': fields,
# TODO: let's set this at neconf query/response time
'time': datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
}
\ No newline at end of file
...@@ -5,6 +5,7 @@ statsd ...@@ -5,6 +5,7 @@ statsd
flask flask
lxml lxml
ncclient ncclient
influxdb
pytest pytest
responses responses
......
...@@ -15,7 +15,8 @@ setup( ...@@ -15,7 +15,8 @@ setup(
'statsd', 'statsd',
'flask', 'flask',
'lxml', 'lxml',
'ncclient' 'ncclient',
'influxdb'
], ],
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
......
from functools import partial
import itertools import itertools
import os import os
import re import re
...@@ -11,7 +12,7 @@ import responses ...@@ -11,7 +12,7 @@ import responses
from brian_polling_manager.interface_stats import \ from brian_polling_manager.interface_stats import \
PHYSICAL_INTERFACE_COUNTER_SCHEMA, LOGICAL_INTERFACE_COUNTER_SCHEMA PHYSICAL_INTERFACE_COUNTER_SCHEMA, LOGICAL_INTERFACE_COUNTER_SCHEMA
from brian_polling_manager.interface_stats import brian, juniper from brian_polling_manager.interface_stats import brian, juniper
from brian_polling_manager import inventory from brian_polling_manager import inventory, influx
# logging.basicConfig(level=logging.INFO) # logging.basicConfig(level=logging.INFO)
# logging.getLogger('ncclient').setLevel(level=logging.WARNING) # logging.getLogger('ncclient').setLevel(level=logging.WARNING)
...@@ -118,18 +119,39 @@ def test_verify_all_interfaces_present(router_fqdn, ifc_netconf_rpc): ...@@ -118,18 +119,39 @@ def test_verify_all_interfaces_present(router_fqdn, ifc_netconf_rpc):
assert not _is_enabled(ifc_name, doc) assert not _is_enabled(ifc_name, doc)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_physical_brian_counters(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.physical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, brian.BRIAN_COUNTER_DICT_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_logical_brian_counters(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.logical_interface_counters(doc)
counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
for ctrs in counters:
jsonschema.validate(ctrs, brian.BRIAN_COUNTER_DICT_SCHEMA)
@pytest.mark.parametrize('router_fqdn', ROUTERS) @pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_physical_brian_points(router_fqdn, ifc_netconf_rpc): def test_physical_brian_points(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn) doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.physical_interface_counters(doc) interfaces = juniper.physical_interface_counters(doc)
for point in brian.points(router_fqdn=router_fqdn, interface_counters=interfaces): counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
print(point) for _p in map(partial(brian.ctr2point, 'bogus-measurement'), counters):
jsonschema.validate(point, brian.BRIAN_POINT_SCHEMA) jsonschema.validate(_p, influx.INFLUX_POINT)
@pytest.mark.parametrize('router_fqdn', ROUTERS) @pytest.mark.parametrize('router_fqdn', ROUTERS)
def test_logical_brian_points(router_fqdn, ifc_netconf_rpc): def test_logical_brian_points(router_fqdn, ifc_netconf_rpc):
doc = juniper.get_interface_info_ncrpc(router_fqdn) doc = juniper.get_interface_info_ncrpc(router_fqdn)
interfaces = juniper.logical_interface_counters(doc) interfaces = juniper.logical_interface_counters(doc)
for point in brian.points(router_fqdn=router_fqdn, interface_counters=interfaces): counters = brian.counters(router_fqdn=router_fqdn, interface_counters=interfaces)
jsonschema.validate(point, brian.BRIAN_POINT_SCHEMA) for _p in map(partial(brian.ctr2point, 'bogus-measurement'), counters):
jsonschema.validate(_p, influx.INFLUX_POINT)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment