英文:
Increasing concurrent calls undefined behaviour in Spring Cloud Stream Azure Service Bus
问题
以下是您要翻译的内容:
我有一个简单的Spring Cloud Azure Service Bus消费者,定义如下,遵循官方文档:
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> someProcessing(message);
}
默认策略,如果spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.requeue-rejected
标志为false
,则在消费过程中失败的消息会重新排队等待MaxDeliveryCount
次,然后进入DLQ。我测试了这个行为,并且它一直有效 - 例如,如果我将代码更新为
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> {
someProcessing(message);
throw new IllegalStateException();
}
}
消息会按预期重新排队。
然而,如果我通过设置spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.max-concurrent-calls=2
来增加连续调用的次数,行为就变得不确定了。**有时它会完全放弃消息,有时会将它们重新排队到主队列。**我不明白为什么行为不一致,即消息总是重新排队。部分堆栈跟踪如下。
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71c11ee9]; nested exception is java.lang.IllegalStateException, failedMessage=GenericMessage [payload=byte[1948], headers={azure_service_bus_expires_at=2023-06-11T11:29:47.566Z, OwnedBy=owner, azure_service_bus_received_message_context=com.azure.messaging.servicebus.ServiceBusReceivedMessageContext@780ad3f2, azure_service_bus_message_id=ede22e431e0642ee9d3be263b3989c77, azure_service_bus_enqueued_sequence_number=132449, azure_service_bus_enqueued_time=2023-05-12T11:29:47.566Z, azure_service_bus_lock_token=c2167e9a-b6f5-4aed-b390-0f9714caf2a9, azure_service_bus_sequence_number=6969, azure_service_bus_time_to_live=PT720H, azure_service_bus_delivery_count=0, azure_service_bus_state=ACTIVE, id=fd556d60-a851-e39c-bca0-59e90d83722d, contentType=application/json, azure_service_bus_locked_until=2023-05-26T09:04:47.840Z, timestamp=1685091827953}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
...
Caused by: java.lang.IllegalStateException
at com.subscriptions.Consumer.lambda$consume$0(Consumer.java:70)
...
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:588)
...
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:623)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 25 more
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] .s.i.s.i.ServiceBusInboundChannelAdapter : Error in the operation USER_CALLBACK occurred on entity service/subscriptions/Subscription. Error: {}
...
我注意到,设置spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
会使消息正确传递,因为我手动检查点消息。问题可能与这个设置有关吗?
英文:
I have a simple Spring Cloud Azure Service Bus consumer defined as simply as follows, following the official documentation:
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> someProcessing(message);
}
The default policy, if the spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.requeue-rejected
flag is false
, is that messages that fail during consummation are requeued on the main queue for MaxDeliveryCount
times, before going to the DLQ. I tested this behaviour and it consistently works - if, for example, I update the code to
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> {
someProcessing(message);
throw new IllegalStateException();
}
}
the message gets requeued as expected.
If I bump the number of consecutive calls, however, by setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.max-concurrent-calls=2
, the behaviour becomes undefined. Some times it abandons the messages altogether, other times it requeues them in the main queue. I don't understand why the behaviour is not consistent, i.e., the messages are always requeued. Part of the stack trace is as following.
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71c11ee9]; nested exception is java.lang.IllegalStateException, failedMessage=GenericMessage [payload=byte[1948], headers={azure_service_bus_expires_at=2023-06-11T11:29:47.566Z, OwnedBy=owner, azure_service_bus_received_message_context=com.azure.messaging.servicebus.ServiceBusReceivedMessageContext@780ad3f2, azure_service_bus_message_id=ede22e431e0642ee9d3be263b3989c77, azure_service_bus_enqueued_sequence_number=132449, azure_service_bus_enqueued_time=2023-05-12T11:29:47.566Z, azure_service_bus_lock_token=c2167e9a-b6f5-4aed-b390-0f9714caf2a9, azure_service_bus_sequence_number=6969, azure_service_bus_time_to_live=PT720H, azure_service_bus_delivery_count=0, azure_service_bus_state=ACTIVE, id=fd556d60-a851-e39c-bca0-59e90d83722d, contentType=application/json, azure_service_bus_locked_until=2023-05-26T09:04:47.840Z, timestamp=1685091827953}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException
at com.subscriptions.Consumer.lambda$consume$0(Consumer.java:70)
at com.helper.AnonymousAuthenticationRunner.runAnonymously(AnonymousAuthenticationRunner.java:24)
at com.subscriptions.Consumer.lambda$consume$1(Consumer.java:50)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1007)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:742)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:588)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:791)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:623)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 25 more
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] .s.i.s.i.ServiceBusInboundChannelAdapter : Error in the operation USER_CALLBACK occurred on entity service/subscriptions/Subscription. Error: {}
com.azure.messaging.servicebus.ServiceBusException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:387)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
... 10 common frames omitted
Caused by: com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1480)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3733)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:340)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:363)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork.onComplete(ReceiverUnsettledDeliveries.java:736)
at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.lambda$sendDispositionImpl$3(ReceiverUnsettledDeliveries.java:334)
at com.azure.core.amqp.implementation.handler.DispatchHandler.onTimerTask(DispatchHandler.java:32)
at com.azure.core.amqp.implementation.ReactorDispatcher$WorkScheduler.run(ReactorDispatcher.java:207)
at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:390)
at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
... 5 common frames omitted
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:65)
at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.abandon(ServiceBusMessageChannelBinder.java:230)
at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.lambda$getErrorMessageHandler$1(ServiceBusMessageChannelBinder.java:189)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
... 5 common frames omitted
Caused by: com.azure.core.amqp.exception.AmqpException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
... 19 common frames omitted
I noticed that setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
makes messages be delivered correctly, as I am checkpointing the messages manually. Could the issue be from somewhere around that?
答案1
得分: 1
所以,看起来你有并发消息?如果多个消息同时失败,可能很难理解问题并修复它。
我会尝试为你解决这个问题...但我不喜欢Java。
问题1. 你尝试过 messageContent.abandon()
吗?接下来,我会找一个关于并发流程的视频,解释一下。我对逻辑的理解有限,但它有助于提高速度和规模。对于人类来说,它在提供进程顺序方面效果不佳,我理解它就像在Python编程中的 set()
对比数组。并发会按照一种可以增加数据处理速度的顺序将负载分散到多个CPU上,它通过不像Python中的 set()
函数那样使用数据结构来避免自我重复来实现。
(这是我用来研究问题的视频链接:https://www.youtube.com/watch?v=0LQPNjjfolk)
我通过使用 set()
而不是数组来加速遍历数据。这是因为只有唯一的值可以进入 set()
,而真假检查比索引数据更快。在某些语言中,它还可以节省内存。
如果你正在处理许多并发事件,你会想要设计一个类似 set()
的方法。
我对服务总线消费者的理解是,当你使用自动完成并将其设置为true时,如果发生异常,消息将被视为失败并自动放弃并重新排队。
系统需要经纪人确认。在并发处理期间,它可能永远无法得到。记录这种类型的异常在IT系统的网络中是必不可少的。作为一名前支持工程师,我记得很多艰难的日子都在分析单向数据流和数据包捕获方面:(
如果是这样的话,你应该尝试 messageContent.abandon()
。这将允许你在并发消息处理时进行进一步处理。
我建议你查看服务总线的最佳实践架构:
我还建议计划测试你的系统。通过思考如何测试你的系统,你可以更好地规划、研究和解决内部的新问题:)。我通过查看Java和服务总线架构的最佳实践(我分享的视频),然后查看Microsoft服务总线文档,然后思考Python中的逻辑(因为我不喜欢Java :)),找到了一个解决方案。
我是微软的Azure云架构师的第一年。感谢你选择了微软,请让我知道你的反馈。如果你有任何问题,我将更新这篇文章。
更新:
当我继续阅读文档时,看起来我们可能为你的用例找到了解决方案:PeekLock。
https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers
请告诉我上面的内容是否适用于你的用例。
最终更新(我在Java中找到了解决方案:):
根据MS Java SDK文档,这里是在Java中使用PeekLock的示例代码:
文档: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-m
代码:
发送消息
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<<连接到服务总线命名空间的连接字符串>>")
.sender()
.queueName("<<队列名称>>")
.buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
new ServiceBusMessage("Hello world").setMessageId("1"),
new ServiceBusMessage("Bonjour").setMessageId("2"));
sender.sendMessages(messages);
// 当你使用完发送器后,关闭它。
sender.close();
接收消息:
// 处理以PeekLock模式接收的单个消息的示例代码。
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// 随机完成或放弃每个消息。理想情况下,在实际情况下,如果处理消息的业务逻辑达到了不需要Service Bus重新传递相同消息的期望状态,那么应该调用context.complete(),否则调用context.abandon()。
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (Exception completionError) {
System.out.printf("消息 %s 完成失败\n", message.getMessageId());
completionError.printStackTrace();
}
} else {
try {
context.abandon();
} catch (Exception abandonError) {
System.out.printf("消息 %s 放弃失败\n", message.getMessageId());
abandonError.printStackTrace();
}
}
};
// 如果发生错误,将调用的示例代码
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("接收消息时发生错误:" + errorContext.getException());
};
// 通过构建器和其子构建器创建处理器客户端
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<<连接到服务总线命名空间的连接字符串>>")
.processor()
.queueName("<<队列名称>>")
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // 确保明确选择手动结算(例如完成、放弃)。
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// 在后台启动处理器并立即返回
processorClient.start();
上面是Java SDK的示例。关于Spring Java和PeekLock,这里有一个相关的问题:
https://stackoverflow.com/questions/63603152
英文:
so, it looks like you have concurrent messages? if a few fail at the same time, it can be hard to understand the issue and fix it.
I'm going to give this a try for you...but I hate java.
Question 1. have you tried messageContent.abandon()? next, i would find a video breaking down concurrent flows. My limited understanding of the logic is it helps with speed and scale. It is weak at providing and order to the process for a human lol. I understand it like set() vs array in programing python (the language I know best). concurrency will split the load across CPU's in an order that will increase the speed of processing the data over all. it does this by not repeating its self using a data structure like or better than the set() function in python.
(here is a video i watch to research the problem: https://www.youtube.com/watch?v=0LQPNjjfolk)
I speed up looping through data by using set() over arrays. This is because only unique values can go in set() and a true or false check is quicker than indexing the data. It also saves memory in some languages too.
if you are processing many concurrent events, you would want to design an approach similar to set().
my understanding of service bus consumers, when you use auto-complete and its set to true if an exception is thrown the message is considered failed and is automatically abandon and requeued.
the system needs a broker acknowledgment. it may never get it during concurrent processing. Logging this kind of exception is a must in Networking for IT systems. as an ex-support engineer i remember many hard days analysis Unidirectional data flow with packet capture:(
If this is so, you should try messageContent.abandon(). This will let you further process even with concurrent message processing.
I would recommend reviewing best practice architecture for service bus:
I would also recommend planning testing your system. by thinking about how you will test your system, you can better, plan, research and solve new issues in house:). I got to a solution by review best practices for java and service bus architecture (the video i shared), then i looked at the Microsoft documentation for service bus, then I thought of the logic in python (because i hate java
I am in my first year at Microsoft as an Azure Cloud Architect. Thank you for choosing Microsoft and please let me know your feedback. I will update this post if you have any questions.
Update:
As i contune to read through the documentation it looks like we may have a solution for your use case: PeekLock.
https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers
Please tell me if the above fit your use case.
Final update (I found you a solution in java :):
according to the MS Java SDK documentation here is example code for using PeekLock in java:
docs: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-m
code:
send messages
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
new ServiceBusMessage("Hello world").setMessageId("1"),
new ServiceBusMessage("Bonjour").setMessageId("2"));
sender.sendMessages(messages);
// When you are done using the sender, dispose of it.
sender. Close();
Receive messages:
// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (Exception completionError) {
System.out.printf("Completion of the message %s failed\n", message.getMessageId());
completionError.printStackTrace();
}
} else {
try {
context.abandon();
} catch (Exception abandonError) {
System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
abandonError.printStackTrace();
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.processor()
.queueName("<< QUEUE NAME >>")
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
The above is an example for the java SDK. Here is a questions in relation to Spring Java and PeekLock:
https://stackoverflow.com/questions/63603152/for-the-azure-servicebus-jms-spring-boot-starter-is-there-a-properties-setting - see here about PeekLock with spring java
答案2
得分: 1
可能的解决方案
考虑到这个页面上的示例代码:https://learn.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-cloud-stream-binder-java-app-with-service-bus?tabs=use-a-service-bus-queue
以及这个接口:https://azuresdkdocs.blob.core.windows.net/$web/java/azure-spring-integration-core/2.0.0-beta.1/index.html?com/azure/spring/integration/core/api/Checkpointer.html
我认为你应该像这样做:
public Consumer<Message<String>> consume() {
return message->{
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
LOGGER.info("New message received: '{}'", message.getPayload());
checkpointer.success()
.doOnSuccess(s->someProcessing(message))
.doOnError(e-> checkpointer.falure())
.block();
};
}
旧回答
根据这里的 Javadoc:https://github.com/Azure/azure-sdk-for-java/blob/fd4ed4402038bf529d02642ed037a4669b396f1a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java#L50
当您使用默认的“ReceiveMode”时,您应该告诉服务如果您未能处理消息,您希望它执行什么操作。
所以,我认为@BlackFox是正确的,即使他使用的API与您的不同。您不能只是抛出异常,您必须告诉Azure总线应该如何处理消息。在您告诉它该做什么之前,消息将被锁定。
英文:
Probable solution
considering the example code on this page: https://learn.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-cloud-stream-binder-java-app-with-service-bus?tabs=use-a-service-bus-queue
and this interface : https://azuresdkdocs.blob.core.windows.net/$web/java/azure-spring-integration-core/2.0.0-beta.1/index.html?com/azure/spring/integration/core/api/Checkpointer.html
I think you're supposed to do something like this :
public Consumer<Message<String>> consume() {
return message->{
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
LOGGER.info("New message received: '{}'", message.getPayload());
checkpointer.success()
.doOnSuccess(s->someProcessing(message))
.doOnError(e-> checkpointer.falure())
.block();
};
}
Old answer
According to the Javadoc there : https://github.com/Azure/azure-sdk-for-java/blob/fd4ed4402038bf529d02642ed037a4669b396f1a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java#L50
when you use the default "ReceiveMode", you are supposed to let know the service what you want it to do with the message if you fail to process it.
So, I think @BlackFox is right even if the API he's using is not the same as yours. You cannot just throw an exception, you must tell to the azure bus what to do with the message. Until you tell it what to do, the message is locked.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论