Periods of prolonged inactivity and frequent MessageLockLostException in QueueClient

Question

Background

We have a data transfer solution with Azure Service Bus as the message broker. We are transferring data from x datasets through x queues - with x dedicated QueueClients as senders. Some senders publish messages at the rate of one message every two seconds, while others publish one every 15 minutes.

The application on the data source side (where senders are) is working just fine, giving us the desired throughput.

On the other side, we have an application with one QueueClient receiver per queue with the following configuration:

  • maxConcurrentCalls = 1
  • autoComplete = true (if receive mode = RECEIVEANDDELETE) and false (if receive mode = PEEKLOCK) - for some receivers we want the messages preserved in the Service Bus queue if the receiver shuts down unexpectedly.
  • maxAutoRenewDuration = 3 minutes (lock duration on all queues = 30 seconds)
  • an Executor service with a single thread

The MessageHandler registered with each of these receivers does the following:

public CompletableFuture<Void> onMessageAsync(final IMessage message) {

	// deserialize the message body
	final CustomObject customObject = (CustomObject) SerializationUtils.deserialize(
			(byte[]) message.getMessageBody().getBinaryData().get(0));

	// run processDB1() and processDB2() asynchronously
	final List<CompletableFuture<Boolean>> processFutures = new ArrayList<CompletableFuture<Boolean>>();

	processFutures.add(processDB1(customObject));  // processDB1() returns CompletableFuture<Boolean>
	processFutures.add(processDB2(customObject));  // processDB2() returns CompletableFuture<Boolean>

	// join both futures to collect the result Booleans
	final List<Boolean> results = CompletableFuture
			.allOf(processFutures.toArray(new CompletableFuture[processFutures.size()]))
			.thenApply(ignored -> processFutures.stream()
					.map(CompletableFuture::join)
					.collect(Collectors.toList()))
			.join();

	if (results.contains(false)) {
		// dead-letter the message if any result is false
		return getQueueClient().deadLetterAsync(message.getLockToken());
	} else {
		// complete the message otherwise
		return getQueueClient().completeAsync(message.getLockToken());
	}
}
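
For context, a minimal sketch of how such a receiver might be wired up with the azure-servicebus 3.x API (the connection string, queue name, and class name are placeholders and the handler body is elided; this illustrates the configuration bullets above, not the exact application code):

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;

public class ReceiverSetup {

    public static void main(String[] args) throws Exception {
        final String connectionString = System.getenv("SERVICEBUS_CONNECTION_STRING"); // placeholder
        final String queueName = "my-queue";                                           // placeholder

        // PEEKLOCK receiver, as in Scenario 2 (RECEIVEANDDELETE for Scenario 1)
        final QueueClient queueClient = new QueueClient(
                new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);

        // single-threaded executor, as in the configuration above
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        // maxConcurrentCalls = 1, autoComplete = false (PEEKLOCK), maxAutoRenewDuration = 3 minutes
        final MessageHandlerOptions options =
                new MessageHandlerOptions(1, false, Duration.ofMinutes(3));

        queueClient.registerMessageHandler(new IMessageHandler() {
            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                // the real handler is the onMessageAsync shown above
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                // errors such as MessageLockLostException during COMPLETE/RENEWMESSAGELOCK surface here
                System.err.println(phase + ": " + exception.getMessage());
            }
        }, options, executor);
    }
}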

We tested with the following scenarios:

Scenario 1 - receive mode = RECEIVEANDDELETE, message publish rate: 30/ minute

Expected Behavior

The messages should be received continuously with a constant throughput (which need not necessarily be the throughput at the source, where messages are published).

Actual Behavior

We observe random, long periods of inactivity from the QueueClient, ranging from minutes to hours: there are no Outgoing Messages from the Service Bus namespace (observed on the Metrics charts) and there are no consumption logs for the same time periods!

Scenario 2 - receive mode = PEEKLOCK, message publish rate: 30/ minute

Expected Behavior

The messages should be received continuously with a constant throughput (which need not necessarily be the throughput at the source, where messages are published).

Actual Behavior

We keep seeing MessageLockLostException constantly after the application has been running for 20-30 minutes.

We tried the following:

  1. We reduced the prefetch count (from 20 * processing rate, as mentioned in the Best Practices guide) to a bare minimum (even 0 in one test cycle), to reduce the number of messages that are locked for the client.
  2. We increased the maxAutoRenewDuration to 5 minutes - our processDB1() and processDB2() do not take more than a second or two in almost 90% of cases - so I think the lock duration of 30 seconds and the maxAutoRenewDuration are not the issue here.
  3. We removed the blocking CompletableFuture.get() and made the processing synchronous.
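
For reference, roughly how tweaks 1 and 2 might look against the same 3.x API (a sketch; queueClient, messageHandler, and executor are the objects from the receiver setup above, and setPrefetchCount is assumed to be how prefetch was disabled):

// 1. reduce prefetch to a bare minimum (0 disables prefetch entirely)
queueClient.setPrefetchCount(0);

// 2. let the pump auto-renew the peek-lock for up to 5 minutes instead of 3
MessageHandlerOptions options = new MessageHandlerOptions(1, false, Duration.ofMinutes(5));
queueClient.registerMessageHandler(messageHandler, options, executor);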

None of these tweaks helped us fix the issue. What we observed is that the COMPLETE and RENEWMESSAGELOCK operations are throwing the MessageLockLostException.

We need help finding answers to the following:

  1. Why are there long periods of inactivity of the QueueClient in Scenario 1?
  2. How do we know the MessageLockLostExceptions are thrown because the locks have indeed expired? We suspect the locks cannot be expiring that quickly, as our processing happens in a second or two. Disabling prefetch also did not solve this for us.
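
(For reference on question 2, one way to check empirically whether a lock really expired is to record the lock expiry the SDK reports on receipt and compare it with the time at which complete/dead-letter is attempted. A sketch only: process() is a hypothetical stand-in for the existing processDB1()/processDB2() pipeline, and getLockedUntilUtc() reflects the lock expiry as last known to the client.)

public CompletableFuture<Void> onMessageAsync(final IMessage message) {
	final Instant lockedUntil = message.getLockedUntilUtc(); // needs java.time.Instant
	final Instant received = Instant.now();

	return process(message).thenCompose(ok -> {
		final Instant settling = Instant.now();
		if (settling.isAfter(lockedUntil)) {
			// the lock had already passed its last-known expiry before we tried to settle
			System.err.printf("lock %s expired at %s, settle attempted at %s (received %s)%n",
					message.getLockToken(), lockedUntil, settling, received);
		}
		return ok ? getQueueClient().completeAsync(message.getLockToken())
				: getQueueClient().deadLetterAsync(message.getLockToken());
	});
}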

Versions and Service Bus details

  • Java - openjdk-11-jre
  • Azure Service Bus namespace tier: Standard
  • Java SDK version - 3.4.0

Answer 1

Score: 0

For Scenario 1:

If you have duplicate detection history enabled, this behavior can happen, as in the scenario explained below:

I had enabled it for 30 seconds. I constantly hit the Service Bus with duplicate messages (in my case, messages with the same MessageId from the client, 30 per minute). I would see no outgoing activity for that window: though the messages were received by the Service Bus from the sending client, I was not able to see them in Outgoing Messages. You could check whether you are encountering duplicate messages that are being filtered out, which in turn shows up as inactivity on the outgoing side.
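
To illustrate the mechanism: duplicate detection is keyed on the MessageId, so senders that reuse a MessageId within the detection window will have the repeats silently dropped. A sender-side sketch (sender and payload are placeholders, not the asker's code) that guarantees every message is treated as unique:

byte[] payload = SerializationUtils.serialize(customObject); // the serialized CustomObject
Message message = new Message(MessageBody.fromBinaryData(Collections.singletonList(payload)));
message.setMessageId(UUID.randomUUID().toString()); // unique id => never filtered as a duplicate
sender.send(message); // sender is the sending-side QueueClient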

Also note: you can't enable/disable duplicate detection after the queue is created. You can only do so at the time of creating the queue.

Answer 2

Score: 0

The issue was not with the QueueClient object per se. It was with the processes we were triggering from within the MessageHandler: processDB1(customObject) and processDB2(customObject). Since these processes were not optimized, message consumption dropped and the locks expired (in peek-lock mode), as the handler was spending more time (relative to the rate at which messages were published to the queues) completing these operations.

After optimizing the processes, the consumption and completion (in peek-lock mode) were just fine.
