"""

copied from: dashboard-v3-python v0.226, dashboard.messaging.queue
updated typehints to satisfy mypy, and linted with changes suggested by ruff

"""

import logging
import os
from collections.abc import Sequence
from typing import Any

from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelClosedByBroker
from pika.exchange_type import ExchangeType

# Feature switch read once at import time from the environment. Unset, empty,
# "0" and "false" (compared case-insensitively) disable it; any other value
# enables it.
DASHBOARD_USE_QUORUM_QUEUE = os.environ.get(
    "DASHBOARD_USE_QUORUM_QUEUE", ""
).lower() not in ("", "0", "false")

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def loose_queue_declare(
    channel: BlockingChannel,
    queue: str = "",
    exclusive: bool = False,
    single_active_consumer: bool = False,
    force_quorum_queue: bool = False,
) -> str | None:
    """Declare a queue, tolerating a pre-existing queue of an incompatible type.

    The queue is declared as a durable quorum queue when ``force_quorum_queue``
    is set, or when the ``DASHBOARD_USE_QUORUM_QUEUE`` module global is enabled
    and the queue name starts with ``dashboard.``. Otherwise it is declared
    non-durable without an explicit queue type.

    If the broker closes the channel with reply code 406 (PRECONDITION_FAILED),
    a warning is logged and ``None`` is returned instead of raising. The channel
    has been closed by the broker at that point, so the caller needs a new one.

    :param channel: the channel to use
    :param queue: the queue name. If empty string, the broker will create a unique
        queue name (default: '')
    :param exclusive: passed through to ``channel.queue_declare`` (default: False)
    :param single_active_consumer: when set, the ``x-single-active-consumer``
        queue argument is sent with the declare (default: False)
    :param force_quorum_queue: declare a quorum queue regardless of the global
        switch and of the queue name (default: False)
    :returns: the queue name reported by the broker, or ``None`` if the declare
        was rejected with reply code 406
    :raises ChannelClosedByBroker: if the broker closes the channel with any
        reply code other than 406
    """
    # Decide once whether this call requests a quorum queue, so that the declare
    # arguments and the warning below cannot disagree.
    use_quorum = force_quorum_queue or (
        DASHBOARD_USE_QUORUM_QUEUE and queue.startswith("dashboard.")
    )

    durable = False
    arguments: dict[str, Any] = {}
    if use_quorum:
        durable = True
        arguments["x-queue-type"] = "quorum"
    if single_active_consumer:
        arguments["x-single-active-consumer"] = True

    try:
        result = channel.queue_declare(
            queue, durable=durable, exclusive=exclusive, arguments=arguments or None
        )
    except ChannelClosedByBroker as e:
        if e.reply_code != 406:
            raise
        # PRECONDITION_FAILED, treated here as an incompatible queue type.
        # NOTE(review): the broker may presumably also answer 406 for other
        # mismatching declare properties - confirm whether the message below
        # should include the broker's reply text.
        if use_quorum:
            requested_type, existing_type = "quorum", "classic"
        else:
            requested_type, existing_type = "classic", "quorum"
        logger.warning(
            f"Trying to declare {requested_type} queue '{queue}'"
            f" but queue already exists as {existing_type} queue"
        )
        return None

    assert isinstance(result.method.queue, str)  # for mypy
    return result.method.queue


def setup_channel(
    connection: BlockingConnection,
    exchange_name: str,
    exchange_type: ExchangeType = ExchangeType.fanout,
    queue_name: str | None = None,
    queue_declare: bool = True,
    exclusive: bool = False,
    single_active_consumer: bool = False,
    routing_keys: Sequence[str] = (),
    prefetch_count: int | None = None,
    force_quorum_queue: bool = False,
) -> tuple[BlockingChannel, str | None]:
    """Setup a channel and declare the exchange and optionally the queue.

    :param connection: A ``pika`` ``BlockingConnection``
    :param exchange_name: the exchange to declare. The empty string (default
        exchange) is never declared.
    :param exchange_type: the exchange type (default: ``fanout``)
    :param queue_name: The queue to bind to, if given. Can be set to empty string to
        get a temporary queue from the broker. When queue_name=None, the channel can
        only be used for publishing (default: None)
    :param queue_declare: Whether to declare the queue before binding (default: True)
    :param exclusive: Whether this should be declared as an exclusive queue (default:
        False)
    :param single_active_consumer: Whether to request a single active consumer on
        the queue. Cannot be combined with ``exclusive`` (default: False)
    :param routing_keys: Optional routing keys to bind to the queue. When empty, the
        queue is bound once without a routing key.
    :param prefetch_count: If given, applied to the channel via ``basic_qos``
        (default: None)
    :param force_quorum_queue: Passed to ``loose_queue_declare`` for non-exclusive
        queues (default: False)
    :returns: A tuple (channel: ``BlockingChannel``, queue_name: Optional[str])
    :raises ValueError: if an empty queue name is combined with
        ``queue_declare=False`` or ``exclusive=False``, or if ``exclusive`` and
        ``single_active_consumer`` are both set
    """
    # Validate the queue options before talking to the broker, so that an invalid
    # call does not leave a freshly opened channel behind.
    if queue_name is not None:
        if queue_name == "" and not queue_declare:
            raise ValueError(
                "must set queue_declare=True when supplying empty queue name"
            )
        if queue_name == "" and not exclusive:
            raise ValueError("must set exclusive=True for anonymous queues")
        if exclusive and single_active_consumer:
            raise ValueError("Exclusive queues cannot have single active consumer")

    channel = connection.channel()
    if prefetch_count is not None:
        channel.basic_qos(prefetch_count=prefetch_count)

    if exchange_name != "":  # cannot declare default exchange
        channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)

    if queue_name is None:
        # publish-only channel: nothing to declare or bind
        return channel, None

    if queue_declare:
        if exclusive:
            # Short circuit exclusive queues that are never quorum and never
            # pre-exist
            result = channel.queue_declare(queue_name, exclusive=True)
            queue_name = result.method.queue
        else:
            declared_name = loose_queue_declare(
                channel,
                queue_name,
                exclusive=exclusive,
                single_active_consumer=single_active_consumer,
                force_quorum_queue=force_quorum_queue,
            )

            # if a queue declare fails, the channel is in an unusable state.
            # Start over but skip the declare. The originally requested name is
            # passed on (not the ``None`` result), so the new channel still gets
            # bound to the already existing queue.
            if declared_name is None:
                return setup_channel(
                    connection=connection,
                    exchange_name=exchange_name,
                    exchange_type=exchange_type,
                    queue_name=queue_name,
                    queue_declare=False,
                    exclusive=exclusive,
                    routing_keys=routing_keys,
                    prefetch_count=prefetch_count,
                    single_active_consumer=single_active_consumer,
                    force_quorum_queue=force_quorum_queue,
                )
            queue_name = declared_name

    assert queue_name, "queue name must not be empty here"

    if not routing_keys:
        # in case no routing keys are provided (as for fanout exchanges),
        # ensure the queue is still bound to the exchange
        channel.queue_bind(exchange=exchange_name, queue=queue_name)

    for rk in routing_keys:
        channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=rk)

    return channel, queue_name