RabbitMQ, headers exchange, messages not routed by headers x-match = all
Question
I'm trying to set up a headers exchange with a queue where messages are routed based on a recipient header.
The exchange is of type headers.
So far the class is able to connect to the exchange and feed messages to the queue.
It's also able to subscribe to the queue and receive messages. It also closes the connection whenever the subscriber's connection is cancelled.
The current problem is that the message is not routed by the recipient's header value.
Given the following class:
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
public class MyQueue {
    private final ConnectionFactory connectionFactory;
    private Channel channel;

    public MyQueue(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public String sendMessage(TestTextMessage message) throws UndeliverableMessageException {
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            Map<String, Object> headers = new HashMap<>();
            headers.put(RabbitMqConfig.MATCH_HEADER, message.getRecipient());
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                    .priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
                    .headers(headers).build();
            log.info("Sending message to {}", headers);
            channel.basicPublish(RabbitMqConfig.EXCHANGE_NAME, "", props,
                    message.getMessage().getBytes(StandardCharsets.UTF_8));
            log.info("RabbitMQ sent message {} to {}", message.getMessage(), message.getRecipient());
            return "ok";
        } catch (TimeoutException e) {
            log.error("Rabbit mq timeout", e);
        } catch (IOException e) {
            log.error("Rabbit mq io error", e);
        }
        throw new UndeliverableMessageException();
    }

    public Flux<String> listenMessages(String recipient) throws IOException, TimeoutException {
        Connection connection = connectionFactory.newConnection();
        this.channel = connection.createChannel();
        // The map for the headers.
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-match", "all");
        headers.put(RabbitMqConfig.MATCH_HEADER, recipient);
        final String[] consumerTag = new String[1];
        Flux<String> as = Flux.create(sink -> new MessageListener<String>() {
            {
                try {
                    log.info("Binding to {}", headers);
                    channel.queueBind(RabbitMqConfig.QUEUE_NAME, RabbitMqConfig.EXCHANGE_NAME, "",
                            headers);
                    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                        log.info("Subscriber {} received a message {} with headers {}", recipient, delivery.getEnvelope(),
                                delivery.getProperties().getHeaders());
                        sink.next(delivery.getEnvelope().getDeliveryTag() + "--" + message);
                        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    };
                    consumerTag[0] = channel.basicConsume(RabbitMqConfig.QUEUE_NAME,
                            true, deliverCallback, tag -> {
                                sink.complete();
                            });
                } catch (IOException e) {
                    log.error("RabbitMQ IOException ", e);
                }
            }
        });
        return as.doOnCancel(() -> {
            try {
                if (consumerTag[0] == null) {
                    log.error("RabbitMQ uncloseable subscription, consumerTag is null!");
                    channel.close();
                    return;
                }
                channel.basicCancel(consumerTag[0]);
                channel.close();
                log.info("RabbitMQ CANCEL subscription for recipient {}", recipient);
            } catch (IOException | TimeoutException e) {
                log.error("RabbitMQ channel close error", e);
            }
        });
    }

    interface MessageListener<T> {
    }
}
The exchange is declared by the following call:
channel.exchangeDeclare(RabbitMqConfig.EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
Binding recipient log:
Binding to {x-match=all, message-recipient=mary}
Binding to {x-match=all, message-recipient=james}
Binding to {x-match=all, message-recipient=john}
All three recipients are bound with x-match: all. However, messages are not matched; they are delivered as if they were routed randomly:
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber mary received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber james received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Sending message to {message-recipient=james}
RabbitMQ sent message Hey there to james
Subscriber john received a message Envelope(deliveryTag=1, redeliver=false, exchange=my-exchange, routingKey=) with headers {message-recipient=james}
Why isn't x-match: all matching?
Answer 1
Score: 0
After reading the comment posted by @Gryphon, on the subscriber side, I ended up creating a queue for each participant.
channel.queueDeclare(RabbitMqConfig.QUEUE_NAME + "-" + recipient,
        true,   // durable
        false,  // exclusive
        false,  // autoDelete
        null);  // arguments
On the publisher side, the code remains unchanged: the messages are sent to the exchange, and the exchange handles routing based on the x-match: all configuration, delivering each message to the appropriate queue.
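To illustrate why a queue per recipient helps, here is a minimal sketch that models headers-exchange matching in plain Java. It is a toy model, not the RabbitMQ client API, and it never contacts a broker. The class HeadersRoutingSketch, its helper methods, and the queue names my-queue, my-queue-mary, and so on are assumptions made up for the illustration; only the header name message-recipient and the x-match=all argument come from the logs in the question. The model also assumes each per-recipient queue is bound with that recipient's header arguments, which the answer implies but does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of headers-exchange routing. It never contacts a broker.
final class HeadersRoutingSketch {

    // A binding ties one queue to a set of match arguments.
    static final class Binding {
        final String queue;
        final Map<String, Object> args;

        Binding(String queue, Map<String, Object> args) {
            this.queue = queue;
            this.args = args;
        }
    }

    // Builds the arguments the question logs: {x-match=all, message-recipient=<name>}.
    static Binding bind(String queue, String recipient) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-match", "all");
        args.put("message-recipient", recipient);
        return new Binding(queue, args);
    }

    // x-match=all: every binding argument except x-match must equal the message header.
    static boolean matchesAll(Map<String, Object> args, Map<String, Object> headers) {
        for (Map.Entry<String, Object> arg : args.entrySet()) {
            if (arg.getKey().equals("x-match")) {
                continue; // the match mode itself is not compared
            }
            if (!arg.getValue().equals(headers.get(arg.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Returns each queue that has at least one matching binding, without duplicates.
    static List<String> route(List<Binding> bindings, Map<String, Object> headers) {
        List<String> queues = new ArrayList<>();
        for (Binding b : bindings) {
            if (matchesAll(b.args, headers) && !queues.contains(b.queue)) {
                queues.add(b.queue);
            }
        }
        return queues;
    }

    // The question's setup: three bindings, all pointing at one shared queue.
    static List<Binding> sharedQueue() {
        return Arrays.asList(
                bind("my-queue", "mary"),
                bind("my-queue", "james"),
                bind("my-queue", "john"));
    }

    // The answer's setup: one queue per recipient, each with its own binding.
    static List<Binding> queuePerRecipient() {
        return Arrays.asList(
                bind("my-queue-mary", "mary"),
                bind("my-queue-james", "james"),
                bind("my-queue-john", "john"));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("message-recipient", "james");
        System.out.println("shared queue:        " + route(sharedQueue(), headers));
        System.out.println("queue per recipient: " + route(queuePerRecipient(), headers));
    }
}
```

In this model a message for james matches the james binding in both setups, so the headers match itself works. The difference is the destination: with the shared queue the message lands in the one queue that mary, james, and john all consume from, and RabbitMQ hands deliveries on a single queue to its consumers in turn, which is consistent with the seemingly random recipients in the log. With one queue per recipient, the message reaches only the queue that james consumes from.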