Skip to content
Snippets Groups Projects
Commit 81ee733b authored by Erik Reid's avatar Erik Reid
Browse files

Finished feature NGM-18-utilization-from-influx.

parents f1eed84a 5ba51206
No related branches found
No related tags found
No related merge requests found
......@@ -62,6 +62,13 @@ with tempfile.NamedTemporaryFile(delete=False) as f:
'inventory': 'http://bogus',
'reporting': 'https://bogus',
'correlator_exchange': 'bogus',
'brian': {
'hostname': 'bogus',
'username': 'bogus',
'password': 'bogus',
'database': 'bogus',
'measurement': 'bogus',
}
}
with open(f.name, 'w') as f:
json.dump(bogus_config, f)
......
......@@ -12,7 +12,7 @@ from fastapi import FastAPI
from mapping_provider import config, environment
from mapping_provider.api import common, map
from mapping_provider.backends import cache, correlator, inventory
from mapping_provider.backends import brian, cache, correlator, inventory
logger = logging.getLogger(__name__)
......@@ -49,6 +49,16 @@ async def lifespan(app: FastAPI, app_config: config.Configuration) -> AsyncItera
})
_inventory_thread.start()
_brian_thread = threading.Thread(
target=brian.worker_proc,
daemon=True,
kwargs={
'brian_params': app_config.brian,
'refresh_frequency_seconds': 300,
'stop_event': stop_event,
})
_brian_thread.start()
yield None # wait here until the app is shutting down (mypy needs a value to be yielded)
stop_event.set()
......
import contextlib
import logging
import time
from collections.abc import Generator
from threading import Event
from typing import Any
from influxdb import InfluxDBClient
from mapping_provider import config
from mapping_provider.backends import cache
logger = logging.getLogger(__name__)
# os.environ['SETTINGS_FILENAME'] = os.path.join(os.path.dirname(__file__), 'config.json')
# params = config.load()
CACHED_BRIAN_SCID_RATES_FILENAME = 'brian-scid-rates.json'
CACHED_SCID_RATES_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"definitions": {
"in-out-rates": {
"type": "object",
"properties": {
"ingress": {"type": ["number", "null"]},
"egress": {"type": ["number", "null"]},
},
"required": ["ingress", "egress"],
"additionalProperties": False,
},
"rate-stats": {
"type": "object",
"properties": {
"latest": {"$ref": "#/definitions/in-out-rates"},
"mean": {"$ref": "#/definitions/in-out-rates"},
"max": {"$ref": "#/definitions/in-out-rates"},
},
"required": ["latest", "mean", "max"],
"additionalProperties": False,
},
"service-rate": {
"type": "object",
"properties": {
"scid": {"type": "string"},
"values": {"$ref": "#/definitions/rate-stats"},
},
"required": ["scid", "values"],
"additionalProperties": False,
},
},
"type": "array",
"items": {"$ref": "#/definitions/service-rate"},
}
@contextlib.contextmanager
def influx_client(
influx_params: config.InfluxConnectionParams) -> Generator[InfluxDBClient]:
"""
context manager for creating an influx db connection
exactly one of schema_name or database should be provided
:param influx_params: the 'influx' element from config
:param database: a key from INFLUX_COUNTER_SCHEMAS
:return:
"""
client = None
try:
client = InfluxDBClient(
host=influx_params.hostname,
port=influx_params.port,
database=influx_params.database,
username=influx_params.username,
password=influx_params.password,
ssl=influx_params.ssl,
verify_ssl=influx_params.ssl)
yield client
finally:
if client:
logger.debug('closing influx session')
client.close()
def _load_scid_rates_rows(
influx_params: config.InfluxConnectionParams,
window: str = '24h') -> Generator[dict[str, str | float | None]]:
"""
Return the count of all fields, grouped by
hostname & interface_name.
:param influx_params: the 'influx' element from config
:param schema_name: a key from INFLUX_COUNTER_SCHEMAS
:param window: time filter
:return:
"""
with influx_client(influx_params) as influx:
query = ('select'
' last(ingress) as latest_ingress,'
' last(egress) as latest_egress,'
' mean(ingress) as mean_ingress,'
' mean(egress) as mean_egress,'
' max(ingress) as max_ingress,'
' max(egress) as max_egress'
' from scid_rates'
f' where time > now() - {window}'
' group by scid')
logger.debug(query)
result = influx.query(query)
for series in result.raw['series']:
tags = series['tags']
columns = series['columns']
values = series['values']
assert len(values) == 1
for row in values:
rate = {'scid': tags['scid']}
for field_name, field_value in zip(columns, row, strict=False):
rate[field_name] = field_value
yield rate
def load_scid_rates(influx_params: config.InfluxConnectionParams) -> list[dict[str, Any]]:
rates = []
def _bitrate_or_none(row: dict[str, Any], field_name: str) -> float | None:
_rate = row.get(field_name, None)
assert isinstance(_rate, float | None) # mypy noise
if _rate:
return _rate * 8
return _rate # could be 0 or None
for r in _load_scid_rates_rows(influx_params):
values = {
'latest': {
'ingress': _bitrate_or_none(r, 'latest_ingress'),
'egress': _bitrate_or_none(r, 'latest_egress')
},
'mean': {
'ingress': _bitrate_or_none(r, 'mean_ingress'),
'egress': _bitrate_or_none(r, 'mean_egress')
},
'max': {
'ingress': _bitrate_or_none(r, 'max_ingress'),
'egress': _bitrate_or_none(r, 'max_egress')
},
}
rates.append({
'scid': r['scid'],
'values': values
})
cache.set(CACHED_BRIAN_SCID_RATES_FILENAME, rates)
return rates # <-- caller can also retrieve this from the cache
def worker_proc(
brian_params: config.InfluxConnectionParams,
refresh_frequency_seconds: int,
stop_event: Event | None = None) -> None:
def _should_stop() -> bool:
return stop_event.is_set() if stop_event else False
while not _should_stop():
load_scid_rates(brian_params)
# wait and the restart the loop
if stop_event:
stop_event.wait(timeout=refresh_frequency_seconds)
else:
time.sleep(refresh_frequency_seconds)
......@@ -22,9 +22,9 @@ def init(cache_dir: str) -> None:
logger.debug(f"set cache directory: {_cache_dir}")
def set(filename: str, data: dict[str, Any]) -> None:
def set(filename: str, data: Any) -> None:
"""
data must be a JSON-serializable dict.
data must be JSON-serializable.
"""
assert _cache_dir is not None, "cache_dir hasn't been initialized"
with open(os.path.join(_cache_dir, filename), 'w') as f:
......
......@@ -3,7 +3,7 @@ from collections.abc import Generator
from pydantic import BaseModel
from . import cache, correlator, inventory
from . import brian, cache, correlator, inventory
logger = logging.getLogger(__name__)
......@@ -13,9 +13,17 @@ class Endpoint(BaseModel):
interface: str
class BitRates(BaseModel):
ingress: float | None
egress: float | None
class Overlays(BaseModel):
speed: int
up: bool
latest: BitRates
mean: BitRates
max: BitRates
class Service(BaseModel):
......@@ -43,6 +51,7 @@ def _services(service_type: str | None = None) -> Generator[Service]:
scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
# poller_interfaces = cache.get(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME)
correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
brian_rates = cache.get(brian.CACHED_BRIAN_SCID_RATES_FILENAME)
except FileNotFoundError:
logger.exception('not enough data available to build the service list')
return
......@@ -57,6 +66,7 @@ def _services(service_type: str | None = None) -> Generator[Service]:
down_service_sids = set(_get_down_correlator_services())
brian_scid_rates = {r['scid']: r['values'] for r in brian_rates}
for _s in scid_current:
......@@ -66,9 +76,22 @@ def _services(service_type: str | None = None) -> Generator[Service]:
if service_type and _s['service_type'] != service_type:
continue
rates = brian_scid_rates.get(_s['scid'], {})
overlays = Overlays(
speed = _s['speed'],
up = _s['sid'] not in down_service_sids,
latest = BitRates(
egress = rates['latest']['egress'],
ingress = rates['latest']['ingress'],
),
mean = BitRates(
egress = rates['mean']['egress'],
ingress = rates['mean']['ingress'],
),
max = BitRates(
egress = rates['max']['egress'],
ingress = rates['max']['ingress'],
),
)
endpoints = []
......
......@@ -3,6 +3,16 @@ import os
from pydantic import BaseModel, Field, HttpUrl
class InfluxConnectionParams(BaseModel):
hostname: str
port: int = 8086
ssl: bool = True
username: str
password: str
database: str
measurement: str
class RMQConnectionParams(BaseModel):
brokers: list[str]
username: str
......@@ -30,6 +40,7 @@ class Configuration(BaseModel):
reporting: HttpUrl
rmq: RMQConnectionParams | None = None
correlator_exchange: str = 'dashboard.alarms.broadcast'
brian: InfluxConnectionParams
def load() -> Configuration:
......
......@@ -11,6 +11,10 @@ strict = true
warn_unused_ignores = true
warn_return_any = true
[[tool.mypy.overrides]]
module = ["influxdb"]
ignore_missing_imports = true
[tool.coverage.run]
source = ["mapping_provider"]
omit = [
......
......@@ -4,6 +4,7 @@ requests
jsonschema
sentry_sdk
pika
influxdb
httpx # required for fastapi TestClient
pytest
......
......@@ -16,7 +16,8 @@ setup(
"requests",
"jsonschema",
"sentry_sdk",
"pika"
"pika",
"influxdb"
],
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
......
......@@ -7,7 +7,7 @@ import pytest
from fastapi.testclient import TestClient
from mapping_provider import create_app
from mapping_provider.backends import cache, correlator, inventory
from mapping_provider.backends import brian, cache, correlator, inventory
from .common import load_test_data
......@@ -31,7 +31,14 @@ def dummy_config():
# 'username': 'guest',
# 'password': 'guest',
# 'vhost': '/'
# }
# },
'brian': {
'hostname': 'bogus hostname',
'username': 'bogus username',
'password': 'bogus password',
'database': 'bogus database name',
'measurement': 'bogus measurement'
}
}
......@@ -55,6 +62,7 @@ def client(dummy_config_filename):
cache.set(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME, load_test_data('scid-current.json'))
cache.set(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME, load_test_data('poller-interfaces.json'))
cache.set(correlator.CACHED_CORRELATOR_STATE_FILENAME, load_test_data('correlator-state.json'))
cache.set(brian.CACHED_BRIAN_SCID_RATES_FILENAME, load_test_data('brian-scid-rates.json'))
with patch('sentry_sdk.init') as _mock_sentry_init:
yield TestClient(create_app())
......
This diff is collapsed.
This diff is collapsed.
......@@ -19,8 +19,8 @@ def test_get_sites(client):
rv = client.get("/map/sites")
assert rv.status_code == 200
assert rv.json()
SiteList.model_validate(rv.json())
site_list = SiteList.model_validate(rv.json())
assert site_list.sites, 'test data should not be empty'
@responses.activate
......@@ -34,8 +34,8 @@ def test_get_routers(client):
rv = client.get("/map/routers")
assert rv.status_code == 200
assert rv.json()
RouterList.model_validate(rv.json())
router_list = RouterList.model_validate(rv.json())
assert router_list.routers, 'test data should not be empty'
@responses.activate
def test_get_trunks(client):
......@@ -48,5 +48,5 @@ def test_get_trunks(client):
rv = client.get("/map/trunks")
assert rv.status_code == 200
assert rv.json()
ServiceList.model_validate(rv.json())
service_list = ServiceList.model_validate(rv.json())
assert service_list.services, 'test data should not be empty'
import tempfile
from unittest.mock import MagicMock, patch
import jsonschema
from mapping_provider.backends import brian, cache
from mapping_provider.config import InfluxConnectionParams
from .common import load_test_data
def test_utilization():
with tempfile.TemporaryDirectory() as tmpdir:
cache.init(tmpdir)
with patch('mapping_provider.backends.brian.InfluxDBClient') as mocked_influx:
# with patch('influxdb.InfluxDBClient') as mocked_influx_client:
mocked_client_instance = MagicMock()
mocked_influx.return_value = mocked_client_instance
mocked_query = MagicMock()
mocked_query.return_value.raw = load_test_data('influx-scid-rates-query-result.json')
mocked_client_instance.query = mocked_query
brian.load_scid_rates(InfluxConnectionParams(
hostname='bogus hostname',
username='bogus username',
password='bogus password',
database='bogus database name',
measurement='bogus measurement'))
cached_scid_rates = cache.get(brian.CACHED_BRIAN_SCID_RATES_FILENAME)
assert cached_scid_rates, "test data is not empty"
jsonschema.validate(cached_scid_rates, brian.CACHED_SCID_RATES_SCHEMA)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment