"""
copied from: dashboard-v3-python v0.226, dashboard.messaging.queue

updated typehints to satisfy mypy, and linted with changes suggested by ruff
"""

import logging
import os
from collections.abc import Sequence
from typing import Any

from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelClosedByBroker
from pika.exchange_type import ExchangeType

# Opt-in switch: any value other than "0", "" or "false" (case-insensitive)
# makes ``dashboard.*`` queues get declared as quorum queues.
DASHBOARD_USE_QUORUM_QUEUE = os.getenv(
    "DASHBOARD_USE_QUORUM_QUEUE", ""
).lower() not in {"0", "", "false"}

logger = logging.getLogger(__name__)


def loose_queue_declare(
    channel: BlockingChannel,
    queue: str = "",
    exclusive: bool = False,
    single_active_consumer: bool = False,
    force_quorum_queue: bool = False,
) -> str | None:
    """Declare a queue, tolerating a pre-existing queue of a different type.

    The queue is declared as a durable quorum queue when
    ``force_quorum_queue`` is set, or when ``DASHBOARD_USE_QUORUM_QUEUE`` is
    enabled and the name starts with ``dashboard.``. Otherwise it is declared
    as a non-durable queue without an explicit ``x-queue-type``.

    If the broker closes the channel with reply code 406
    (PRECONDITION_FAILED), a warning is logged and ``None`` is returned
    instead of raising. The channel has been closed by the broker at that
    point and must not be reused by the caller.

    :param channel: the channel to use
    :param queue: the queue name. If empty string, the broker will create a
        unique queue name (default: '')
    :param exclusive: passed through to ``channel.queue_declare``
        (default: False)
    :param single_active_consumer: when true, sets the
        ``x-single-active-consumer`` queue argument (default: False)
    :param force_quorum_queue: declare a quorum queue regardless of the
        environment switch and the queue name (default: False)
    :returns: the queue name reported by the broker, or ``None`` if the
        declaration was rejected with reply code 406
    :raises ChannelClosedByBroker: for any reply code other than 406
    """
    use_quorum = bool(
        force_quorum_queue
        or (DASHBOARD_USE_QUORUM_QUEUE and queue.startswith("dashboard."))
    )

    arguments: dict[str, Any] = {}
    if use_quorum:
        arguments["x-queue-type"] = "quorum"
    if single_active_consumer:
        arguments["x-single-active-consumer"] = True

    try:
        result = channel.queue_declare(
            queue,
            durable=use_quorum,  # quorum queues are declared durable
            exclusive=exclusive,
            arguments=arguments or None,
        )
    except ChannelClosedByBroker as e:
        if e.reply_code != 406:
            raise
        # PRECONDITION_FAILED, presumably due to an incompatible queue type.
        # Report the type that was actually requested by this call rather
        # than guessing from the environment switch alone, which would be
        # wrong for ``force_quorum_queue`` and for non-``dashboard.`` names.
        if use_quorum:
            requested_type, existing_type = "quorum", "classic"
        else:
            requested_type, existing_type = "classic", "quorum"
        logger.warning(
            "Trying to declare %s queue '%s' but queue already exists as %s queue",
            requested_type,
            queue,
            existing_type,
        )
        return None

    assert isinstance(result.method.queue, str)  # for mypy
    return result.method.queue


def setup_channel(
    connection: BlockingConnection,
    exchange_name: str,
    exchange_type: ExchangeType = ExchangeType.fanout,
    queue_name: str | None = None,
    queue_declare: bool = True,
    exclusive: bool = False,
    single_active_consumer: bool = False,
    routing_keys: Sequence[str] = (),
    prefetch_count: int | None = None,
    force_quorum_queue: bool = False,
) -> tuple[BlockingChannel, str | None]:
    """Setup a channel and declare the exchange and optionally the queue.

    :param connection: A ``pika`` ``BlockingConnection``
    :param exchange_name: the exchange to declare. The empty string (default
        exchange) is never declared.
    :param exchange_type: the exchange type (default: ``fanout``)
    :param queue_name: The queue to bind to, if given. Can be set to empty
        string to get a temporary queue from the broker. When
        queue_name=None, the channel can only be used for publishing
        (default: None)
    :param queue_declare: Whether to declare the queue before binding
        (default: True)
    :param exclusive: Whether this should be declared as an exclusive queue
        (default: False)
    :param single_active_consumer: Whether to request a single active
        consumer on the queue; incompatible with ``exclusive``
        (default: False)
    :param routing_keys: Optional routing keys to bind to the queue. When
        empty, the queue is bound once without a routing key.
    :param prefetch_count: if given, applied to the channel via
        ``basic_qos`` (default: None)
    :param force_quorum_queue: forwarded to ``loose_queue_declare`` for
        non-exclusive queues (default: False)
    :returns: A tuple (channel: ``BlockingChannel``, queue_name: Optional[str])
    :raises ValueError: if ``queue_name`` is given and the combination of
        ``queue_name``, ``queue_declare``, ``exclusive`` and
        ``single_active_consumer`` is invalid
    """
    # Validate before touching the broker so that an invalid call does not
    # leave a freshly opened channel behind. Publish-only calls
    # (queue_name=None) are never validated, as before.
    if queue_name is not None:
        if queue_name == "" and not queue_declare:
            raise ValueError(
                "must set queue_declare=True when supplying empty queue name"
            )
        if queue_name == "" and not exclusive:
            raise ValueError("must set exclusive=True for anonymous queues")
        if exclusive and single_active_consumer:
            raise ValueError("Exclusive queues cannot have single active consumer")

    channel = connection.channel()
    if prefetch_count is not None:
        channel.basic_qos(prefetch_count=prefetch_count)

    if exchange_name != "":  # cannot declare default exchange
        channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)

    if queue_name is None:
        return channel, None

    # Keep the caller-supplied name separate from the broker-confirmed one:
    # the former is still needed if the declaration has to be retried.
    bound_queue: str | None = queue_name
    if queue_declare:
        if exclusive:
            # Short circuit exclusive queues that are never quorum and never
            # pre-exist
            result = channel.queue_declare(queue_name, exclusive=True)
            bound_queue = result.method.queue
        else:
            bound_queue = loose_queue_declare(
                channel,
                queue_name,
                exclusive=exclusive,
                single_active_consumer=single_active_consumer,
                force_quorum_queue=force_quorum_queue,
            )
            if bound_queue is None:
                # if a queue declare fails, the channel is in an unusable
                # state. Start over on a new channel but skip the declare,
                # binding to the already existing queue under the ORIGINAL
                # name (passing the failed result, None, would turn the
                # retry into a publish-only channel with no bindings).
                return setup_channel(
                    connection=connection,
                    exchange_name=exchange_name,
                    exchange_type=exchange_type,
                    queue_name=queue_name,
                    queue_declare=False,
                    exclusive=exclusive,
                    routing_keys=routing_keys,
                    prefetch_count=prefetch_count,
                    single_active_consumer=single_active_consumer,
                    force_quorum_queue=force_quorum_queue,
                )

    assert bound_queue, "queue name must not be empty here"

    if not routing_keys:
        # in case no routing keys are provided (as for fanout exchanges),
        # ensure the queue is still bound to the exchange
        channel.queue_bind(exchange=exchange_name, queue=bound_queue)
    for rk in routing_keys:
        channel.queue_bind(exchange=exchange_name, queue=bound_queue, routing_key=rk)

    return channel, bound_queue