How do I consume a message based on a header with MassTransit in RabbitMQ?
Question
I am trying to create a headers exchange in RabbitMQ with MassTransit, so a consumer only consumes messages from a specific queue based on headers in the message.
I configured my producer:
    builder.Services.AddMassTransit(mt =>
    {
        mt.SetKebabCaseEndpointNameFormatter();
        mt.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.Publish<OrderSubmitted>(p =>
            {
                p.ExchangeType = "headers";
            });
        });
    });
This is my consumer configuration:
    builder.Services.AddMassTransit(mt =>
    {
        mt.AddConsumer<OrderPickupConsumer>();
        mt.AddConsumer<OrderDeliveryConsumer>();
        mt.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.ReceiveEndpoint("OrderPickup", re =>
            {
                re.ConfigureConsumer<OrderPickupConsumer>(context);
                re.Bind<OrderSubmitted>(x =>
                {
                    x.ExchangeType = "headers";
                    x.SetBindingArgument("headers", new Dictionary<string, string>
                    {
                        { "Transport", "pickup" },
                        { "x-match", "all" }
                    });
                });
            });
            cfg.ReceiveEndpoint("OrderDelivery", re =>
            {
                re.ConfigureConsumer<OrderDeliveryConsumer>(context);
                re.Bind<OrderSubmitted>(x =>
                {
                    x.ExchangeType = "headers";
                    x.SetBindingArgument("headers", new Dictionary<string, string>
                    {
                        { "Transport", "delivery" },
                        { "x-match", "all" }
                    });
                });
            });
        });
    });
I publish the messages like this:
    _bus.Publish<OrderSubmitted>(new
    {
        __Header_Transport = "pickup",
        Product = "Pizza"
    });
    _bus.Publish<OrderSubmitted>(new
    {
        __Header_Transport = "delivery",
        Product = "Burgers"
    });
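For reference, the `__Header_Transport` member uses MassTransit's message-initializer convention for setting a header named `Transport`. A roughly equivalent, more explicit form is sketched below. The `OrderPublisher` class and the shape of `OrderSubmitted` are assumptions made for illustration (the post does not show the contract), and the sketch assumes an `IPublishEndpoint` is injected rather than the `_bus` field used above.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Assumed message contract; the original post does not show its definition.
public interface OrderSubmitted
{
    string Product { get; }
}

// Hypothetical helper class, used only to give the publish call a home.
public class OrderPublisher
{
    private readonly IPublishEndpoint _publishEndpoint;

    public OrderPublisher(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public Task PublishPickupOrder()
    {
        // Build the message from an anonymous object and set the header
        // explicitly on the publish context instead of via __Header_Transport.
        return _publishEndpoint.Publish<OrderSubmitted>(
            new { Product = "Pizza" },
            ctx => ctx.Headers.Set("Transport", "pickup"));
    }
}
```

Either way, the header value has to match what the queue bindings expect, since a headers exchange routes on headers rather than on a routing key.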
As far as I understand, the setup above is correct. However, when starting the receiving application, an error is thrown:
ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity
I have a clean RabbitMQ instance without any pre-existing queues, and I get the same error even when RabbitMQ is not running at all.
Is there something wrong with my configuration? Or might this be a bug in MassTransit?
Answer 1
Score: 0
After a day of investigation, I found that the error went away when I added `re.ConfigureConsumeTopology = false;` to each receive endpoint.

However, the messages were still not delivered to the queues. To fix this, I changed the `SetBindingArgument` configuration so that each binding argument is set individually:

    cfg.ReceiveEndpoint("OrderPickup", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderPickupConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "pickup");
            x.SetBindingArgument("x-match", "all");
        });
    });
    cfg.ReceiveEndpoint("OrderDelivery", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderDeliveryConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "delivery");
            x.SetBindingArgument("x-match", "all");
        });
    });

Now the messages are routed to the correct queue based on the header filter and picked up by the consumer.
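A likely reason the flattened arguments work where the nested dictionary did not: a RabbitMQ headers exchange compares each binding argument (other than those prefixed with `x-`) against a message header of the same name. With a single argument named `headers` holding a dictionary, the binding would look for a message header literally called `headers`, which the published messages do not carry. The sketch below is a simplified, broker-free model of that matching rule, not MassTransit or RabbitMQ code; `HeadersExchangeSketch` and `Matches` are names made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HeadersExchangeSketch
{
    // Simplified model of how a headers exchange evaluates one binding:
    // arguments starting with "x-" are control arguments and are not compared,
    // and "x-match" selects "all" (the default) or "any".
    public static bool Matches(
        IReadOnlyDictionary<string, object> bindingArguments,
        IReadOnlyDictionary<string, object> messageHeaders)
    {
        var mode = bindingArguments.TryGetValue("x-match", out var m) ? m as string : "all";

        var criteria = bindingArguments
            .Where(a => !a.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        bool Hit(KeyValuePair<string, object> c) =>
            messageHeaders.TryGetValue(c.Key, out var v) && object.Equals(v, c.Value);

        return mode == "any" ? criteria.Any(Hit) : criteria.All(Hit);
    }

    public static void Main()
    {
        // Binding as configured in the answer: one argument per header.
        var flat = new Dictionary<string, object>
        {
            ["Transport"] = "pickup",
            ["x-match"] = "all"
        };

        // Binding as configured in the question: everything nested under "headers".
        var nested = new Dictionary<string, object>
        {
            ["headers"] = new Dictionary<string, string>
            {
                ["Transport"] = "pickup",
                ["x-match"] = "all"
            }
        };

        var pickup = new Dictionary<string, object> { ["Transport"] = "pickup" };
        var delivery = new Dictionary<string, object> { ["Transport"] = "delivery" };

        Console.WriteLine(Matches(flat, pickup));    // True
        Console.WriteLine(Matches(flat, delivery));  // False
        Console.WriteLine(Matches(nested, pickup));  // False: no header named "headers"
    }
}
```

In this model the nested form never matches a message that only carries a `Transport` header, which is consistent with the messages not reaching the queues until the arguments were set one by one.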