How do I consume a message based on a header with MassTransit in RabbitMQ?

Question

I am trying to create a headers exchange in RabbitMQ with MassTransit, so a consumer only consumes messages from a specific queue based on headers in the message.

I configured my producer:

builder.Services.AddMassTransit(mt =>
{
    mt.SetKebabCaseEndpointNameFormatter();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.Publish<OrderSubmitted>(p =>
        {
            p.ExchangeType = "headers";
        });
    });
});

This is my consumer configuration:

builder.Services.AddMassTransit(mt =>
{
    mt.AddConsumer<OrderPickupConsumer>();
    mt.AddConsumer<OrderDeliveryConsumer>();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint("OrderPickup", re =>
        {
            re.ConfigureConsumer<OrderPickupConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "pickup" },
                    { "x-match", "all" }
                });
            });
        });

        cfg.ReceiveEndpoint("OrderDelivery", re =>
        {
            re.ConfigureConsumer<OrderDeliveryConsumer>(context);
            re.Bind<OrderSubmitted>(x =>
            {
                x.ExchangeType = "headers";
                x.SetBindingArgument("headers", new Dictionary<string, string>
                {
                    { "Transport", "delivery" },
                    { "x-match", "all" }
                });
            });
        });
    });
});

I publish the messages like this:

_bus.Publish<OrderSubmitted>(new
{
    __Header_Transport = "pickup",
    Product = "Pizza"
});

_bus.Publish<OrderSubmitted>(new
{
    __Header_Transport = "delivery",
    Product = "Burgers"
});

As far as I understand, the setup above is correct. However, when the receiving application starts, an error is thrown:
ArgumentException: The MassTransit.RabbitMqTransport.Topology.ExchangeEntity entity settings did not match the existing entity

I have a clean RabbitMQ instance without any pre-existing queues, and I get the same error even when RabbitMQ is not running at all.

Is there something wrong with my configuration? Or might this be a bug in MassTransit?

Answer 1

Score: 0

After a day of investigation, I found that the error no longer appeared once I added `re.ConfigureConsumeTopology = false;`.

However, the messages were still not delivered to the queues. To fix this, I changed the `SetBindingArgument` configuration so that each binding argument is set individually instead of being wrapped in a dictionary under a "headers" key:

    cfg.ReceiveEndpoint("OrderPickup", re =>
    {
        // Turn off the automatic consume topology; the binding is declared manually below.
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderPickupConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            // Each header filter is its own binding argument.
            x.SetBindingArgument("Transport", "pickup");
            x.SetBindingArgument("x-match", "all");
        });
    });

    cfg.ReceiveEndpoint("OrderDelivery", re =>
    {
        re.ConfigureConsumeTopology = false;
        re.ConfigureConsumer<OrderDeliveryConsumer>(context);
        re.Bind<OrderSubmitted>(x =>
        {
            x.ExchangeType = "headers";
            x.SetBindingArgument("Transport", "delivery");
            x.SetBindingArgument("x-match", "all");
        });
    });

Now the messages are routed to the correct queue based on the header filter and picked up by the consumer.
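To illustrate why the flat binding arguments route correctly while the nested "headers" dictionary does not, here is a minimal, self-contained sketch of headers-exchange matching as RabbitMQ documents it: arguments whose names start with "x-" are not match criteria, and "x-match" defaults to "all". This is not MassTransit or broker code. The class `HeadersExchangeSketch` and its `Matches` method are hypothetical names introduced only for this illustration, and the assumption that the dictionary ends up as a single binding argument named "headers" is my reading of the original configuration, not something I verified against the transport.

```csharp
using System;
using System.Collections.Generic;

public static class HeadersExchangeSketch
{
    // Simplified model of headers-exchange matching (illustration only).
    public static bool Matches(
        IDictionary<string, object> bindingArguments,
        IDictionary<string, object> messageHeaders)
    {
        // "x-match" defaults to "all" when the binding does not specify it.
        var mode = bindingArguments.TryGetValue("x-match", out var m) ? m as string : "all";
        var requireAll = mode != "any";

        var compared = 0;
        var matched = 0;
        foreach (var argument in bindingArguments)
        {
            // Arguments starting with "x-" are control keys, not match criteria.
            if (argument.Key.StartsWith("x-", StringComparison.Ordinal))
                continue;

            compared++;
            if (messageHeaders.TryGetValue(argument.Key, out var value)
                && object.Equals(value, argument.Value))
                matched++;
        }

        return requireAll ? matched == compared : matched > 0;
    }

    public static void Main()
    {
        // Headers carried by a published "pickup" message.
        var headers = new Dictionary<string, object> { ["Transport"] = "pickup" };

        // Flat arguments, as in the working configuration.
        var flat = new Dictionary<string, object>
        {
            ["Transport"] = "pickup",
            ["x-match"] = "all"
        };

        // One argument named "headers" that holds a nested table, as in the original attempt.
        var nested = new Dictionary<string, object>
        {
            ["headers"] = new Dictionary<string, string>
            {
                ["Transport"] = "pickup",
                ["x-match"] = "all"
            }
        };

        Console.WriteLine(Matches(flat, headers));   // True
        Console.WriteLine(Matches(nested, headers)); // False
    }
}
```

In the nested case the only match criterion is a header literally named "headers", which the published message does not carry, so under this model nothing would be routed to the queue.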



huangapple
  • Posted on June 22, 2023 at 15:10:40
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76529371.html