MassTransit Azure bus PrefetchCount must be 1

Question

I'm using MassTransit with Azure Service Bus as the transport. I'm forced to always set PrefetchCount to 1, or else the consumer pulls the same message off the queue multiple times, at which point I get a message lock exception. Why would my consumer pull the same message and try to process it multiple times? How can I avoid this behaviour so that my consumer can process multiple messages (just not the same message) concurrently?

2020-01-03 10:30:31 ERR Exception on Receiver sb://**queueuri**/create-access-points during "RenewLock"
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:2e0c6c96-a0e1-40e2-9f95-1930aad9744b, TrackingId:37d9c13c-5bb4-4c19-9853-21f9ef06c7dd_B31, SystemTracker:qesi-local-queue:Queue:create-access-points, Timestamp:2020-01-03T16:31:08

This is where I'm forced to set PrefetchCount in my consumer definition:

public class CreateAccessPointCommandHandlerDef : ConsumerDefinition<CreateAccessPointCommandHandler>
{
    public CreateAccessPointCommandHandlerDef()
    {
        EndpointName = "create-access-points";
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CreateAccessPointCommandHandler> consumerConfigurator
    )
    {
        // Apply the Azure Service Bus settings only when running on that transport.
        if (endpointConfigurator is IServiceBusEndpointConfigurator serviceBus)
        {
            serviceBus.LockDuration = TimeSpan.FromMinutes(5);
            serviceBus.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
            // Workaround: any higher value leads to duplicate deliveries and lock exceptions.
            serviceBus.PrefetchCount = 1;
        }
    }
}

Here are my logs, showing that I get the lock exception shortly after the message is received. The process eventually completes successfully, and then it tries to process the same message again.

2020-01-06 10:19:38 DBG Received CreateAccessPoints message (ID: 14020000-5d08-0015-ac23-08d792c43254 for ProviderNetworkId 17 and ProviderNetworkVersionId 1014.
info: Worker[0] Worker running at: 01/06/2020 10:19:52 -06:00
2020-01-06 10:19:52 INF Worker running at: 01/06/2020 10:19:52 -06:00
info: Worker[0] Worker running at: 01/06/2020 10:20:12 -06:00
2020-01-06 10:20:13 INF Worker running at: 01/06/2020 10:20:12 -06:00
fail: MassTransit[0]
  Exception on Receiver sb://**azuresb** during RenewLock
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:9470f437-5c5f-4280-aa32-0f1d97ac7c50, TrackingId:89f135d1-69ce-4c08-b80d-247d3c17eb76_B1, SystemTracker:**azuresb**:Queue:create-access-points, Timestamp:2020-01

Here's the log from my startup. Does any of this look wrong?

2020-01-06 10:43:05 DBG Topic: "qesi.Core.ProviderNetwork.Commands/CreateAccessPointsCommand" ("")
2020-01-06 10:43:05 DBG Queue: "create-access-points" ("auto-delete: 29247y1M2w2h48m5s477ms")
2020-01-06 10:43:05 DBG Subscription "create-access-points" ("Commands/CreateAccessPointsCommand" -> "sb://**azuresb**/create-access-points")
2020-01-06 10:43:05 DBG Creating message receiver for sb://qesi-local-queue.servicebus.windows.net/create-access-points
2020-01-06 10:43:06 DBG Creating queue "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r"
2020-01-06 10:43:07 DBG Queue: "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r" ("dead letter, auto-delete: 5m")

And finally here is my Bus creation code:

return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
    cfg.Host(_config.ServiceBusUri, host =>
    {
        host.SharedAccessSignature(s =>
        {
            s.KeyName = _config.KeyName;
            s.SharedAccessKey = _config.SharedAccessKey;
            s.TokenTimeToLive = TimeSpan.FromDays(1);
            s.TokenScope = TokenScope.Namespace;
        });
    });
    cfg.SetLoggerFactory(_logFactory);
    cfg.ConfigureEndpoints(_provider);
    cfg.UseMessageData(_messageDataRepository);
    cfg.UseMessageRetry(retry => retry.Immediate(2));
});

Answer 1

Score: 1

You're confusing several things here, and they are not specific to how MassTransit works.
Azure Service Bus delivers messages to consumers, and each delivered message is locked for a certain duration. If the message is not completed, abandoned, or dead-lettered within that period, it loses its lock and becomes available to another competing consumer for reprocessing. If a message keeps losing its lock, the broker dead-letters it automatically once it has been delivered MaxDeliveryCount times. Since you're assigning 30 minutes to MaxAutoRenewDuration, MassTransit will keep renewing the lock, extending the processing time up to 30 minutes. Beyond that, the same thing happens: the lock is lost and the message is given to another competing consumer. Check how long your message processing actually takes.
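
To check how long processing takes, one option is to time the consumer and compare the result with the lock settings. The following is only a minimal sketch: the message class body, the ProcessAsync helper, and the LockDuration constant are hypothetical placeholders, and Console output stands in for whatever logger you actually use.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical stand-in for the real message contract.
public class CreateAccessPointsCommand
{
}

public class CreateAccessPointCommandHandler : IConsumer<CreateAccessPointsCommand>
{
    // Assumed to mirror the LockDuration configured on the receive endpoint.
    static readonly TimeSpan LockDuration = TimeSpan.FromMinutes(5);

    public async Task Consume(ConsumeContext<CreateAccessPointsCommand> context)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await ProcessAsync(context.Message);
        }
        finally
        {
            stopwatch.Stop();

            // Report whether the handler outlived a single lock period.
            var verdict = stopwatch.Elapsed > LockDuration
                ? "exceeded the lock duration, so it depended on lock renewal"
                : "finished within the lock duration";
            Console.WriteLine($"Message {context.MessageId} took {stopwatch.Elapsed}: {verdict}");
        }
    }

    // Placeholder for the real work (hypothetical).
    Task ProcessAsync(CreateAccessPointsCommand message) => Task.CompletedTask;
}

If the measured time is well under the lock duration and the lock is still lost, slow processing is probably not the cause, and the connection to the broker is worth investigating instead.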

PrefetchCount is a different setting. It means "how many messages are prefetched so that the client doesn't have to explicitly fetch messages every time." In your case, that's one additional message prefetched whenever MassTransit requests or receives a message. This setting improves overall endpoint throughput, not concurrency. Concurrency is controlled by ConcurrentMessageLimit, which you set to 4.
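
To make the distinction concrete, here is a sketch of the consumer definition from the question with the two settings tuned separately. It uses only the members that already appear in the question; the PrefetchCount value of 8 is an arbitrary assumption, and the using directives assume the MassTransit 6.x namespace layout, so adjust them to your version. It also assumes the CreateAccessPointCommandHandler consumer class from the question.

using System;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;   // assumed namespace (MassTransit 6.x)
using MassTransit.ConsumeConfigurators;    // assumed namespace (MassTransit 6.x)
using MassTransit.Definition;              // assumed namespace (MassTransit 6.x)

public class CreateAccessPointCommandHandlerDef :
    ConsumerDefinition<CreateAccessPointCommandHandler>
{
    public CreateAccessPointCommandHandlerDef()
    {
        EndpointName = "create-access-points";

        // Concurrency: at most 4 messages are handled at the same time.
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CreateAccessPointCommandHandler> consumerConfigurator)
    {
        if (endpointConfigurator is IServiceBusEndpointConfigurator serviceBus)
        {
            serviceBus.LockDuration = TimeSpan.FromMinutes(5);
            serviceBus.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);

            // Throughput: how many messages the client buffers ahead of time.
            // 8 is an illustrative value, not a recommendation.
            serviceBus.PrefetchCount = 8;
        }
    }
}

Keep in mind that Azure Service Bus locks a prefetched message at the moment it is fetched, so a message waiting in the local buffer is already using up its lock time. With slow handlers, a modest prefetch value is the safer choice. This configuration does not by itself prevent lock losses that have another cause.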

Answer 2

Score: 0

It turns out this issue was caused by running our .NET Core Generic Host inside a Continuous WebJob. There seem to be connectivity issues with Azure Service Bus when running in the context of a WebJob. It works most of the time, then it blows up and loses its Azure Service Bus locks. This took us a lot of trial and error to track down. As soon as we deployed the Generic Host in a Docker container, all of our Service Bus lock problems went away.

huangapple
  • Posted on 2020-01-04 00:43:11
  • When reposting, please keep this article's link: https://go.coder-hub.com/59582208.html