Commit af131e6a authored by Erik Reid

working skeleton scid rates

parent f1eed84a
@@ -12,7 +12,7 @@ from fastapi import FastAPI
 from mapping_provider import config, environment
 from mapping_provider.api import common, map
-from mapping_provider.backends import cache, correlator, inventory
+from mapping_provider.backends import cache, correlator, inventory, brian
 
 logger = logging.getLogger(__name__)
@@ -49,6 +49,16 @@ async def lifespan(app: FastAPI, app_config: config.Configuration) -> AsyncItera
         })
     _inventory_thread.start()
 
+    _brian_thread = threading.Thread(
+        target=brian.worker_proc,
+        daemon=True,
+        kwargs={
+            'brian_params': app_config.brian,
+            'refresh_frequency_seconds': 300,
+            'stop_event': stop_event,
+        })
+    _brian_thread.start()
+
     yield None  # wait here until the app is shutting down (mypy needs a value to be yielded)
 
     stop_event.set()
...
import contextlib
import logging
import os
import time
from threading import Event

from influxdb import InfluxDBClient

from mapping_provider import config
from mapping_provider.backends import cache

logger = logging.getLogger(__name__)

# os.environ['SETTINGS_FILENAME'] = os.path.join(os.path.dirname(__file__), 'config.json')
# params = config.load()

BRIAN_SCID_RATES_FILENAME = 'brian-scid-rates.json'


@contextlib.contextmanager
def influx_client(influx_params: config.InfluxConnectionParams):
    """
    Context manager that yields an influx db client connection
    and closes it on exit.

    :param influx_params: the 'brian' influx element from the config
    """
    client = None
    try:
        client = InfluxDBClient(
            host=influx_params.hostname,
            port=influx_params.port,
            database=influx_params.database,
            username=influx_params.username,
            password=influx_params.password,
            ssl=influx_params.ssl,
            verify_ssl=influx_params.ssl)
        yield client
    finally:
        if client:
            logger.debug('closing influx session')
            client.close()


def _load_scid_rates_rows(influx_params: config.InfluxConnectionParams, window: str = '24h'):
    """
    Yield the latest, mean and max ingress/egress rates from the
    scid_rates measurement over the given time window, grouped by scid.

    :param influx_params: the 'brian' influx element from the config
    :param window: time filter for the query, e.g. '24h'
    :return: generator of one dict per scid
    """
    with influx_client(influx_params) as influx:
        query = ('select'
                 ' last(ingress) as latest_ingress,'
                 ' last(egress) as latest_egress,'
                 ' mean(ingress) as mean_ingress,'
                 ' mean(egress) as mean_egress,'
                 ' max(ingress) as max_ingress,'
                 ' max(egress) as max_egress'
                 ' from scid_rates'
                 f' where time > now() - {window}'
                 ' group by scid')
        logger.debug(query)

        result = influx.query(query)
        # 'series' is absent when the query matches nothing
        for series in result.raw.get('series', []):
            tags = series['tags']
            columns = series['columns']
            values = series['values']
            assert len(values) == 1  # one aggregated row per scid group
            for row in values:
                rate = {'scid': tags['scid']}
                for field_name, field_value in zip(columns, row):
                    rate[field_name] = field_value
                yield rate
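
# For reference, the loop above walks the raw response from the influxdb
# client, which for this grouped query has roughly the following shape
# (illustrative, one series per scid):
#
#   {'series': [{'name': 'scid_rates',
#                'tags': {'scid': '...'},
#                'columns': ['time', 'latest_ingress', 'latest_egress',
#                            'mean_ingress', 'mean_egress',
#                            'max_ingress', 'max_egress'],
#                'values': [[<time>, <latest_ingress>, ...]]}]}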


def load_scid_rates(influx_params: config.InfluxConnectionParams):
    """Load the per-scid rate aggregates from influx and cache them."""
    rates = {}
    for r in _load_scid_rates_rows(influx_params):

        def _bitrate_or_none(field_name: str) -> float | None:
            # convert to a bit rate (x 8), tolerating missing or null aggregates
            value = r.get(field_name)
            return value * 8 if value is not None else None

        values = {
            'latest': {
                'ingress': _bitrate_or_none('latest_ingress'),
                'egress': _bitrate_or_none('latest_egress')
            },
            'mean': {
                'ingress': _bitrate_or_none('mean_ingress'),
                'egress': _bitrate_or_none('mean_egress')
            },
            'max': {
                'ingress': _bitrate_or_none('max_ingress'),
                'egress': _bitrate_or_none('max_egress')
            },
        }

        rates[r['scid']] = values

    cache.set(BRIAN_SCID_RATES_FILENAME, rates)
    return rates
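
# Illustrative shape of the structure cached above (and returned),
# keyed by scid, with values in bits per second or None:
#
#   {'<scid>': {'latest': {'ingress': ..., 'egress': ...},
#               'mean': {'ingress': ..., 'egress': ...},
#               'max': {'ingress': ..., 'egress': ...}},
#    ...}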


def worker_proc(
        brian_params: config.InfluxConnectionParams,
        refresh_frequency_seconds: int,
        stop_event: Event | None = None) -> None:
    """Refresh the scid rates cache periodically until stop_event is set."""

    def _should_stop() -> bool:
        return stop_event.is_set() if stop_event else False

    while not _should_stop():
        load_scid_rates(brian_params)

        # wait and then restart the loop
        if stop_event:
            stop_event.wait(timeout=refresh_frequency_seconds)
        else:
            time.sleep(refresh_frequency_seconds)
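
Outside the FastAPI lifespan, a single refresh can also be run directly for ad-hoc testing. A minimal sketch; the connection values and the scid below are placeholders, not real parameters:

from mapping_provider import config
from mapping_provider.backends import brian

params = config.InfluxConnectionParams(  # placeholder values, for illustration only
    hostname='influx.example.org',
    username='brian-ro',
    password='********',
    database='brian',
    measurement='scid_rates')

rates = brian.load_scid_rates(params)  # queries influx and writes brian-scid-rates.json to the cache
print(rates.get('some-scid'))          # 'some-scid' is a hypothetical service correlation id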
@@ -3,7 +3,7 @@ from collections.abc import Generator
 from pydantic import BaseModel
 
-from . import cache, correlator, inventory
+from . import cache, correlator, inventory, brian
 
 logger = logging.getLogger(__name__)
@@ -13,9 +13,17 @@ class Endpoint(BaseModel):
     interface: str
 
 
+class BitRates(BaseModel):
+    ingress: float | None
+    egress: float | None
+
+
 class Overlays(BaseModel):
     speed: int
     up: bool
+    latest: BitRates
+    mean: BitRates
+    max: BitRates
 
 
 class Service(BaseModel):
@@ -43,6 +51,7 @@ def _services(service_type: str | None = None) -> Generator[Service]:
         scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
         # poller_interfaces = cache.get(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME)
         correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
+        brian_rates = cache.get(brian.BRIAN_SCID_RATES_FILENAME)
     except FileNotFoundError:
         logger.exception('not enough data available to build the service list')
         return
@@ -66,9 +75,22 @@ def _services(service_type: str | None = None) -> Generator[Service]:
         if service_type and _s['service_type'] != service_type:
             continue
 
+        rates = brian_rates.get(_s['scid'], {})
         overlays = Overlays(
             speed = _s['speed'],
             up = _s['sid'] not in down_service_sids,
+            latest = BitRates(
+                egress = rates.get('latest', {}).get('egress'),
+                ingress = rates.get('latest', {}).get('ingress'),
+            ),
+            mean = BitRates(
+                egress = rates.get('mean', {}).get('egress'),
+                ingress = rates.get('mean', {}).get('ingress'),
+            ),
+            max = BitRates(
+                egress = rates.get('max', {}).get('egress'),
+                ingress = rates.get('max', {}).get('ingress'),
+            ),
         )
 
         endpoints = []
...
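For a service whose scid is present in the cached rates, the overlays portion of a serialized Service would then look roughly like the following (illustrative numbers; ingress/egress are bits per second):

{
    'speed': 100,
    'up': True,
    'latest': {'ingress': 1.2e9, 'egress': 0.9e9},
    'mean': {'ingress': 0.8e9, 'egress': 0.7e9},
    'max': {'ingress': 3.1e9, 'egress': 2.6e9},
}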
@@ -3,6 +3,16 @@ import os
 from pydantic import BaseModel, Field, HttpUrl
 
 
+class InfluxConnectionParams(BaseModel):
+    hostname: str
+    port: int = 8086
+    ssl: bool = True
+    username: str
+    password: str
+    database: str
+    measurement: str
+
+
 class RMQConnectionParams(BaseModel):
     brokers: list[str]
     username: str
@@ -30,6 +40,7 @@ class Configuration(BaseModel):
     reporting: HttpUrl
     rmq: RMQConnectionParams | None = None
     correlator_exchange: str = 'dashboard.alarms.broadcast'
+    brian: InfluxConnectionParams
 
 
 def load() -> Configuration:
...
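With the new required 'brian' element in Configuration, the settings file loaded by config.load() (a JSON document, per the SETTINGS_FILENAME convention noted in brian.py) would gain a fragment along these lines; the hostname and credentials below are placeholders:

"brian": {
    "hostname": "influx.example.org",
    "port": 8086,
    "ssl": true,
    "username": "brian-ro",
    "password": "********",
    "database": "brian",
    "measurement": "scid_rates"
}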
@@ -4,6 +4,7 @@ requests
 jsonschema
 sentry_sdk
 pika
+influxdb
 httpx # required for fastapi TestClient
 pytest
...