兔子消息队列,头部交换机,未被头部 x-match = all 路由的消息

huangapple go评论71阅读模式
英文:

RabbitMQ, headers exchange, messages not routed by headers x-match = all

问题

我正在尝试设置一个带有队列的头部交换,其中消息基于收件人头部进行路由。

该交换属于头部类型。

到目前为止,这个类能够连接到交换并将消息传递到队列。它还能够订阅队列并接收消息。每当订阅者的连接被取消时,它也会关闭连接。

当前的问题是消息没有根据收件人的头部值进行路由。

给定以下类:

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {

    private final ConnectionFactory connectionFactory;
    private Channel channel;

    // ...(略去其余部分)
}

交换是通过以下调用声明的:

channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);

绑定收件人日志:

Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}

然而,消息没有匹配,就像它们是随机路由的一样:

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

为什么 x-match: all 没有匹配?

英文:

I'm trying to setup a headers exchange with a queue where messages are routed based on a recipient header.

The exchange is of type headers.

So far the class is able to connect to the exchange and feed messages to the queue.
It's also able to subscribe to the queue and receive messages. It also closes the connection whenever the subscriber's connection is cancelled.

The current problem is that the message is not routed by the recipient's header value.

Given the following class:

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {

private final ConnectionFactory connectionFactory;
private Channel channel;


public MyQueue(ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {

        Map<String, Object> headers = new HashMap<>();
        headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                .priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
                .headers(headers).build();

        log.info("Sending message to {}", headers);

        channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
                message.getMessage().getBytes(StandardCharsets.UTF_8));

        log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
        return "ok";
    } catch (TimeoutException e) {
        log.error("Rabbit mq timeout", e);
    } catch (IOException e) {
        log.error("Rabbit mq io error", e);
    }
    throw new UndeliverableMessageException();
}

public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
    Connection connection = connectionFactory.newConnection();
    this.channel = connection.createChannel();

    // The map for the headers.
    Map<String, Object> headers = new HashMap<>();
    headers.put("x-match", "all");
    headers.put(RabbitMqConfig.MATCH_HEADER, recipient);

    final String[] consumerTag = new String[1];
    Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
        {
            try {
                log.info("Binding to {}", headers);
                channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
                        headers);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
                            delivery.getProperties().getHeaders());

                    sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
                    //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                };

                consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
                        true, deliverCallback, tag -> {
                            sink.complete();
                        });

            } catch (IOException e) {
                log.error("RabbitMQ IOException ", e);
            }
        }

    });

    return as.doOnCancel(() -> {
        try {
            if (consumerTag[0] == null) {
                log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
                channel.close();
                return;
            }
            channel.basicCancel(consumerTag[0]);
            channel.close();
            log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
        } catch (IOException | TimeoutException e) {
            log.error("RabbitMQ channel close error", e);
        }
    });
}

interface MessageListener<T> {

}
}

The exchange is declared by the following call

channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);

Binding recipient log:

Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}

Bound 3 recipients with x-match:
兔子消息队列,头部交换机,未被头部 x-match = all 路由的消息

However, messages are not matched, as if they were routed randomly

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}

Why isn't x-match: all, matching?

答案1

得分: 0

阅读了@Gryphon发布的评论后,在订阅者方面,我最终为每个参与者创建了一个队列。

channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
    true,
    false,
    false,
    null)

在发布者方面,代码保持不变,消息被发送到交换机,交换机将根据 x-match: all 的配置处理路由,将消息路由到适当的队列。

英文:

After reading the comment posted by @Gryphon, on the subscriber side, I ended up creating a queue for each participant.

channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
    true,
    false,
    false,
    null)

On the publisher side, code remains unchanged, the messages are sent to the exchange, and the exchange will handle routing based on the x-match: all configuration, routing the messages to the appropiate queue.

huangapple
  • 本文由 发表于 2020年8月31日 05:59:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/63662547.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定