Commit f1eed84a authored by Erik Reid

Finished feature NGM-6-availability-proc.

parents 898317c7 a2151df7
Showing changed files with 129476 additions and 64 deletions
@@ -58,7 +58,11 @@ html_static_path = ['_static']
# start the server and dump the schema
with tempfile.NamedTemporaryFile(delete=False) as f:
    bogus_config = {
        'inventory': 'http://bogus',
        'reporting': 'https://bogus',
        'correlator_exchange': 'bogus',
    }
    with open(f.name, 'w') as f:
        json.dump(bogus_config, f)
        f.flush()
...
""" """
Default entry point for the FastAPI application. Default entry point for the FastAPI application.
""" """
import contextlib
import logging
import tempfile
import threading
from collections.abc import AsyncIterator
from functools import partial
from fastapi import FastAPI
from mapping_provider import config, environment
from mapping_provider.api import common, map
from mapping_provider.backends import cache, correlator, inventory
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI, app_config: config.Configuration) -> AsyncIterator[None]:
if not app_config.rmq:
return # don't start any background tasks
with tempfile.TemporaryDirectory() as _tmp_working_dir:
cache.init(_tmp_working_dir)
stop_event = threading.Event()
_correlator_thread = threading.Thread(
target=correlator.consume_status,
daemon=True,
kwargs={
'correlator_state_broadcast_exchange_name': app_config.correlator_exchange,
'rmq_params': app_config.rmq,
'stop_event': stop_event,
})
_correlator_thread.start()
_inventory_thread = threading.Thread(
target=inventory.worker_proc,
daemon=True,
kwargs={
'inventory_base_uri': app_config.inventory,
'reporting_base_uri': app_config.reporting,
'refresh_frequency_seconds': 300,
'stop_event': stop_event,
})
_inventory_thread.start()
yield None # wait here until the app is shutting down (mypy needs a value to be yielded)
stop_event.set()
_correlator_thread.join(timeout=10)
_inventory_thread.join(timeout=10)
logger.info("background threads joined")
def create_app() -> FastAPI:
@@ -21,7 +70,9 @@ def create_app() -> FastAPI:
    app = FastAPI(
        title="Mapping provider",
        description="Mapping provider endpoints for GÉANT maps",
        lifespan=partial(lifespan, app_config=app_config),
    )
    app.include_router(common.router)
    app.include_router(map.router, prefix='/map')
    return app
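For a quick local run of the new app factory, a hedged sketch (not part of this commit; uvicorn is already a project dependency, and the host/port values are placeholders):
import uvicorn
from mapping_provider import create_app

# serve the app locally; the lifespan hook only starts background threads if rmq is configured
uvicorn.run(create_app(), host='127.0.0.1', port=8000)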
@@ -6,10 +6,12 @@ from fastapi import APIRouter
from pydantic import BaseModel
from mapping_provider import config
from mapping_provider.backends import services
router = APIRouter()
class Site(BaseModel):
    latitude: float
    longitude: float
@@ -41,47 +43,6 @@ class RouterList(BaseModel):
    routers: list[Router]
class Endpoint(BaseModel):
hostname: str
interface: str
@classmethod
def from_inprov_endpoint(cls, endpoint: dict[str, Any]) -> 'Endpoint':
return cls(
hostname = endpoint['hostname'],
interface = endpoint['interface'],
)
class Overlays(BaseModel):
speed: int
@classmethod
def from_inprov_overlays(cls, overlays: dict[str, Any]) -> 'Overlays':
return cls(
speed = overlays['speed'],
)
class Service(BaseModel):
sid: str
name: str
type: str
endpoints: list[Endpoint]
overlays: Overlays
@classmethod
def from_inprov_service(cls, service: dict[str, Any]) -> 'Service':
return cls(
sid = service['sid'],
name = service['name'],
type = service['type'],
endpoints = list(map(Endpoint.from_inprov_endpoint, service['endpoints'])),
overlays = Overlays.from_inprov_overlays(service['overlays']),
)
class ServiceList(BaseModel):
services: list[Service]
INPROV_SITE_LIST_SCHEMA = {
    '$schema': 'https://json-schema.org/draft/2020-12/schema',
@@ -198,21 +159,8 @@ def get_routers() -> RouterList:
@router.get("/trunks")
def get_trunks() -> services.ServiceList:
    """
    handler for /trunks
    """
    return services.build_service_info_list(service_type='IP TRUNK')
# TODO: catch/handle the usual exceptions
app_params = config.load()
rv = requests.get(
f'{app_params.inventory}/map/services/IP TRUNK',
headers={'Accept': 'application/json'})
rv.raise_for_status()
service_list_json = rv.json()
jsonschema.validate(service_list_json, INPROV_SERVICE_LIST_SCHEMA)
rsp_services = map(Service.from_inprov_service, service_list_json)
return ServiceList(services=list(rsp_services))
"""
Trivial caching module for the mapping provider.
The global variable `_cache_dir` is set during FastAPI server startup.
Could be replaced with a proper caching module in the future, or maybe
just a simple dict.
"""
import json
import logging
import os
from typing import Any
logger = logging.getLogger(__name__)
_cache_dir: str | None = None
def init(cache_dir: str) -> None:
global _cache_dir
assert _cache_dir is None, "cache_dir has already been initialized"
_cache_dir = cache_dir
logger.debug(f"set cache directory: {_cache_dir}")
def set(filename: str, data: dict[str, Any]) -> None:
"""
data must be a JSON-serializable dict.
"""
assert _cache_dir is not None, "cache_dir hasn't been initialized"
with open(os.path.join(_cache_dir, filename), 'w') as f:
f.write(json.dumps(data))
logger.debug(f"wrote cached data: {filename}")
def get(filename: str) -> Any:
"""
Loads the cached data, parses it as json & returns the object.
"""
assert _cache_dir is not None, "cache_dir hasn't been initialized"
with open(os.path.join(_cache_dir, filename)) as f:
logger.debug(f"reading cached data: {filename}")
return json.load(f)
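For reference, a minimal usage sketch of the cache module (the directory, filename and payload below are illustrative only; in the application, init() is called from the FastAPI lifespan):
from mapping_provider.backends import cache

cache.init('/tmp/mapping-provider-cache')       # normally done once at startup
cache.set('example.json', {'hello': 'world'})   # data must be a JSON-serializable dict
assert cache.get('example.json') == {'hello': 'world'}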
import logging
import os
import socket
import threading
import time
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from mapping_provider.config import RMQConnectionParams
from pika.exchange_type import ExchangeType
from . import cache
from .rmq.exchange import RabbitMQClient, default_rmq_connection_params
logger = logging.getLogger(__name__)
CACHED_CORRELATOR_STATE_FILENAME = 'correlator-state.json'
ALARM_STATE_MESSAGE_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"definitions": {
"list-of-strings": {
# shorthand for a repeated type ...
"type": "array",
"items": {"type": "string"},
},
"location": {
"type": "object",
"properties": {
"site": {"type": "string"},
"equipment": {"type": "string"}
},
},
"severity": {
"type": "string",
# TODO: gen from AlarmSeverity
"enum": ["HIDE", "WARNING", "MINOR", "MAJOR", "CRITICAL"],
},
"endpoint": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"type": {"type": "string"},
"name": {"type": "string"},
"projects": {"$ref": "#/definitions/list-of-strings"},
"alarm": {"type": "string"},
"init_time": {"type": "number"},
"last_activity_ts": {"type": "number"},
"services": {"type": "array", "items": {"type": "object"}},
"locations": {
"type": "array",
"items": {"$ref": "#/definitions/location"},
},
# not needed - sanity checks only
"event_history": {"$ref": "#/definitions/list-of-strings"},
"up": {"type": "boolean"},
# skipping validation of type-specific params
# such as peer, router, interfaces, snmp, asn, etc
},
"required": [
"uuid",
"type",
"name",
"alarm",
"projects",
"init_time",
"last_activity_ts",
"locations",
"event_history",
"up",
],
},
"alarm": {
"type": "object",
"properties": {
"uuid": {"type": "string"},
"db_id": {"type": "integer"},
"phase": {
"type": "string",
"enum": ["PENDING", "FINALIZED", "KILL_ME"],
},
"final_severity": {
# this field is null until phase is FINALIZED
"oneOf": [{"$ref": "#/definitions/severity"}, {"type": "null"}]
},
"severity": {"$ref": "#/definitions/severity"},
"state": {
"type": "string",
"enum": ["OPEN", "CLOSED"], # TODO: gen from AlarmState
},
# note: list-of-strings is defined in #/definitions above
"endpoints": {"$ref": "#/definitions/list-of-strings"},
"description": {"type": "string"},
"devoured": {"type": "array", "items": {"type": "integer"}},
},
"required": [
"uuid",
"db_id",
"phase",
"severity",
"final_severity",
"state",
"endpoints",
"description",
],
},
},
"type": "object",
"properties": {
"alarms": {"type": "array", "items": {"$ref": "#/definitions/alarm"}},
"endpoints": {"type": "array", "items": {"$ref": "#/definitions/endpoint"}},
},
"required": ["alarms", "endpoints"],
}
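For reference, a minimal message that satisfies this schema; all field values and the constant name are illustrative only, not real correlator output:
EXAMPLE_CORRELATOR_STATE_MESSAGE = {
    'alarms': [{
        'uuid': 'alarm-0001',
        'db_id': 42,
        'phase': 'FINALIZED',
        'severity': 'MAJOR',
        'final_severity': 'MAJOR',
        'state': 'OPEN',
        'endpoints': ['endpoint-0001'],
        'description': 'example alarm',
    }],
    'endpoints': [{
        'uuid': 'endpoint-0001',
        'type': 'link',
        'name': 'example endpoint',
        'alarm': 'alarm-0001',
        'projects': ['GEANT'],
        'init_time': 1700000000.0,
        'last_activity_ts': 1700000100.0,
        'services': [{'sid': 'GS-00001'}],
        'locations': [{'site': 'LON', 'equipment': 'rt1.lon'}],
        'event_history': ['alarm-0001'],
        'up': False,
    }],
}
# jsonschema.validate(EXAMPLE_CORRELATOR_STATE_MESSAGE, ALARM_STATE_MESSAGE_SCHEMA) raises no error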
def handle_correlator_state_broadcast(message: dict[str, Any]) -> None:
cache.set(CACHED_CORRELATOR_STATE_FILENAME, message)
def consume_status(
correlator_state_broadcast_exchange_name: str,
rmq_params: 'RMQConnectionParams',
stop_event: threading.Event | None = None) -> None:
"""
Consume the correlator state broadcast exchange forever.
When a message is received, update the global availability state cache.
:param correlator_state_broadcast_exchange_name:
The name of the correlator state broadcast exchange.
:param rmq_params: RabbitMQ connection parameters.
:param stop_event: optional event used to signal the consumer loop to stop.
"""
cp = default_rmq_connection_params(
hostname=rmq_params.brokers,
username=rmq_params.username,
password=rmq_params.password,
vhost=rmq_params.vhost)
queue_name = (
f"mapping-provider-{socket.getfqdn()}"
f"-{os.getpid()}.{threading.get_ident()}.{time.time()}"
)
client = RabbitMQClient(
connection_params=cp,
exchange_name=correlator_state_broadcast_exchange_name,
exchange_type=ExchangeType.fanout,
queue_name=queue_name,
exclusive=True,
reconnect_on_idle_timeout=120,
stop_event=stop_event)
client.consume_forever(
callback=handle_correlator_state_broadcast,
json=True,
schema=ALARM_STATE_MESSAGE_SCHEMA)
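A hedged sketch of running the consumer directly, outside the FastAPI lifespan (e.g. for local debugging); it assumes SETTINGS_FILENAME points at a config with an rmq section and that the cache directory path is a placeholder:
from mapping_provider import config
from mapping_provider.backends import cache, correlator

app_config = config.load()
cache.init('/tmp/mapping-provider-cache')  # handle_correlator_state_broadcast writes via cache.set
assert app_config.rmq is not None
correlator.consume_status(
    correlator_state_broadcast_exchange_name=app_config.correlator_exchange,
    rmq_params=app_config.rmq)  # blocks until the process is stopped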
import concurrent.futures
import logging
import time
from threading import Event
from typing import Any
import requests
from . import cache
logger = logging.getLogger(__name__)
INPROV_POLLER_INTERFACES_CACHE_FILENAME = 'inprov-poller-interfaces.json'
REPORTING_SCID_CURRENT_CACHE_FILENAME = 'reporting-scid-current.json'
INPROV_MAP_SERVICES_CACHE_FILENAME = 'inprov-map-services.json'
def _load_and_cache_json(
key: str,
url: str,
cache_filename: str) -> dict[str, Any]:
"""
Load the JSON from the URL, return and cache it.
:param key: key to return with the loaded data for distinguishing futures
:param url: URL to load the data from
:param cache_filename: filename to cache the data under
"""
# TODO: proper error handling
# TODO: maybe try to return cached data if http fails?
rv = requests.get(url)
rv.raise_for_status()
rsp_object = rv.json()
cache.set(cache_filename, rsp_object)
return {
'key': key,
'value': rsp_object
}
def _load_all_inventory(inventory_base_uri: str, reporting_base_uri: str) -> dict[str, Any]:
"""
Load and process service info from inventory+reporting provider.
:param inventory_base_uri: base URI of the inventory provider
:param reporting_base_uri: base URI of the reporting provider
:return: dict mapping each fetch key to its loaded JSON response
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
futures = [
executor.submit(
_load_and_cache_json,
key='poller-interfaces',
url=f'{inventory_base_uri}/poller/interfaces',
cache_filename=INPROV_POLLER_INTERFACES_CACHE_FILENAME),
executor.submit(
_load_and_cache_json,
key='map-services',
url=f'{inventory_base_uri}/map/services',
cache_filename=INPROV_MAP_SERVICES_CACHE_FILENAME),
executor.submit(
_load_and_cache_json,
key='scid-current',
url=f'{reporting_base_uri}/scid/current',
cache_filename=REPORTING_SCID_CURRENT_CACHE_FILENAME),
]
responses = {}
for _f in concurrent.futures.as_completed(futures):
responses[_f.result()['key']] = _f.result()['value']
return responses
def worker_proc(
inventory_base_uri: str,
reporting_base_uri: str,
refresh_frequency_seconds: int,
stop_event: Event | None = None) -> None:
def _should_stop() -> bool:
return stop_event.is_set() if stop_event else False
while not _should_stop():
_load_all_inventory(inventory_base_uri, reporting_base_uri)
# wait and then restart the loop
if stop_event:
stop_event.wait(timeout=refresh_frequency_seconds)
else:
time.sleep(refresh_frequency_seconds)
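A minimal sketch of a single refresh cycle, assuming cache.init() has already been called and both providers are reachable (the base URIs are placeholders):
responses = _load_all_inventory(
    inventory_base_uri='https://inventory-provider.example.org',
    reporting_base_uri='https://reporting-provider.example.org')
# responses is keyed by 'poller-interfaces', 'map-services' and 'scid-current'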
"""
copied from: dashboard-v3-python v0.226, dashboard.messaging.queue
updated typehints to satisfy mypy, and linted with changes suggested by ruff
"""
import logging
import os
from collections.abc import Sequence
from typing import Any
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelClosedByBroker
from pika.exchange_type import ExchangeType
DASHBOARD_USE_QUORUM_QUEUE = os.getenv(
"DASHBOARD_USE_QUORUM_QUEUE", ""
).lower() not in {"0", "", "false"}
logger = logging.getLogger(__name__)
def loose_queue_declare(
channel: BlockingChannel,
queue: str = "",
exclusive: bool = False,
single_active_consumer: bool = False,
force_quorum_queue: bool = False,
) -> str | None:
"""declare a queue (either classic or quorum, depending on the USE_QUORUM_QUEUE
global variable and if it is a ``dashboard.*`` queue). In case of a failure that
the queue already exists as a different type, this function fails silently.
:param channel: the channel to use
:param queue: the queue name. If empty string, the broker will create a unique queue
name (default: '')
:param kwargs: additional parameters to pass to ``channel.queue_declare``
"""
durable = False
arguments: dict[str, Any] = {}
if force_quorum_queue or (
DASHBOARD_USE_QUORUM_QUEUE and queue.startswith("dashboard.")
):
durable = True
arguments["x-queue-type"] = "quorum"
if single_active_consumer:
arguments["x-single-active-consumer"] = True
try:
result = channel.queue_declare(
queue, durable=durable, exclusive=exclusive, arguments=arguments or None
)
assert isinstance(result.method.queue, str) # for mypy
return result.method.queue
except ChannelClosedByBroker as e:
if e.reply_code == 406: # PRECONDITION_FAILED due to incompatible queue type
requested_type, existing_type = "classic", "quorum"
if DASHBOARD_USE_QUORUM_QUEUE:
requested_type, existing_type = existing_type, requested_type
logger.warning(
f"Trying to declare {requested_type} queue '{queue}'"
f" but queue already exists as {existing_type} queue"
)
return None
raise
def setup_channel(
connection: BlockingConnection,
exchange_name: str,
exchange_type: ExchangeType = ExchangeType.fanout,
queue_name: str | None = None,
queue_declare: bool = True,
exclusive: bool = False,
single_active_consumer: bool = False,
routing_keys: Sequence[str] = [],
prefetch_count: int | None = None,
force_quorum_queue: bool = False,
) -> tuple[BlockingChannel, str | None]:
"""Setup a channel and declare the exchange and optionally the queue.
:param connection: A ``pika`` ``BlockingConnection``
:param exchange_name: the exchange to declare
:param exchange_type: the exchange type (default: ``fanout``)
:param queue_name: The queue to bind to, if given. Can be set to empty string to
get a temporary queue from the broker. When queue_name=None, the channel can
only be used for publishing (default: None)
:param queue_declare: Whether to declare the queue before binding (default: True)
:param exclusive: Whether this should be declared as an exclusive queue (default:
False)
:param routing_keys: Optional routing keys to bind to the queue.
:returns: A tuple (channel: ``BlockingChannel``, queue_name: Optional[str])
"""
channel = connection.channel()
if prefetch_count is not None:
channel.basic_qos(prefetch_count=prefetch_count)
if exchange_name != "": # cannot declare default exchange
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
if queue_name is None:
return channel, None
if queue_name == "" and not queue_declare:
raise ValueError("must set queue_declare=True when supplying empty queue name")
if queue_name == "" and not exclusive:
raise ValueError("must set exclusive=True for anonymous queues")
if exclusive and single_active_consumer:
raise ValueError("Exclusive queues cannot have single active consumer")
if queue_declare:
if exclusive:
# Short circuit exclusive queues that are never quorum and never
# pre-exist
result = channel.queue_declare(queue_name, exclusive=True)
queue_name = result.method.queue
else:
queue_name = loose_queue_declare(
channel,
queue_name,
exclusive=exclusive,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
# if a queue declare fails, the channel is in an unusable state.
# Start over but skip the declare
if queue_name is None:
return setup_channel(
connection=connection,
exchange_name=exchange_name,
exchange_type=exchange_type,
queue_name=queue_name,
queue_declare=False,
exclusive=exclusive,
routing_keys=routing_keys,
prefetch_count=prefetch_count,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
assert queue_name, "queue name must not be empty here"
if not routing_keys:
# in case no routing keys are provided (as for fanout exchanges),
# ensure the queue is still bound to the exchange
channel.queue_bind(exchange=exchange_name, queue=queue_name)
for rk in routing_keys:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=rk)
return channel, queue_name
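A hedged usage sketch of setup_channel for a fanout consumer (the broker host and exchange name are placeholders; anonymous queues must be declared exclusive, per the checks above):
import pika
from pika.exchange_type import ExchangeType

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel, queue_name = setup_channel(
    connection,
    exchange_name='example.broadcast',
    exchange_type=ExchangeType.fanout,
    queue_name='',     # empty string: let the broker generate a unique name
    exclusive=True)    # required for anonymous queues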
import logging
from collections.abc import Generator
from pydantic import BaseModel
from . import cache, correlator, inventory
logger = logging.getLogger(__name__)
class Endpoint(BaseModel):
equipment: str
interface: str
class Overlays(BaseModel):
speed: int
up: bool
class Service(BaseModel):
sid: str
scid: str
name: str
type: str
endpoints: list[Endpoint]
overlays: Overlays
class ServiceList(BaseModel):
services: list[Service]
def _services(service_type: str | None = None) -> Generator[Service]:
"""
load the cached backend data and yield map service records
only return operational services that match the service type, if provided
"""
# TODO: maybe remove loading of inventory/poller/interfaces & /map/services
try:
# inprov_map_services = cache.get(inventory.INPROV_MAP_SERVICES_CACHE_FILENAME)
scid_current = cache.get(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME)
# poller_interfaces = cache.get(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME)
correlator_state = cache.get(correlator.CACHED_CORRELATOR_STATE_FILENAME)
except FileNotFoundError:
logger.exception('not enough data available to build the service list')
return
def _get_down_correlator_services() -> Generator[str]:
for _e in correlator_state['endpoints']:
if _e['up']:
continue
for _s in _e['services']:
if 'sid' in _s:
yield _s['sid']
down_service_sids = set(_get_down_correlator_services())
for _s in scid_current:
if _s['status'] != 'operational':
continue
if service_type and _s['service_type'] != service_type:
continue
overlays = Overlays(
speed = _s['speed'],
up = _s['sid'] not in down_service_sids,
)
endpoints = []
for _e in _s['endpoints']:
equipment = _e['hostname'] if 'hostname' in _e else _e['equipment']
interface = _e['interface'] if 'interface' in _e else _e['port']
endpoints.append(Endpoint(equipment=equipment, interface=interface))
yield Service(
sid = _s['sid'],
scid = _s['scid'],
name = _s['name'],
type = _s['service_type'],
endpoints = endpoints,
overlays = overlays,
)
def build_service_info_list(service_type: str | None = None) -> ServiceList:
"""
return a list of mappable info about all operational services
"""
return ServiceList(services=list(_services(service_type)))
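A minimal usage sketch, assuming the cache has already been populated by the inventory and correlator workers (the service type string matches the /trunks handler above):
trunks = build_service_info_list(service_type='IP TRUNK')
for service in trunks.services:
    print(service.sid, service.name, service.overlays.up)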
@@ -3,6 +3,13 @@ import os
from pydantic import BaseModel, Field, HttpUrl
class RMQConnectionParams(BaseModel):
    brokers: list[str]
    username: str
    password: str
    vhost: str
class SentryConfig(BaseModel):
    dsn: str
    environment: str
@@ -20,6 +27,9 @@ class SentryConfig(BaseModel):
class Configuration(BaseModel):
    sentry: SentryConfig | None = None
    inventory: HttpUrl
    reporting: HttpUrl
    rmq: RMQConnectionParams | None = None
    correlator_exchange: str = 'dashboard.alarms.broadcast'
def load() -> Configuration:
...
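A hedged sketch of a settings file exercising the new fields (hostnames, credentials and the file path are placeholders, not real deployment values); config.load() reads the file named by the SETTINGS_FILENAME environment variable, as in the test fixtures:
import json
import os

example_config = {
    'inventory': 'https://inventory-provider.example.org',
    'reporting': 'https://reporting-provider.example.org',
    'correlator_exchange': 'dashboard.alarms.broadcast',
    'rmq': {
        'brokers': ['rmq1.example.org', 'rmq2.example.org'],
        'username': 'guest',
        'password': 'guest',
        'vhost': '/',
    },
}
with open('/tmp/mapping-provider-settings.json', 'w') as f:
    json.dump(example_config, f)
os.environ['SETTINGS_FILENAME'] = '/tmp/mapping-provider-settings.json'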
@@ -10,3 +10,10 @@ python_version = "3.13"
strict = true
warn_unused_ignores = true
warn_return_any = true
[tool.coverage.run]
source = ["mapping_provider"]
omit = [
"mapping_provider/backends/rmq/*",
"test/*"
]
\ No newline at end of file
@@ -3,6 +3,7 @@ uvicorn[standard]
requests
jsonschema
sentry_sdk
pika
httpx  # required for fastapi TestClient
pytest
...
@@ -7,8 +7,7 @@ setup(
    author="GÉANT",
    author_email="info@geant.org",
    license="MIT",
    packages=find_packages(exclude=["test"]),
    package_dir={"": "mapping_provider"},
    include_package_data=True,
    python_requires=">=3.10",
    install_requires=[
@@ -16,7 +15,8 @@ setup(
        "uvicorn[standard]",
        "requests",
        "jsonschema",
        "sentry_sdk",
        "pika"
    ],
    long_description=open("README.md", encoding="utf-8").read(),
    long_description_content_type="text/markdown",
@@ -26,4 +26,4 @@ setup(
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
)
\ No newline at end of file
import json
import os
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
def load_test_data(filename: str) -> dict:
with open(os.path.join(DATA_DIR, filename)) as f:
return json.load(f)
\ No newline at end of file
@@ -7,6 +7,9 @@ import pytest
from fastapi.testclient import TestClient
from mapping_provider import create_app
from mapping_provider.backends import cache, correlator, inventory
from .common import load_test_data
@pytest.fixture
@@ -16,7 +19,19 @@ def dummy_config():
            'dsn': 'https://token@hostname.geant.org:1111/a/b',
            'environment': 'unit tests'
        },
        'inventory': 'https://inventory.bogus.domain',
        'reporting': 'http://reporting.another-bogus.domain',
        # no rmq param to skip correlator thread
        # 'rmq': {
        #     'brokers': [
        #         'test-noc-alarms01.geant.org',
        #         'test-noc-alarms02.geant.org',
        #         'test-noc-alarms03.geant.org'
        #     ],
        #     'username': 'guest',
        #     'password': 'guest',
        #     'vhost': '/'
        # }
    }
@@ -31,5 +46,24 @@ def dummy_config_filename(dummy_config):
@pytest.fixture
def client(dummy_config_filename):
    os.environ['SETTINGS_FILENAME'] = dummy_config_filename
    with tempfile.TemporaryDirectory() as tmp_dir:
        # there's no rmq in the test config data, so cache won't be initialized
        cache.init(tmp_dir)
        cache.set(inventory.INPROV_MAP_SERVICES_CACHE_FILENAME, load_test_data('inprov-services.json'))
        cache.set(inventory.REPORTING_SCID_CURRENT_CACHE_FILENAME, load_test_data('scid-current.json'))
        cache.set(inventory.INPROV_POLLER_INTERFACES_CACHE_FILENAME, load_test_data('poller-interfaces.json'))
        cache.set(correlator.CACHED_CORRELATOR_STATE_FILENAME, load_test_data('correlator-state.json'))
        with patch('sentry_sdk.init') as _mock_sentry_init:
            yield TestClient(create_app())
@pytest.fixture(autouse=True)
def run_around_tests():
    assert cache._cache_dir is None  # test env sanity check
    yield
    # make sure cache is set to unused before the next test
    cache._cache_dir = None
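A hedged example of a test built on these fixtures (the test name is hypothetical; the path combines the '/map' router prefix with the '/trunks' route, and the response shape follows the ServiceList model):
def test_get_trunks(client):
    rv = client.get('/map/trunks')
    assert rv.status_code == 200
    assert 'services' in rv.json()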