Commit bbecc8ef authored by Erik Reid

initial background tasks for correlator & inventory downloads

parent 1a6ede18
""" """
Default entry point for the FastAPI application. Default entry point for the FastAPI application.
""" """
from functools import partial
import logging
import contextlib
import tempfile
import threading import threading
from fastapi import FastAPI from fastapi import FastAPI
from mapping_provider import config, environment, correlator from mapping_provider import config, environment, correlator, services
from mapping_provider.api import common, map from mapping_provider.api import common, map
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI, app_config: config.Configuration):
if not app_config.rmq:
return # don't start any background tasks
with tempfile.TemporaryDirectory() as tmp_working_dir:
logger.debug(f"using temporary working directory: {tmp_working_dir}")
stop_event = threading.Event()
_correlator_thread = threading.Thread(
target=correlator.consume_status,
daemon=True,
kwargs={
'correlator_state_broadcast_exchange_name': app_config.correlator_exchange,
'rmq_params': app_config.rmq,
'working_dirname': tmp_working_dir,
'stop_event': stop_event,
})
_correlator_thread.start()
_inventory_thread = threading.Thread(
target=services.worker_proc,
daemon=True,
kwargs={
'inventory_base_uri': app_config.inventory,
'refresh_frequency_seconds': 300,
'working_dirname': tmp_working_dir,
'stop_event': stop_event,
})
_inventory_thread.start()
yield # wait here until the app is shutting down
stop_event.set()
_correlator_thread.join(timeout=10)
_inventory_thread.join(timeout=10)
logger.info("background threads joined")
def create_app() -> FastAPI:
    """
@@ -22,18 +69,9 @@ def create_app() -> FastAPI:
    app = FastAPI(
        title="Mapping provider",
        description="Mapping provider endpoints for GÉANT maps",
        lifespan=partial(lifespan, app_config=app_config),
    )

    app.include_router(common.router)
    app.include_router(map.router, prefix='/map')

-    if app_config.rmq:
-        t = threading.Thread(
-            target=correlator.consume_status,
-            daemon=True,
-            kwargs={
-                'correlator_state_broadcast_exchange_name': app_config.correlator_exchange,
-                'rmq_params': app_config.rmq
-            })
-        t.start()

    return app
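To see the new lifespan wiring in action locally, the app factory can be handed to an ASGI server so that startup enters the lifespan context (spawning both worker threads) and shutdown sets the stop event and joins them. This is only a sketch: it assumes uvicorn is available and guesses the module path (mapping_provider.main), neither of which is confirmed by this diff.

# illustrative only: the module path "mapping_provider.main" is a guess,
# and uvicorn is assumed to be installed alongside fastapi
import uvicorn

from mapping_provider.main import create_app  # hypothetical import path

if __name__ == '__main__':
    # uvicorn drives the FastAPI lifespan context: the correlator and
    # inventory threads start on startup and are joined on shutdown
    uvicorn.run(create_app(), host='127.0.0.1', port=8080)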
from functools import partial
import json
import logging
import os
import socket
@@ -12,14 +14,114 @@ from mapping_provider.rmq.exchange import RabbitMQClient, default_rmq_connection

logger = logging.getLogger(__name__)
ALARM_STATE_MESSAGE_SCHEMA = {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "definitions": {
        "list-of-strings": {
            # shorthand for a repeated type ...
            "type": "array",
            "items": {"type": "string"},
        },
        "location": {
            "type": "object",
            "properties": {
                "site": {"type": "string"},
                "equipment": {"type": "string"}
            },
        },
        "severity": {
            "type": "string",
            # TODO: gen from AlarmSeverity
            "enum": ["HIDE", "WARNING", "MINOR", "MAJOR", "CRITICAL"],
        },
        "endpoint": {
            "type": "object",
            "properties": {
                "uuid": {"type": "string"},
                "type": {"type": "string"},
                "name": {"type": "string"},
                "projects": {"$ref": "#/definitions/list-of-strings"},
                "alarm": {"type": "string"},
                "init_time": {"type": "number"},
                "last_activity_ts": {"type": "number"},
                "services": {"type": "array", "items": {"type": "object"}},
                "locations": {
                    "type": "array",
                    "items": {"$ref": "#/definitions/location"},
                },
                # not needed - sanity checks only
                "event_history": {"$ref": "#/definitions/list-of-strings"},
                "up": {"type": "boolean"},
                # skipping validation of type-specific params
                # such as peer, router, interfaces, snmp, asn, etc
            },
            "required": [
                "uuid",
                "type",
                "name",
                "alarm",
                "projects",
                "init_time",
                "last_activity_ts",
                "locations",
                "event_history",
                "up",
            ],
        },
        "alarm": {
            "type": "object",
            "properties": {
                "uuid": {"type": "string"},
                "db_id": {"type": "integer"},
                "phase": {
                    "type": "string",
                    "enum": ["PENDING", "FINALIZED", "KILL_ME"],
                },
                "final_severity": {
                    # this field is null until phase is FINALIZED
                    "oneOf": [{"$ref": "#/definitions/severity"}, {"type": "null"}]
                },
                "severity": {"$ref": "#/definitions/severity"},
                "state": {
                    "type": "string",
                    "enum": ["OPEN", "CLOSED"],  # TODO: gen from AlarmState
                },
                # note: list-of-strings is in _ENDPOINT_SCHEMA_DEFS
                "endpoints": {"$ref": "#/definitions/list-of-strings"},
                "description": {"type": "string"},
                "devoured": {"type": "array", "items": {"type": "integer"}},
            },
            "required": [
                "uuid",
                "db_id",
                "phase",
                "severity",
                "final_severity",
                "state",
                "endpoints",
                "description",
            ],
        },
    },
    "type": "object",
    "properties": {
        "alarms": {"type": "array", "items": {"$ref": "#/definitions/alarm"}},
        "endpoints": {"type": "array", "items": {"$ref": "#/definitions/endpoint"}},
    },
    "required": ["alarms", "endpoints"],
}
-def handle_correlator_state_broadcast(message_bytes: bytes):
-    logger.error(f'got correlator message: {message_bytes}')
def handle_correlator_state_broadcast(message: dict, working_dirname: str):
    with open(os.path.join(working_dirname, 'correlator-state.json'), 'w') as f:
        f.write(json.dumps(message))


def consume_status(
        correlator_state_broadcast_exchange_name: str,
-        rmq_params: 'RMQConnectionParams'):
        rmq_params: 'RMQConnectionParams',
        working_dirname: str,
        stop_event: threading.Event | None = None):
    """
    Consume the correlator state broadcast exchange forever.
    When a message is received, update the global availability state cache.
@@ -46,6 +148,10 @@ def consume_status(
        exchange_type="fanout",
        queue_name=queue_name,
        exclusive=True,
-        reconnect_on_idle_timeout=120)
        reconnect_on_idle_timeout=120,
        stop_event=stop_event)

-    client.consume_forever(handle_correlator_state_broadcast)
    client.consume_forever(
        callback=partial(handle_correlator_state_broadcast, working_dirname=working_dirname),
        json=True,
        schema=ALARM_STATE_MESSAGE_SCHEMA)
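Because the RabbitMQ client is now asked to decode JSON and validate each payload against ALARM_STATE_MESSAGE_SCHEMA before invoking the callback, it can help to check a candidate message against the schema by hand. The sketch below assumes the jsonschema package; nothing in this diff confirms which validator the rmq client uses internally.

# rough, standalone check of a payload against the schema above;
# assumes the 'jsonschema' package, which is not shown in this diff
from jsonschema import ValidationError, validate

from mapping_provider.correlator import ALARM_STATE_MESSAGE_SCHEMA

minimal_message = {'alarms': [], 'endpoints': []}

try:
    validate(instance=minimal_message, schema=ALARM_STATE_MESSAGE_SCHEMA)
    print('message matches the alarm state schema')
except ValidationError as e:
    print(f'schema violation: {e.message}')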
import json
import logging
import os
import requests
import time
from threading import Event

logger = logging.getLogger(__name__)


def _load_and_process_services(inventory_base_uri: str, working_dirname: str):
    """
    Load the services from the inventory and process them.
    """
    # TODO: proper error handling
    # TODO: actually process the services
    rv = requests.get(f'{inventory_base_uri}/poller/interfaces')
    rv.raise_for_status()
    with open(os.path.join(working_dirname, 'poller-interfaces.json'), 'w+') as f:
        f.write(json.dumps(rv.json()))


def worker_proc(
        inventory_base_uri: str,
        refresh_frequency_seconds: int,
        working_dirname: str,
        stop_event: Event | None = None):

    def _should_stop():
        return stop_event.is_set() if stop_event else False

    while not _should_stop():
        _load_and_process_services(inventory_base_uri, working_dirname)

        # wait and then restart the loop
        if stop_event:
            stop_event.wait(timeout=refresh_frequency_seconds)
        else:
            time.sleep(refresh_frequency_seconds)
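worker_proc is intended to be driven by the lifespan handler above, but it can also be exercised on its own: run it in a thread, let it fetch at least once, then set the stop event so the loop exits instead of sleeping out the full refresh interval. A minimal sketch, with a placeholder inventory URL:

# sketch: run the inventory poller briefly and stop it cleanly;
# 'http://localhost:8080' is a placeholder inventory-provider URL
import tempfile
import threading
import time

from mapping_provider.services import worker_proc

stop_event = threading.Event()
with tempfile.TemporaryDirectory() as tmp_dir:
    t = threading.Thread(
        target=worker_proc,
        daemon=True,
        kwargs={
            'inventory_base_uri': 'http://localhost:8080',
            'refresh_frequency_seconds': 60,
            'working_dirname': tmp_dir,
            'stop_event': stop_event,
        })
    t.start()
    time.sleep(5)      # give the first download a chance to complete
    stop_event.set()   # worker_proc sees the event and exits its loop
    t.join(timeout=10)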
import json
import os

DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')


def load_test_data(filename: str) -> dict:
    with open(os.path.join(DATA_DIR, filename)) as f:
        return json.load(f)
\ No newline at end of file
(two collapsed file diffs not shown)
import json
import os
import re
import tempfile

import responses

from mapping_provider.correlator import handle_correlator_state_broadcast
from .common import load_test_data


def test_handle_correlator_state_broadcast():
    """
    tmp bogus test - just to have something
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        handle_correlator_state_broadcast(
            message={'alarms': [], 'endpoints': []},
            working_dirname=tmp_dir)

        assert os.path.exists(os.path.join(tmp_dir, 'correlator-state.json'))
        with open(os.path.join(tmp_dir, 'correlator-state.json')) as f:
            assert json.load(f) == {'alarms': [], 'endpoints': []}
@@ -6,30 +6,16 @@ import responses

from mapping_provider.api.map import RouterList, ServiceList, SiteList

-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
-
-def _load_test_data(filename: str) -> dict:
-    with open(os.path.join(DATA_DIR, filename)) as f:
-        return json.load(f)
from .common import load_test_data

-# @responses.activate
-# def test_inventory_uri_validation():
-#     responses.add(
-#         method=responses.GET, url=re.compile(r".*/version$"), json={"api": "0.9"}
-#     )
-#     assert (
-#         classifier.verify_inventory_provider_uri(None, None, "http://a.b.c:9999")
-#         == "http://a.b.c:9999/"
-#     )


@responses.activate
def test_get_sites(client):
    responses.add(
        method=responses.GET,
        url=re.compile(r'.*/map/sites$'),
-        json=_load_test_data('inprov-sites.json')
        json=load_test_data('inprov-sites.json')
    )

    rv = client.get("/map/sites")
@@ -44,7 +30,7 @@ def test_get_routers(client):
    responses.add(
        method=responses.GET,
        url=re.compile(r'.*/map/routers$'),
-        json=_load_test_data('inprov-routers.json')
        json=load_test_data('inprov-routers.json')
    )

    rv = client.get("/map/routers")
@@ -58,7 +44,7 @@ def test_get_trunks(client):
    responses.add(
        method=responses.GET,
        url=re.compile(r'.*/map/services.*'),
-        json=_load_test_data('inprov-services.json')
        json=load_test_data('inprov-services.json')
    )

    rv = client.get("/map/trunks")
...
import json
import os
import re
import tempfile

import responses

from mapping_provider.services import _load_and_process_services
from .common import load_test_data


@responses.activate
def test_inventory_service_download():
    """
    tmp bogus test - just to have something
    """
    responses.add(
        method=responses.GET,
        url=re.compile(r'.*/poller/interfaces$'),
        json=load_test_data('poller-interfaces.json')
    )

    with tempfile.TemporaryDirectory() as tmp_dir:
        _load_and_process_services(
            inventory_base_uri='https://dummy-hostname.dummy.domain',
            working_dirname=tmp_dir)

        assert os.path.exists(os.path.join(tmp_dir, 'poller-interfaces.json'))
        with open(os.path.join(tmp_dir, 'poller-interfaces.json')) as f:
            assert json.load(f) == load_test_data('poller-interfaces.json')