Published messages go to the _skipped queue when using RabbitMQ with MassTransit

Question

I have 2 applications, one for publishing and another for subscribing.

Here is the publisher code that registers MassTransit:

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://localhost:5672");

        cfg.Message<ICreateEmployeeEto>(m => m.SetEntityName("CreateEmployee"));
        cfg.Message<IUpdateEmployeeEto>(m => m.SetEntityName("UpdateEmployee"));

        cfg.Publish<ICreateEmployeeEto>(y =>
        {
            y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
            y.AutoDelete = false;
            y.Durable = true;
        });

        cfg.Publish<IUpdateEmployeeEto>(y =>
        {
            y.ExchangeType = ExchangeType.FanOut.ToString().ToLower();
            y.AutoDelete = false;
            y.Durable = true;
        });
    });
});

We're using multiple buses, hence I am using IEmployeeSvcBus for this bus.

public interface IEmployeeSvcBus: IBus
{
}

With the above configuration, I intended messages of type ICreateEmployeeEto and IUpdateEmployeeEto to be published to exchanges named CreateEmployee and UpdateEmployee, respectively.

> Issue 1: Instead, exchanges named <namespace>:ICreateEmployeeEto and <namespace>:IUpdateEmployeeEto are created.

For the time being, I have manually bound them to CreateEmployee and UpdateEmployee, respectively, in RabbitMQ.

Now let's look at the configuration in the consumer application, along with the consumer classes.

context.Services.AddMassTransit<IEmployeeSvcBus>(x =>
{
    x.AddConsumer<CreateEmployeeConsumer>();
    x.AddConsumer<UpdateEmployeeConsumer>();

    x.UsingRabbitMq((context, busCfg) =>
    {
        busCfg.Host("rabbitmq://localhost:5672");

        busCfg.ReceiveEndpoint("employees.create_dev", ep =>
        {
            ep.Bind("CreateEmployee");
            ep.ConfigureConsumeTopology = false;
            ep.Durable = true;
            ep.Lazy = true;
            ep.Consumer<CreateEmployeeConsumer>();
        });

        busCfg.ReceiveEndpoint("employees.update_dev", ep =>
        {
            ep.Bind("UpdateEmployee");
            ep.ConfigureConsumeTopology = false;
            ep.Durable = true;
            ep.Lazy = true;
            ep.Consumer<UpdateEmployeeConsumer>();
        });
    });
});

Also see the consumer definitions:

public class CreateEmployeeConsumer : IConsumer<ICreateEmployeeEto>
{
    private readonly IBus _localBus;
    private readonly ImySvc _mySvc;

    public CreateEmployeeConsumer(
        IBus localBus,
        ImySvc mySvc)
    {
        _localBus = localBus;
        _mySvc = mySvc;
    }

    public CreateEmployeeConsumer()
    {
        // For bus registration
    }

    public async Task Consume(ConsumeContext<ICreateEmployeeEto> context)
    {
        // Some operations.
    }
}

> Issue 2: When I publish a CreateEmployeeEto, it goes to the employees.create_dev_skipped queue.

How can I check what may be causing this?
Is there some misconfiguration in the code that I missed?
Please let me know.
I am new to MassTransit.

For issue 1:
I tried checking the exchange name inside the Publish method using x.Exchange.ExchangeName; it returns "CreateCompanyPlaceholder". But no matter what, the message goes to <namespace>:ICreateEmployeeEto.

For issue 2:
I tried commenting out the injected dependencies and removing the binding, and tried again.

Answer 1

Score: 1

When MassTransit moves a message to the _skipped queue, it's because the message type is not consumed on the receive endpoint. Be sure your messages are created properly and have the same type and namespace on both sides.

From the documentation:

> MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed.
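
One way to guarantee matching namespaces is to put the contracts in a single project that both applications reference. The sketch below assumes a hypothetical namespace `Company.Employees.Contracts` and a hypothetical `EmployeeId` property; neither appears in the question.

```csharp
using System;

// Hypothetical shared contracts project, referenced by BOTH the publisher
// and the consumer so that the full type name is identical on each side.
namespace Company.Employees.Contracts
{
    public interface ICreateEmployeeEto
    {
        // Illustrative property only; the real contract's members are not shown in the question.
        Guid EmployeeId { get; }
    }
}
```

With this layout, the message type travels in the envelope as `urn:message:Company.Employees.Contracts:ICreateEmployeeEto`. If the consumer declares its own copy of the interface under a different namespace, the URN does not match any consumer on the endpoint and the message is moved to the _skipped queue.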

Beyond that, if you wish to produce and consume messages using different exchange names for certain message types, you can use attributes or the message topology configuration (similar to what you specified above) to change the exchange names for those message types.
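
Both options might look roughly like the following. This is a sketch assuming MassTransit v8 (where `EntityNameAttribute` and `IRabbitMqBusFactoryConfigurator` live in the `MassTransit` namespace); the namespace `Company.Employees.Contracts`, the `EmployeeId` property, and the `EmployeeTopology` helper are hypothetical names used for illustration.

```csharp
using System;
using MassTransit;

namespace Company.Employees.Contracts
{
    // Option A: set the exchange (entity) name with an attribute on the contract.
    [EntityName("CreateEmployee")]
    public interface ICreateEmployeeEto
    {
        Guid EmployeeId { get; }
    }

    public interface IUpdateEmployeeEto
    {
        Guid EmployeeId { get; }
    }

    // Option B: set the entity name through the message topology configuration.
    // Hypothetical helper, intended to be called inside UsingRabbitMq(...)
    // in both the publisher and the consumer.
    public static class EmployeeTopology
    {
        public static void Apply(IRabbitMqBusFactoryConfigurator cfg)
        {
            cfg.Message<IUpdateEmployeeEto>(m => m.SetEntityName("UpdateEmployee"));
        }
    }
}
```

The attribute has the advantage of travelling with the contract assembly, so every application that references it picks up the same name without extra configuration.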

The changes should be specified on both the producer and the consumer to ensure consistency. At that point, MassTransit will do the right thing and bind the proper exchanges for you.
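
On the consumer side, that could look something like the sketch below. It reuses `IEmployeeSvcBus`, `ICreateEmployeeEto`, `CreateEmployeeConsumer`, and the host address from the question; the `ConsumerBusRegistration` class and `AddEmployeeBus` method are hypothetical names, and the code assumes MassTransit v8 with the RabbitMQ transport package installed.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class ConsumerBusRegistration
{
    public static void AddEmployeeBus(IServiceCollection services)
    {
        services.AddMassTransit<IEmployeeSvcBus>(x =>
        {
            x.AddConsumer<CreateEmployeeConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host("rabbitmq://localhost:5672");

                // Same entity name as configured on the producer.
                cfg.Message<ICreateEmployeeEto>(m => m.SetEntityName("CreateEmployee"));

                cfg.ReceiveEndpoint("employees.create_dev", ep =>
                {
                    // ConfigureConsumeTopology is left at its default (true),
                    // so MassTransit binds the CreateEmployee exchange to this
                    // endpoint based on the consumer's message type.
                    // No manual ep.Bind(...) call is needed.
                    ep.ConfigureConsumer<CreateEmployeeConsumer>(context);
                });
            });
        });
    }
}
```

Using `ConfigureConsumer<T>(context)` resolves the consumer from the container, so the constructor dependencies are injected and the parameterless constructor is no longer required.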

Posted on April 17, 2023 at 17:48:02
Source: https://go.coder-hub.com/76033797.html