英文:
Why am I getting onComplete signal when an exception is encountered in Spring Cloud Stream reactive consumer?
问题
我正在使用Spring Reactor与Spring Cloud Stream(GCP Pub/Sub Binder),并遇到错误处理问题。我能够通过一个非常简单的示例来重现此问题:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我期望的行为是看到“Failed to consume message”打印出来,然而实际情况似乎并非如此。当在链中添加.log()
调用以查看onNext
/onComplete
信号时,我预期会看到onError
信号。
我的实际代码类似于这样:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // 异常发生在这里
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
我注意到在我的服务类中,我试图对我的Reactor发布者进行错误处理。然而,在使用Spring Cloud Stream时,onError
信号不会发生。如果我在单元测试中简单地调用我的服务,例如myService.processMessage(msg)
并模拟异常,我的反应链将正确传播错误信号。
当我连接到Spring Cloud Stream时,似乎出现了问题。我想知道Spring Cloud Function/Stream是否进行了任何全局错误包装?
在我非平凡的代码中,我确实注意到以下错误消息,这可能与我未收到错误信号有关:
ERROR --- o.s.c.s.b.AbstractBinder : Failed to process the following content which will be dropped: ...
为了进一步混淆,如果我将我的Spring Cloud Stream绑定切换到非反应式实现,我能够在我的反应链中获得onError
信号,如下所示:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // 异常发生在这里
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // 这次成功打印
.subscribe();
}
英文:
I'm using Spring Reactor with Spring Cloud Stream (GCP Pub/Sub Binder) and running into error handling issues. I'm able to reproduce the issue with a very simple example:
@Bean
public Function<Flux<String>, Mono<Void>> consumer() {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.map(msg -> {
if (true) {
throw new RuntimeException("exception encountered!");
}
return msg;
})
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
The behavior I expect is to see "Failed to consume message" print, however, that's not what appears to happen. When adding a .log()
call to the chain I see onNext
/onComplete
signals, I would expect to see onError
signals.
My actual code looks something like this:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable))
.then();
}
I noticed that deep in my service class I was attempting to do error handling on my Reactor publishers. However, the onError
signal wouldn't occur when using Spring Cloud Stream. If I simply invoked my service as such myService.processMessage(msg)
in a unit test and mocked the exception, my reactive chain would propagate error signals correctly.
It seems to be an issue when I hook in to Spring Cloud Stream. I'm wondering if Spring Cloud Function/Stream is doing any global error wrapping?
In my non-trivial code I do notice this error message that may have something to do with why I'm not getting error signals?
ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...
To further my confusion, I am able to get the onError
signal in my reactive chain if I switch my Spring Cloud Stream binding to the non-reactive implementation as so:
@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
return customMessage -> Mono.just(customMessage)
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(myService::processMessage) // exception happens deep in here
.doOnError(throwable -> log.error("Failed to consume message", throwable)) // prints successfully this time
.subscribe();
}
答案1
得分: 2
这是我从自己的调查中汇总的信息,也许这对其他人有所帮助。需要预先警告的是,我可能没有使用正确的“Spring Reactor语言”,但这是我解决问题的方式...
在 Hoxton.SR5
中,一个onErrorContinue
已经包含在反应式绑定中,用于管理flux订阅。onErrorContinue
的问题在于,它通过在失败的操作符(如果支持)上应用BiConsumer函数来影响_上游_操作符。
这意味着当我们的 map
/flatMap
操作符中发生错误时,onErrorContinue
的 BiConsumer 会触发并修改下游信号,使其要么是 onComplete()
(Mono<T>
),要么是 request(...)
(如果它从 Flux<T>
请求了新元素)。这导致我们的 doOnError(...)
操作符不会执行,因为没有 onError()
信号。
最终,SCS团队决定移除这个错误处理包装。Hoxton.SR6
不再具有这个 onErrorContinue
。然而,这意味着异常传播到SCS绑定会导致Flux订阅被切断。随后的消息将无法路由,因为没有订阅者。
这种错误处理已传递给客户端,我们在_inner publisher_中添加了一个 onErrorResume
操作符,以有效地丢弃错误信号。当在 myService::processMessage
publisher 中遇到错误时,onErrorResume
会切换到传递的回退 publisher,并从操作符链中的那一点继续。在我们的情况下,这个回退 publisher 简单地返回 Mono.empty()
,这使我们能够丢弃错误信号,同时仍允许内部错误处理机制运行,而且不会影响外部源 publisher。
onErrorResume
示例/解释
上面的技术可以用一个非常简单的例子来说明。
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error"))
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
上面的 Flux<Integer>
将输出以下内容:
Element: 1
Element: 4
Element: 5
Element: 6
由于在元素 2
处遇到错误,onErrorResume
回退生效,新的 publisher 变为 Flux.just(4, 5, 6)
,从回退处 恢复。在我们的情况下,我们不想影响源 publisher(即 Flux.just(1, 2, 3)
)。我们只想丢弃错误元素(2
),并继续到下一个元素(3
)。
我们不能简单地将 Flux.just(4, 5, 6)
更改为 Flux.empty()
或 Mono.empty()
,如下所示:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error"))
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
这将导致输出以下内容:
Element: 1
这是因为 onErrorResume
已将上游 publishers 替换为回退 publisher(即 Mono.empty()
),并从该点 恢复。
为了实现我们期望的输出:
Element: 1
Element: 3
我们必须将 onErrorResume
操作符放在 flatMap
的内部 publisher 上:
public Mono<Integer> func(int i) {
return i == 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
现在,onErrorResume
仅影响由 func(i)
返回的内部 publisher。如果在 func(i)
中的操作符中发生错误,onErrorResume
会回退到 Mono.empty()
,有效地完成 Mono<T>
而不会崩溃。这还允许在回退运行之前应用于 func(i)
中的错误处理操作符(例如 doOnError
)。这是因为它不像 onErrorContinue
那样影响上游操作符,并在错误位置改变下一个信号。
最终解决方案
重新使用我问题中的代码片段,我将我的Spring Cloud版本升级到了 Hoxton.SR6
,并将代码更改为类似以下的形式:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
请注意,onErrorResume
在内部 publisher 上(在 flatMap
内部)。
英文:
So this is what I've gathered from my own investigations, maybe this might help others. Forewarning, I might not be using the right "Spring Reactor Language" but this is how I ended up solving it...
In Hoxton.SR5
, an onErrorContinue
was included on the reactive binding that managed the flux subscription. The problem with onErrorContinue
is that it affects upstream operators by applying the BiConsumer function at the operator that failed (if supported).
This means that when an error occurred in our map
/flatMap
operators, the onErrorContinue
BiConsumer would kick in and modify the downstream signal to either onComplete()
(Mono<T>
) or request(...)
(if it requested a new element from a Flux<T>
). This resulted in our doOnError(...)
operators not executing since there were no onError()
signals.
Eventually the SCS team decided to remove this error handling wrapper. Hoxton.SR6
no longer has this onErrorContinue
. However, this meant that exceptions propagating up to the SCS binding would result in the Flux subscription being severed. Subsequent messages would then have nowhere to be routed since there were no subscribers.
This error handling has been passed along to the clients, we add an onErrorResume
operator to the inner publisher to effectively drop error signals. When an error is encountered within the myService::processMessage
publisher, onErrorResume
will switch publishers to the fallback publisher that was passed in as a parameter and resume from that point in the operator chain. In our case, this fallback publisher simply returns Mono.empty()
which allows us to drop the error signals while still allowing internal error handling mechanisms to operate while also not affecting the outer source publisher.
onErrorResume
Example/Explanation
The above technique can be illustrated with a very simple example.
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Flux.just(4, 5, 6))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
The Flux<Integer>
above will output the following:
Element: 1
Element: 4
Element: 5
Element: 6
Since an error is encountered at element 2
, onErrorResume
fallback kicks in and the new publisher becomes Flux.just(4, 5, 6)
effectively resuming from the fallback. In our case, we don't want to affect the source publisher (i.e. Flux.just(1, 2, 3)
). We want to just drop the erroneous element (2
) and continue to the next element (3
).
We can't simply change Flux.just(4, 5, 6)
to Flux.empty()
or Mono.empty()
as so:
Flux.just(1, 2, 3)
.flatMap(i -> i == 2
? Mono.error(new RuntimeException("error")
: Mono.just(i))
.onErrorResume(t -> Mono.empty())
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
This would cause the following to be output:
Element: 1
This is because onErrorResume
has replaced the upstream publishers with the fallback publisher (i.e. Mono.empty()
) and resumed from that point on.
To achieve our desired output of:
Element: 1
Element: 3
We must place the onErrorResume
operator on the inner publisher of the flatMap
:
public Mono<Integer> func(int i) {
return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i);
}
Flux.just(1, 2, 3)
.flatMap(i -> func(i)
onErrorResume(t -> Mono.empty()))
.doOnNext(i -> log.info("Element: {}", i))
.subscribe();
Now, the onErrorResume
only effects the inner publisher returned by func(i)
. If an error occurs from operators in func(i)
, onErrorResume
will fallback to Mono.empty()
effectively completing the Mono<T>
without blowing up. This also still allows error handling operators (e.g. doOnError
) within func(i)
to be applied before the fallback runs. This is because, unlike onErrorContinue
, it does not affect upstream operators and change the next signal at the location of the error.
Final Solution
Reusing the code-snippet in my question, I've upgraded my Spring Cloud version to Hoxton.SR6
and changed the code to something like this:
@Bean
public Function<Flux<CustomMessage>, Mono<Void>> consumer(MyService myService) {
return flux -> flux
.doOnNext(msg -> log.info("New message received: {}", msg))
.flatMap(msg -> myService.processMessage(msg)
.onErrorResume(throwable -> Mono.empty())
)
.then();
}
Note that the onErrorResume
is on the inner publisher (inside the flatMap
).
答案2
得分: 0
我认为问题存在于以下代码中:
.map(msg -> new RuntimeException("exception encountered!"))
您map
行中的Lambda表达式返回了一个异常,而不是抛出一个异常。
英文:
I think the problem exists in the following code:
<!-- begin snippet: js hide: false console: true babel: false -->
<!-- language: java -->
.map(msg -> new RuntimeException("exception encountered!"))
<!-- end snippet -->
The lambda in your map line is returning an exception, not throwing one.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论