How do I consume a message based on a header with MassTransit in RabbitMQ?

Question

I am trying to create a headers exchange in RabbitMQ with MassTransit, so a consumer only consumes messages from a specific queue based on headers in the message.

I configured my producer:

    builder.Services.AddMassTransit(mt =>
    {
        mt.SetKebabCaseEndpointNameFormatter();
        mt.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.Publish<OrderSubmitted>(p =>
            {
                p.ExchangeType = "headers";
            });
        });
    });

This is my consumer configuration:

    builder.Services.AddMassTransit(mt =>
    {
        mt.AddConsumer<OrderPickupConsumer>();
        mt.AddConsumer<OrderDeliveryConsumer>();
        mt.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
            cfg.ReceiveEndpoint("OrderPickup", re =>
            {
                re.ConfigureConsumer<OrderPickupConsumer>(context);
                re.Bind<OrderSubmitted>(x =>
                {
                    x.ExchangeType = "headers";
                    x.SetBindingArgument("headers", new Dictionary<string, string>
                    {
                        { "Transport", "pickup" },
                        { "x-match", "all" }
                    });
                });
            });
            cfg.ReceiveEndpoint("OrderDelivery", re =>
            {
                re.ConfigureConsumer<OrderDeliveryConsumer>(context);
                re.Bind<OrderSubmitted>(x =>
                {
                    x.ExchangeType = "headers";
                    x.SetBindingArgument("headers", new Dictionary<string, string>
                    {
                        { "Transport", "delivery" },
                        { "x-match", "all" }
                    });
                });
            });
        });
    });

I publish the messages like this:

    _bus.Publish<OrderSubmitted>(new
    {
        __Header_Transport = "pickup",
        Product = "Pizza"
    });
    _bus.Publish<OrderSubmitted>(new
    {
        __Header_Transport = "delivery",
        Product = "Burgers"
    });

As far as I understand, the setup above is correct. However, when I start the receiving application, this error is thrown:

    ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity

My RabbitMQ instance is clean, with no previously existing queues. I get the same error even when RabbitMQ is not running at all.

Is there something wrong with my configuration? Or might this be a bug in MassTransit?

Answer 1

Score: 0

After a day of investigation, I found that the error no longer appeared once I added `re.ConfigureConsumeTopology = false;`.

However, the message was still not delivered to the queue. To fix this, I changed the `SetBindingArgument` configuration so that each binding argument is set individually:

    cfg.ReceiveEndpoint("OrderPickup", re =>
    {
        // Skip the automatic topology configuration for consumed message types.
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderPickupConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            // Binding arguments are set one by one, not nested under a "headers" key.
            x.SetBindingArgument("Transport", "pickup");
            x.SetBindingArgument("x-match", "all");
        });
    });
    cfg.ReceiveEndpoint("OrderDelivery", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderDeliveryConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "delivery");
            x.SetBindingArgument("x-match", "all");
        });
    });

Now the messages are routed to the correct queue based on the header filter and picked up by the consumer.
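
To illustrate the header filter, here is a small self-contained sketch of how a headers exchange decides which bindings a message satisfies. It is a simplified in-memory model, not MassTransit or RabbitMQ client code, and the `Matches` and `Route` helpers are hypothetical names introduced only for this example. It assumes the documented RabbitMQ behavior that `x-match` defaults to `all` and that arguments starting with `x-` are not themselves matched against message headers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified in-memory model of headers-exchange matching (illustrative only;
// it does not connect to a broker and does not use MassTransit).

// Returns true when the message headers satisfy the binding arguments.
// "x-match" = "all" (assumed default) requires every pair to match;
// "x-match" = "any" requires at least one pair to match.
static bool Matches(IReadOnlyDictionary<string, string> bindingArgs,
                    IReadOnlyDictionary<string, string> messageHeaders)
{
    var matchAll = !bindingArgs.TryGetValue("x-match", out var mode) || mode != "any";

    // Arguments beginning with "x-" are control arguments, not match criteria.
    var pairs = bindingArgs
        .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
        .ToList();

    bool PairMatches(KeyValuePair<string, string> kv) =>
        messageHeaders.TryGetValue(kv.Key, out var value) && value == kv.Value;

    return matchAll ? pairs.All(PairMatches) : pairs.Any(PairMatches);
}

// Bindings equivalent to the two receive endpoints configured above.
var bindings = new Dictionary<string, Dictionary<string, string>>
{
    ["OrderPickup"] = new() { ["Transport"] = "pickup", ["x-match"] = "all" },
    ["OrderDelivery"] = new() { ["Transport"] = "delivery", ["x-match"] = "all" },
};

// Returns the names of the queues whose binding matches the given headers.
List<string> Route(Dictionary<string, string> headers) =>
    bindings.Where(b => Matches(b.Value, headers)).Select(b => b.Key).ToList();

Console.WriteLine(string.Join(", ", Route(new() { ["Transport"] = "pickup" })));   // OrderPickup
Console.WriteLine(string.Join(", ", Route(new() { ["Transport"] = "delivery" }))); // OrderDelivery
```

In this model, a message whose `Transport` header has any other value, or no `Transport` header at all, matches neither binding and is routed to no queue.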

huangapple
  • Published on June 22, 2023, 15:10:40
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76529371.html