diff --git a/mapping_provider/__init__.py b/mapping_provider/__init__.py index 068b038ea5fd9e8c58bff3a47850f8dd610cd92c..2a9675f074b7908fcbfac31d882a3faf3e463882 100644 --- a/mapping_provider/__init__.py +++ b/mapping_provider/__init__.py @@ -12,7 +12,7 @@ from fastapi import FastAPI from mapping_provider import config, environment from mapping_provider.api import common, map -from mapping_provider.backends import cache, correlator, inventory +from mapping_provider.backends import cache, correlator, inventory, brian logger = logging.getLogger(__name__) @@ -49,6 +49,16 @@ async def lifespan(app: FastAPI, app_config: config.Configuration) -> AsyncItera }) _inventory_thread.start() + _brian_thread = threading.Thread( + target=brian.worker_proc, + daemon=True, + kwargs={ + 'brian_params': app_config.brian, + 'refresh_frequency_seconds': 300, + 'stop_event': stop_event, + }) + _brian_thread.start() + yield None # wait here until the app is shutting down (mypy needs a value to be yielded) stop_event.set() diff --git a/mapping_provider/backends/brian.py b/mapping_provider/backends/brian.py new file mode 100644 index 0000000000000000000000000000000000000000..6bac003db7f301a5a48ff2abf662c5f91c0ebc64 --- /dev/null +++ b/mapping_provider/backends/brian.py @@ -0,0 +1,137 @@ +import os +import contextlib +from influxdb import InfluxDBClient +from threading import Event +import time + +import logging + +from mapping_provider import config +from mapping_provider.backends import cache + +logger = logging.getLogger(__name__) + +# os.environ['SETTINGS_FILENAME'] = os.path.join(os.path.dirname(__file__), 'config.json') + +# params = config.load() + + + +BRIAN_SCID_RATES_FILENAME = 'brian-scid-rates.json' + + +@contextlib.contextmanager +def influx_client(influx_params: config.InfluxConnectionParams): + """ + context manager for creating an influx db connection + + exactly one of schema_name or database should be provided + + :param influx_params: the 'influx' element from config + :param database: a key from INFLUX_COUNTER_SCHEMAS + :return: + """ + + client = None + try: + client = InfluxDBClient( + host=influx_params.hostname, + port=influx_params.port, + database=influx_params.database, + username=influx_params.username, + password=influx_params.password, + ssl=influx_params.ssl, + verify_ssl=influx_params.ssl) + yield client + finally: + if client: + logger.debug('closing influx session') + client.close() + + +def _load_scid_rates_rows(influx_params: config.InfluxConnectionParams, window: str = '24h'): + """ + Return the count of all fields, grouped by + hostname & interface_name. + + :param influx_params: the 'influx' element from config + :param schema_name: a key from INFLUX_COUNTER_SCHEMAS + :param window: time filter + :return: + """ + + with influx_client(influx_params) as influx: + + query = ('select' + ' last(ingress) as latest_ingress,' + ' last(egress) as latest_egress,' + ' mean(ingress) as mean_ingress,' + ' mean(egress) as mean_egress,' + ' max(ingress) as max_ingress,' + ' max(egress) as max_egress' + ' from scid_rates' + f' where time > now() - {window}' + ' group by scid') + + logger.debug(query) + result = influx.query(query) + + for series in result.raw['series']: + tags = series['tags'] + columns = series['columns'] + values = series['values'] + assert len(values) == 1 + + for row in values: + rate = {'scid': tags['scid']} + for field_name, field_value in zip(columns, row): + rate[field_name] = field_value + yield rate + + +def load_scid_rates(influx_params: config.InfluxConnectionParams): + rates = {} + for r in _load_scid_rates_rows(influx_params): + + def _bitrate_or_none(field_name: str) -> float | None: + if field_name in r: + return r[field_name] * 8 + return None + + values = { + 'latest': { + 'ingress': _bitrate_or_none('latest_ingress'), + 'egress': _bitrate_or_none('latest_egress') + }, + 'mean': { + 'ingress': _bitrate_or_none('mean_ingress'), + 'egress': _bitrate_or_none('mean_egress') + }, + 'max': { + 'ingress': _bitrate_or_none('max_ingress'), + 'egress': _bitrate_or_none('max_egress') + }, + } + rates[r['scid']] = values + + cache.set(BRIAN_SCID_RATES_FILENAME, rates) + return rates + + +def worker_proc( + brian_params: config.InfluxConnectionParams, + refresh_frequency_seconds: int, + stop_event: Event | None = None) -> None: + + def _should_stop() -> bool: + return stop_event.is_set() if stop_event else False + + while not _should_stop(): + + load_scid_rates(brian_params) + + # wait and the restart the loop + if stop_event: + stop_event.wait(timeout=refresh_frequency_seconds) + else: + time.sleep(refresh_frequency_seconds) diff --git a/mapping_provider/backends/services.py b/mapping_provider/backends/services.py index 6abf52336b30368962310ed5391fbf1d7cd9e59e..a51eb41a739e54b8873a46fc551fcf5e931b0838 100644 --- a/mapping_provider/backends/services.py +++ b/mapping_provider/backends/services.py @@ -3,7 +3,7 @@ from collections.abc import Generator from pydantic import BaseModel -from . import cache, correlator, inventory +from . import cache, correlator, inventory, brian logger = logging.getLogger(__name__) @@ -13,9 +13,17 @@ class Endpoint(BaseModel): interface: str +class BitRates(BaseModel): + ingress: float | None + egress: float | None + + class Overlays(BaseModel): speed: int up: bool + latest: BitRates + mean: BitRates + max: BitRates class Service(BaseModel): @@ -43,6 +51,7 @@ def _services(service_type: str | None = None) -> Generator[Service]: scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME) # poller_interfaces = cache.get(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME) correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME) + brian_rates = cache.get(brian.BRIAN_SCID_RATES_FILENAME) except FileNotFoundError: logger.exception('not enough data available to build the service list') return @@ -66,9 +75,22 @@ def _services(service_type: str | None = None) -> Generator[Service]: if service_type and _s['service_type'] != service_type: continue + rates = brian_rates.get(_s['scid'], {}) overlays = Overlays( speed = _s['speed'], up = _s['sid'] not in down_service_sids, + latest = BitRates( + egress = rates['latest']['egress'], + ingress = rates['latest']['ingress'], + ), + mean = BitRates( + egress = rates['mean']['egress'], + ingress = rates['mean']['ingress'], + ), + max = BitRates( + egress = rates['max']['egress'], + ingress = rates['max']['ingress'], + ), ) endpoints = [] diff --git a/mapping_provider/config.py b/mapping_provider/config.py index c67b2d25be778b69e6d931b42c0b2bbb1c6f2833..c83d7064acd05770d05a707b39752eeea554b2ab 100644 --- a/mapping_provider/config.py +++ b/mapping_provider/config.py @@ -3,6 +3,16 @@ import os from pydantic import BaseModel, Field, HttpUrl +class InfluxConnectionParams(BaseModel): + hostname: str + port: int = 8086 + ssl: bool = True + username: str + password: str + database: str + measurement: str + + class RMQConnectionParams(BaseModel): brokers: list[str] username: str @@ -30,6 +40,7 @@ class Configuration(BaseModel): reporting: HttpUrl rmq: RMQConnectionParams | None = None correlator_exchange: str = 'dashboard.alarms.broadcast' + brian: InfluxConnectionParams def load() -> Configuration: diff --git a/requirements.txt b/requirements.txt index 8032e99e89c48566a30f9498fc5e43ff6e8fbbba..29ec9efc6aa697b83abf6b7f33e9339f8b171b10 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ requests jsonschema sentry_sdk pika +influxdb httpx # required for fastapi TestClient pytest