Spring Integration parallel split
Question
I have code like this. Is it possible to control the ordering of the first split?

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        return executor;
    }

    @Bean
    public IntegrationFlow firstFlow() {
        return IntegrationFlows.from("firstChannel")
                .split()
                .channel("inputChannel")
                .get();
    }

    @Bean
    public IntegrationFlow inputFlow() {
        return IntegrationFlows.from("inputChannel")
                .channel(MessageChannels.executor(taskExecutor()))
                .split()
                .handle(this::mapping)
                .aggregate()
                .channel("aggregateChannel")
                .get();
    }

    @Bean
    public IntegrationFlow aggregateFlow() {
        return IntegrationFlows.from("aggregateChannel")
                .aggregate()
                .get();
    }

I want the `mapping` method to be handled asynchronously, but the second message from the first split should be sent to `inputChannel` and processed only after the first one has appeared in `aggregateChannel`.
Answer 1
Score: 0
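Here is a unit test for a possible solution. It uses the `IntegrationFlow.from(...)` factory from Spring Integration 6; on 5.x, use `IntegrationFlows.from(...)` as in the question.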
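
    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;
    import java.util.Map;

    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

    @SpringJUnitConfig
    public class So75547720Tests {

        @Autowired
        BeanFactory beanFactory;

        @Test
        void sequentialSplitButSubSplitParallel() {
            List<String> firstList = List.of("1", "2", "3", "4");
            List<String> secondList = List.of("5", "6", "7", "8");
            List<List<String>> testData = List.of(firstList, secondList);

            MessagingTemplate messagingTemplate = new MessagingTemplate();
            messagingTemplate.setBeanFactory(this.beanFactory);

            // Send the nested list and block until the outer aggregator replies.
            List<List<String>> result =
                    messagingTemplate.convertSendAndReceive("firstChannel", testData, List.class);

            assertThat(result).isNotNull().hasSize(2);
            // Order inside each group is not guaranteed, so compare elements only.
            assertThat(result.get(0)).hasSameElementsAs(firstList);
            assertThat(result.get(1)).hasSameElementsAs(secondList);
            System.out.println(result);
        }

        @Configuration
        @EnableIntegration
        public static class TestConfiguration {

            @Bean
            public TaskExecutor taskExecutor() {
                ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
                executor.setCorePoolSize(5);
                return executor;
            }

            // Outer split: emits each inner list sequentially on the caller's thread.
            @Bean
            public IntegrationFlow firstFlow() {
                return IntegrationFlow.from("firstChannel")
                        .split()
                        .channel("inputChannel")
                        .get();
            }

            // The gateway blocks until the sub-flow's aggregator replies.
            @Bean
            public IntegrationFlow inputFlow() {
                return IntegrationFlow.from("inputChannel")
                        .gateway(subFlow -> subFlow
                                .split()
                                .channel(MessageChannels.executor(taskExecutor()))
                                .handle(this::mapping)
                                .aggregate())
                        .channel("aggregateChannel")
                        .get();
            }

            // Outer aggregate: collects the per-list results into the final reply.
            @Bean
            public IntegrationFlow aggregateFlow() {
                return IntegrationFlow.from("aggregateChannel")
                        .aggregate()
                        .get();
            }

            private String mapping(String payload, Map<String, ?> headers) {
                System.out.println("Handling thread: " + Thread.currentThread().getName() + " for: " + payload);
                return payload.toUpperCase();
            }
        }
    }
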
The first `split()` emits items sequentially into `inputChannel`. Then we use a `gateway` for the sub-flow. The gateway waits for a reply before pushing it forward to `aggregateChannel`. The interesting part is the sub-flow, where a second splitter emits items in parallel through an `Executor` channel. The inner aggregator won't emit until it has gathered all the items of the current split, and only after that does the flow move on to the next item from the top-level split.
The output of the test might look like this:

    Handling thread: taskExecutor-2 for: 2
    Handling thread: taskExecutor-1 for: 1
    Handling thread: taskExecutor-3 for: 3
    Handling thread: taskExecutor-4 for: 4
    Handling thread: taskExecutor-2 for: 6
    Handling thread: taskExecutor-5 for: 5
    Handling thread: taskExecutor-3 for: 7
    Handling thread: taskExecutor-1 for: 8
    [[2, 3, 1, 4], [6, 5, 7, 8]]
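
To make the ordering guarantee easier to see in isolation, here is a rough plain-Java analogy of the same idea, with no Spring involved: outer groups are processed one at a time, the items of each group run in parallel, and the next group starts only once the current one is fully collected. The class name `SequentialOuterParallelInner` and the `process` helper are hypothetical names made up for this sketch, and `ExecutorService.invokeAll` merely stands in for the gateway waiting on the inner aggregator; it is not how Spring Integration implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SequentialOuterParallelInner {

    // Hypothetical stand-in for the answer's mapping() handler.
    static String mapping(String payload) {
        return payload.toUpperCase();
    }

    // Processes outer groups one at a time; items inside a group run in parallel.
    static List<List<String>> process(List<List<String>> groups, ExecutorService executor)
            throws InterruptedException, ExecutionException {
        List<List<String>> results = new ArrayList<>();
        for (List<String> group : groups) {          // outer "split": sequential
            List<Callable<String>> tasks = new ArrayList<>();
            for (String item : group) {              // inner "split": one task per item
                tasks.add(() -> mapping(item));
            }
            // invokeAll blocks until every task of this group has completed,
            // comparable to the gateway waiting for the inner aggregator's reply.
            List<String> aggregated = new ArrayList<>();
            for (Future<String> future : executor.invokeAll(tasks)) {
                aggregated.add(future.get());
            }
            results.add(aggregated);                 // outer "aggregate"
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<List<String>> result = process(
                    List.of(List.of("a", "b", "c", "d"), List.of("e", "f", "g", "h")),
                    executor);
            System.out.println(result); // prints [[A, B, C, D], [E, F, G, H]]
        } finally {
            executor.shutdown();
        }
    }
}
```

One difference from the Spring flow: `invokeAll` returns its futures in submission order, so each inner list here keeps its input order, whereas the test output above shows the aggregator's inner lists in a different order.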