Skip to content
Snippets Groups Projects
Commit 6d03f525 authored by Erik Reid's avatar Erik Reid
Browse files

copied RMQClient from dashboard

parent 858f53e3
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
"""
copied from: dashboard-v3-python v0.226, dashboard.messaging.queue
"""
import logging
from typing import Any, Dict, Optional, Sequence, Tuple, Union
from pika.exceptions import ChannelClosedByBroker
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
import os
DASHBOARD_USE_QUORUM_QUEUE = os.getenv(
"DASHBOARD_USE_QUORUM_QUEUE", ""
).lower() not in {"0", "", "false"}
logger = logging.getLogger(__name__)
def loose_queue_declare(
channel: BlockingChannel,
queue: str = "",
exclusive=False,
single_active_consumer=False,
force_quorum_queue=False,
) -> Optional[str]:
"""declare a queue (either classic or quorum, depending on the USE_QUORUM_QUEUE
global variable and if it is a ``dashboard.*`` queue). In case of a failure that
the queue already exists as a different type, this function fails silently.
:param channel: the channel to use
:param queue: the queue name. If empty string, the broker will create a unique queue
name (default: '')
:param kwargs: additional parameters to pass to ``channel.queue_declare``
"""
durable = False
arguments: Dict[str, Any] = {}
if force_quorum_queue or (
DASHBOARD_USE_QUORUM_QUEUE and queue.startswith("dashboard.")
):
durable = True
arguments["x-queue-type"] = "quorum"
if single_active_consumer:
arguments["x-single-active-consumer"] = True
try:
result = channel.queue_declare(
queue, durable=durable, exclusive=exclusive, arguments=arguments or None
)
return result.method.queue
except ChannelClosedByBroker as e:
if e.reply_code == 406: # PRECONDITION_FAILED due to incompatible queue type
requested_type, existing_type = "classic", "quorum"
if DASHBOARD_USE_QUORUM_QUEUE:
requested_type, existing_type = existing_type, requested_type
logger.warning(
f"Trying to declare {requested_type} queue '{queue}'"
f" but queue already exists as {existing_type} queue"
)
return None
raise
def setup_channel(
connection: BlockingConnection,
exchange_name: str,
exchange_type="fanout",
queue_name: Optional[str] = None,
queue_declare=True,
exclusive=False,
single_active_consumer=False,
routing_keys: Union[str, Sequence[Union[str, None]], None] = None,
prefetch_count: Optional[int] = None,
force_quorum_queue=False,
) -> Tuple[BlockingChannel, Optional[str]]:
"""Setup a channel and declare the exchange and optionally the queue.
:param connection: A ``pika`` ``BlockingConnection``
:param exchange_name: the exchange to declare
:param exchange_type: the exchange type (default: ``fanout``)
:param queue_name: The queue to bind to, if given. Can be set to empty string to
get a temporary queue from the broker. When queue_name=None, the channel can
only be used for publishing (default: None)
:param queue_declare: Whether to declare the queue before binding (default: True)
:param exclusive: Whether this should be declared as an exclusive queue (default:
False)
:param routing_keys: Optional routing keys to bind to the queue.
:returns: A tuple (channel: ``BlockingChannel``, queue_name: Optional[str])
"""
channel = connection.channel()
if prefetch_count is not None:
channel.basic_qos(prefetch_count=prefetch_count)
if exchange_name != "": # cannot declare default exchange
channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
if queue_name is None:
return channel, None
if queue_name == "" and not queue_declare:
raise ValueError("must set queue_declare=True when supplying empty queue name")
if queue_name == "" and not exclusive:
raise ValueError("must set exclusive=True for anonymous queues")
if exclusive and single_active_consumer:
raise ValueError("Exclusive queues cannot have single active consumer")
if queue_declare:
if exclusive:
# Short circuit exclusive queues that are never quorum and never
# pre-exist
result = channel.queue_declare(queue_name, exclusive=True)
queue_name = result.method.queue
else:
result = loose_queue_declare(
channel,
queue_name,
exclusive=exclusive,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
# if a queue declare fails, the channel is in an unusable state.
# Start over but skip the declare
if result is None:
return setup_channel(
connection=connection,
exchange_name=exchange_name,
exchange_type=exchange_type,
queue_name=queue_name,
queue_declare=False,
exclusive=exclusive,
routing_keys=routing_keys,
prefetch_count=prefetch_count,
single_active_consumer=single_active_consumer,
force_quorum_queue=force_quorum_queue,
)
queue_name = result
assert queue_name != "", "queue name must not be empty here"
if routing_keys is None or isinstance(routing_keys, str):
routing_keys = [routing_keys]
for rk in routing_keys:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=rk)
return channel, queue_name
...@@ -3,6 +3,7 @@ uvicorn[standard] ...@@ -3,6 +3,7 @@ uvicorn[standard]
requests requests
jsonschema jsonschema
sentry_sdk sentry_sdk
pika
httpx # required for fastapi TestClient httpx # required for fastapi TestClient
pytest pytest
......
...@@ -16,6 +16,7 @@ setup( ...@@ -16,6 +16,7 @@ setup(
"requests", "requests",
"jsonschema", "jsonschema",
"sentry_sdk", "sentry_sdk",
"pika"
], ],
long_description=open("README.md", encoding="utf-8").read(), long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment