英文:
Spring Integration 6.1 breaking change: threads no longer block indefinitely
问题
-
确保始终记录“first flow ended”的方法是什么?
- 确保您的代码在6.1版本中按照期望的方式执行,您可以尝试以下方法:
- 在Spring Integration 6.1的配置中,查看与“first flow ended”相关的配置,确保没有出现任何新的配置或更改,可能需要更新配置以适应6.1的新行为。
- 检查在6.1中是否有任何新的日志记录配置或级别更改,确保您的日志记录级别允许“INFO”级别消息被记录。如果有必要,调整日志配置以确保“INFO”级别消息被记录。
- 确保您的代码在6.1版本中按照期望的方式执行,您可以尝试以下方法:
-
确保“second flow started”等待第一个流结束的方法是什么?
- 要确保“second flow started”等待第一个流结束,您可以尝试以下方法:
- 在Spring Integration 6.1的配置中,查看与第一个流和第二个流相关的配置,确保您没有在第二个流的配置中引入了任何新的并行执行行为。您可能需要调整配置以确保第二个流等待第一个流的结束。
- 检查与线程池或执行器服务相关的配置,以确保在第一个流中的操作完成之前不会启动第二个流。可能需要调整线程池配置或等待机制以实现这一点。
- 要确保“second flow started”等待第一个流结束,您可以尝试以下方法:
请注意,由于具体的配置和代码结构可能会有所不同,您可能需要深入研究您的Spring Integration配置和代码以解决这两个问题。如果有具体的代码示例或更多的配置信息可用,将更容易提供详细的建议。
英文:
We're having some issues with unwanted parallel execution of threads that used to execute sequentially. I narrowed it down to this breaking change in v6.1
I'm unsure how I get our code to work with v6.1. It's breaks multiple applications currently in use in production.
Here's an example code to explain the behavior before and after upgrading to 6.1:
.routeToRecipients(route -> route
.recipientFlow(flow -> flow
.log(INFO, logCat, m -> "first flow started")
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(innerFlow -> innerFlow
.channel(c -> c.executor(executorService))
.log(INFO, logCat, m -> "subflow started")
.transform(source -> {
try {
Thread.sleep(1000 * 40);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return source;
})
.log(INFO, logCat, m -> "subflow ended")
.bridge()),
gatherer -> gatherer.releaseStrategy(group -> {
log.info("group size {}", group.size());
return group.size() == 1;
}))
.log(INFO, logCat, m -> "first flow ended")
.nullChannel()
)
.recipientFlow(flow -> flow
.log(INFO, logCat, m -> "second flow started")
.nullChannel()
)
)
With Spring Integration 6.0:
13:44:02.357 INFO [thread-0] ...: first flow started
13:44:02.360 INFO [thread-0] ...: subflow started
13:44:42.366 INFO [thread-5] ...: subflow ended
13:44:42.370 INFO [thread-5] ...: group size 1
13:44:42.371 INFO [thread-0] ...: first flow ended // good
13:44:42.372 INFO [thread-0] ...: second flow started // good
With Spring Integration 6.1 (notice also that "first flow ended" is never logged):
12:21:32.389 INFO [thread-0] ...: first flow started
12:21:32.391 INFO [thread-0] ...: subflow started
12:22:02.399 INFO [thread-0] ...: second flow started // bad, starts exactly 30 seconds after the first flow is blocked
12:22:12.399 INFO [thread-5] ...: subflow ended
12:22:12.402 INFO [thread-5] ...: group size 1
How can I:
- make sure "first flow ended" is always logged ?
- make sure "second flow started" waits for the first flow to end ?
答案1
得分: 1
你所展示的行为是正确的,也符合你提到的变化。就像我在迁移指南中所说的,现在默认的超时时间是30秒。我们可以从ScatterGatherHandler
的代码中看到:
private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
而这个默认超时时间如下:
/**
* 用于阻塞操作(如发送和接收消息)的默认超时时间。
* @since 6.1
*/
public static final long DEFAULT_TIMEOUT = 30000L;
基本上,这比你的 Thread.sleep(1000 * 40);
要小。因此,你的第一个子流会在没有进一步的操作的情况下静默结束,因为没有产生回复。
要解决你的问题并将其恢复到先前版本的行为,你需要提供一个比你的阻塞休眠时间更长的超时时间:
scatterGatherSpec -> scatterGatherSpec.gatherTimeout(41000)
作为scatterGather()
配置的第三个参数。
你也可以设置.requiresReply(true)
,如果在超时时间内没有回复,就会失败。
英文:
The behavior you demonstrates is correct and expected according the change you are mentioning. As I said in that Migration Guide, the default timeout now is 30 seconds. And we simply can see that from the ScatterGatherHandler
code:
private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
Where that one is this:
/**
* The default timeout for blocking operations like send and receive messages.
* @since 6.1
*/
public static final long DEFAULT_TIMEOUT = 30000L;
Which is, essentially, smaller than your Thread.sleep(1000 * 40);
. Therefore your fist sub-flow finishes silently without any further actions. Just because there is no reply to produce.
To fix your problem and bring it back to the previous version behavior, you need to provide that timeout bigger, than your blocking sleep:
scatterGatherSpec -> scatterGatherSpec.gatherTimeout(41000)
as a third argument of that scatterGather()
configuration.
You also can set .requiresReply(true)
to fail if no reply within that timeout.
答案2
得分: 0
使用Artem的解释,我能够像这样解决它:
.scatterGather(
scatterer -> scatterer
.recipientFlow(flow -> flow
.gateway(subflow, spec -> spec.replyTimeout(-1L))),
gatherer -> gatherer.doSomething(),
spec -> spec.gatherTimeout(-1L)
)
我需要在两个地方禁用新的超时,即gatherTimeout()
和gateway()
的超时(用于长时间运行的子流程)。
英文:
Using Artem's explanations, I was able to solve it like this:
.scatterGather(
scatterer -> scatterer
.recipientFlow(flow -> flow
.gateway(subflow, spec -> spec.replyTimeout(-1L)),
gatherer -> gatherer.doSomething(),
spec -> spec.gatherTimeout(-1L)
)
I needed to disable the new timeout in 2 locations, the gatherTimeout()
as well as the gateway()
timeout (for long running sub-flows).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论