Skip to content
Snippets Groups Projects
Commit 6c9432bc authored by Bjarke Madsen's avatar Bjarke Madsen
Browse files

Fix exception propagation from thread

parent 6fa55280
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,10 @@ import logging
import sys
import threading
import traceback
from multiprocessing import Queue
from datetime import datetime
from logging import LogRecord
from typing import Any, Collection, Dict, Iterable, List
from typing import Any, Collection, Dict, Iterable, List, Optional
import click
import jsonschema
......@@ -187,40 +188,48 @@ ALL_ = object()
def main(
exception_queue: Optional[Queue],
processor: RouterProcessor,
interfaces=ALL_,
output: OutputMethod = OutputMethod.INFLUX,
):
logger.info(
f"Processing {processor.name.capitalize()} router {processor.router_fqdn}"
)
try:
logger.info(
f"Processing {processor.name.capitalize()} router {processor.router_fqdn}"
)
timestamp = datetime.now()
timestamp = datetime.now()
for point_group in processor.supported_point_groups:
logger.info(f"Processing {str(point_group).upper()} points...")
for point_group in processor.supported_point_groups:
logger.info(f"Processing {str(point_group).upper()} points...")
inventory = processor.config.get("inventory")
inventory = processor.config.get("inventory")
check_interfaces = None
if inventory is not None:
check_interfaces = load_interfaces(
router_fqdn=processor.router_fqdn,
interfaces=interfaces,
check_interfaces = None
if inventory is not None:
check_interfaces = load_interfaces(
router_fqdn=processor.router_fqdn,
interfaces=interfaces,
point_group=point_group,
config=processor.config,
)
if not check_interfaces:
logger.info(f"No {str(point_group).upper()} interfaces found")
continue
process_router(
processor=processor,
point_group=point_group,
config=processor.config,
interfaces=check_interfaces,
timestamp=timestamp,
output=output,
)
if not check_interfaces:
logger.info(f"No {str(point_group).upper()} interfaces found")
continue
process_router(
processor=processor,
point_group=point_group,
interfaces=check_interfaces,
timestamp=timestamp,
output=output,
)
except Exception:
if not exception_queue:
raise # no queue means we don't run in a thread, so we can just raise the exception
exc_info = sys.exc_info()
formatted = ''.join(traceback.format_exception(*exc_info))
exception_queue.put(formatted)
def validate_config(_unused_ctx, _unused_param, file):
......@@ -296,14 +305,16 @@ def cli(
error_counter = setup_logging(debug=verbose)
try:
thread = threading.Thread(
target=main,
args=(processor, interfaces if interfaces else ALL_, OutputMethod.from_string(output.lower())),
)
thread.daemon = True
thread.start()
thread.join(timeout=120)
exception_queue = Queue()
thread = threading.Thread(
target=main,
args=(exception_queue, processor, interfaces if interfaces else ALL_, OutputMethod.from_string(output.lower())),
)
thread.daemon = True
thread.start()
thread.join(timeout=120)
if thread.is_alive() or not exception_queue.empty():
if thread.is_alive():
logger.error("Thread timed out")
frames = sys._current_frames()
......@@ -313,13 +324,11 @@ def cli(
for line in traceback.format_stack(frame):
logger.error(line.strip())
logger.error("-- Thread stack trace end --")
raise click.exceptions.Exit(2)
else:
logger.error(f"Error while processing {processor.name.capitalize()} router {router_fqdn}:")
exception = exception_queue.get()
logger.error(exception)
except Exception:
logger.exception(
f"Error while processing {processor.name.capitalize()} router {router_fqdn}"
)
# Exit code 2 indicates CRITICAL in Sensu
raise click.exceptions.Exit(2)
if error_counter.count:
......
......@@ -157,6 +157,7 @@ def test_main_for_all_juniper_routers(
write_points.side_effect = validate
cli.main(
None,
processor=JuniperRouterProcessor(juniper_router_fqdn, config),
interfaces=juniper_inventory[juniper_router_fqdn],
)
......@@ -187,6 +188,7 @@ def test_main_with_some_interfaces(
"error-counters": {},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
interfaces=["ifc1"],
)
......@@ -204,6 +206,7 @@ def test_main_with_all_interfaces_and_inprov_hosts(
"error-counters": {},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
)
assert process_router.call_args[1]["interfaces"].keys() == {"ifc1", "ifc2"}
......@@ -219,6 +222,7 @@ def test_main_with_all_interfaces_no_inprov_hosts(
"error-counters": {"influx": None},
}
cli.main(
None,
processor=JuniperRouterProcessor("mx1.ams.nl.geant.net", config),
)
assert process_router.call_args[1]["interfaces"] is None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment