ServiceBusSessionReceiverAsyncClient throwing IllegalStateException During Close

Question

When using ServiceBusSessionReceiverAsyncClient to receive a single message from a Service Bus queue, an IllegalStateException is thrown. The message mentions trying to add credits to an already closed link.

I'm using take(1) and next() to transform the Flux into a single result Mono. The documentation says that using take(1) on the stream will close the stream after the first result, which is what I'm looking to do.

My receiver code:

private <T extends IWocTransaction> Mono<Optional<T>> responseAsync(String transactionId, Class<T> clazz) {

        var asyncClient = sbClientBuilder.connectionString(sbConnectionString)
                .sessionReceiver()
                .queueName("my-callback-queue")
                .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                .buildAsyncClient();

        var msgStream = Flux.usingWhen(asyncClient.acceptSession(transactionId),
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(receiver::close)
        );

        return Mono.from(msgStream
                        .timeout(timeout)
                        .take(1)
                        .next()
                ).map(message -> {
                    var json = message.getBody().toString();
                    try {
                        var val = objectMapper.readValue(json, clazz);
                        return val != null ? Optional.of(val) : Optional.<T>empty();
                    } catch (Exception e) {
                        log.error("Error deserializing response from string {}", json, e);
                        return Optional.<T>empty();
                    }
                })
                .doOnError(t -> {
                    if (t instanceof TimeoutException) {
                        log.error("Timeout error waiting on API callback {}", kv("ApiTimeout", timeout.toString()), t);
                    } else {
                        log.error("Error waiting for async callback", t);
                    }
                }).onErrorReturn(Optional.empty());
    }

This code works fine but I'm getting this exception on every run:

13:46:27.122 [io-executor-thread-1] INFO  c.a.m.s.ServiceBusClientBuilder - {"az.sdk.message":"Closing a dependent client.","numberOfOpenClients":1}
13:46:27.127 [receiver-0-1] INFO  c.a.m.s.ServiceBusSessionReceiver - {"az.sdk.message":"There is no lock token.","sessionId":"adfadsr","messageId":"fb70e81e4d304b8fb34092440243554a"}
13:46:27.138 [receiver-0-1] INFO  c.a.m.s.ServiceBusReceiverAsyncClient - Removing receiver links.
13:46:27.167 [receiver-0-1] ERROR c.a.c.a.i.ReactorReceiver - {"az.sdk.message":"Cannot add credits to closed link: adfadsr","exception":"Cannot add credits to closed link: adfadsr","connectionId":"MF_57a511_1680201985206","entityPath":"woc-callback-queue","linkName":"adfadsr"}
13:46:27.175 [receiver-0-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:227)
at com.azure.messaging.servicebus.ServiceBusSessionReceiver.lambda$new$2(ServiceBusSessionReceiver.java:92)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:447)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at io.micronaut.reactive.reactor.instrument.ReactorSubscriber.onNext(ReactorSubscriber.java:57)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
Caused by: java.lang.IllegalStateException: Cannot add credits to closed link: adfadsr
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at io.micronaut.reactive.reactor.instrument.ReactorInstrumentation.lambda$init$0(ReactorInstrumentation.java:62)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
13:46:27.194 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_57a511_1680201985206","errorCondition":null,"errorDescription":null,"linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.198 [reactor-executor-1] INFO  c.a.c.a.i.ReactorSession - {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}
13:46:27.199 [reactor-executor-1] INFO  c.a.c.a.i.handler.ReceiveLinkHandler - {"az.sdk.message":"onLinkFinal","connectionId":"MF_57a511_1680201985206","linkName":"adfadsr","entityPath":"woc-callback-queue"}

How can I prevent the IllegalStateException from being thrown or at least handle it?

Answer 1

Score: 1

This IllegalStateException: Cannot add credits to closed link should not be thrown to the application; it should only be logged.

It sometimes happens because a couple of threads run concurrently. One is a non-blocking IO thread (handling message frames and sending credit through flow frames), and the second is the worker thread that delivers the message to the application. A third is the application's handler thread, on which the app's responseAsync is invoked.

When responseAsync closes the client from any thread, the IO thread in the background may still be doing some work when it receives the close request. The error appears in the log when the IO thread is in the middle of sending a flow frame while other parts of the client are being shut down. This log entry can be ignored.
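
The ordering described above can be reproduced with a toy model. The sketch below is not the SDK's implementation: ToyLink and ClosedLinkRaceSketch are hypothetical names, and the close is forced to finish before the credit request so that the outcome is deterministic. It only illustrates why a late credit request on an already closed link ends in an IllegalStateException that can be caught and logged without affecting the message that was already delivered.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy stand-in for an AMQP receive link (hypothetical, NOT the SDK's ReactorReceiver).
final class ToyLink {
    private final String name;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    ToyLink(String name) {
        this.name = name;
    }

    void close() {
        closed.set(true);
    }

    // Rejects credit requests once the link has been closed.
    void addCredits(int credits) {
        if (closed.get()) {
            throw new IllegalStateException("Cannot add credits to closed link: " + name);
        }
        // A real link would send a flow frame here; the toy does nothing.
    }
}

final class ClosedLinkRaceSketch {
    // Closes the link on one thread, then requests credits on another.
    // Returns the failure message of the late request, or null if it succeeded.
    static String closeThenRequestCredits(String linkName) {
        ToyLink link = new ToyLink(linkName);
        String[] failure = new String[1];

        Thread appThread = new Thread(link::close, "app-handler");
        Thread ioThread = new Thread(() -> {
            try {
                link.addCredits(1);
            } catch (IllegalStateException e) {
                failure[0] = e.getMessage(); // record it instead of propagating
            }
        }, "io-thread");

        try {
            appThread.start();
            appThread.join(); // force the ordering: the close completes first
            ioThread.start();
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failure[0];
    }

    public static void main(String[] args) {
        System.out.println(closeThenRequestCredits("adfadsr"));
    }
}
```

In the real client the two steps are not ordered like this, which is why the log entry shows up only when the close happens to win the race.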

It looks like the application is designed to create and dispose of a client for every request. That means the application opens and closes a TCP connection to Service Bus on every request, which can be expensive.
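
One way to avoid the per-request cost is to build the client once and hand the same instance to every request. The sketch below shows that pattern with plain JDK types only; SharedClient and SharedClientDemo are hypothetical names, and the stand-in client does nothing. In the question's code, the factory would presumably be the builder chain that ends in buildAsyncClient(). Whether the SDK client behaves well when shared this way under a given workload is an assumption to verify.

```java
import java.util.function.Supplier;

// Hypothetical helper: builds one client lazily and returns the same instance to every caller.
final class SharedClient<C extends AutoCloseable> implements AutoCloseable {
    private final Supplier<C> factory;
    private C client;       // guarded by "this"
    private boolean closed; // guarded by "this"

    SharedClient(Supplier<C> factory) {
        this.factory = factory;
    }

    // Returns the shared client, creating it on first use.
    synchronized C get() {
        if (closed) {
            throw new IllegalStateException("SharedClient is closed");
        }
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    // Closes the underlying client once, for example at application shutdown.
    @Override
    public synchronized void close() throws Exception {
        closed = true;
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

final class SharedClientDemo {
    // Counts how many times the factory runs while serving the given number of requests.
    static int buildsFor(int requests) {
        int[] builds = {0};
        SharedClient<AutoCloseable> shared = new SharedClient<AutoCloseable>(() -> {
            builds[0]++;
            return () -> { }; // stand-in client whose close() does nothing
        });
        for (int i = 0; i < requests; i++) {
            shared.get(); // each "request" reuses the same client
        }
        try {
            shared.close();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return builds[0];
    }

    public static void main(String[] args) {
        System.out.println(buildsFor(5));
    }
}
```

With a holder like this, responseAsync would call get() instead of building a new client, and only the holder would be closed at shutdown.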

huangapple
  • Posted on March 31, 2023 at 03:32:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75892274.html