Spring Integration parallel split

Question

I have code like this. Is it possible to control the ordering of the first split?

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(50);
    return executor;
}

@Bean
public IntegrationFlow firstFlow() {
    return IntegrationFlows.from("firstChannel")
        .split()
        .channel("inputChannel")
        .get();
}

@Bean
public IntegrationFlow inputFlow() {
    return IntegrationFlows.from("inputChannel")
        .channel(MessageChannels.executor(taskExecutor()))
        .split()
        .handle(this::mapping)
        .aggregate()
        .channel("aggregateChannel")
        .get();
}

@Bean
public IntegrationFlow aggregateFlow() {
    return IntegrationFlows.from("aggregateChannel")
        .aggregate()
        .get();
}

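I want the "mapping" method to be handled asynchronously, but the second message from the first split should start processing and be sent to "inputChannel" only after the first one has appeared in "aggregateChannel".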

Answer 1

Score: 0

So, here is a unit test for a possible solution:

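import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig
public class So75547720Tests {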

    @Autowired
    BeanFactory beanFactory;

    @Test
    void sequentialSplitButSubSplitParallel() {
        List<String> firstList = List.of("1", "2", "3", "4");
        List<String> secondList = List.of("5", "6", "7", "8");
        List<List<String>> testData = List.of(firstList, secondList);

        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setBeanFactory(this.beanFactory);

        List<List<String>> result = messagingTemplate.convertSendAndReceive("firstChannel", testData, List.class);
        assertThat(result).isNotNull().hasSize(2);
        assertThat(result.get(0)).hasSameElementsAs(firstList);
        assertThat(result.get(1)).hasSameElementsAs(secondList);
        System.out.println(result);
    }

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            return executor;
        }

        @Bean
        public IntegrationFlow firstFlow() {
            return IntegrationFlow.from("firstChannel")
                    .split()
                    .channel("inputChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow inputFlow() {
            return IntegrationFlow.from("inputChannel")
                    .gateway(subFlow -> subFlow
                            .split()
                            .channel(MessageChannels.executor(taskExecutor()))
                            .handle(this::mapping)
                            .aggregate())
                    .channel("aggregateChannel")
                    .get();
        }

        @Bean
        public IntegrationFlow aggregateFlow() {
            return IntegrationFlow.from("aggregateChannel")
                    .aggregate()
                    .get();
        }

        private String mapping(String payload, Map<String, ?> headers) {
            System.out.println("Handling thread: " + Thread.currentThread().getName() + " for: " + payload);
            return payload.toUpperCase();
        }

    }

}

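The first split() emits items sequentially into inputChannel.
Then we use a gateway with a sub-flow. This gateway waits for a reply before pushing it on to aggregateChannel. The interesting part is in that sub-flow, where a second splitter emits items in parallel because of the Executor channel. The inner aggregator won't emit until it has gathered all the items for the current split. Only after that do we move on to the next item from the top-level split.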
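
To make the ordering guarantee easier to see, here is a rough plain-Java analogy of the same idea, without Spring Integration at all. It is only a sketch under assumptions: the class name SequentialOuterParallelInner, the process method and the single-argument mapping stand-in are hypothetical and not part of the flow above. The outer loop plays the role of the top-level split() (one batch at a time), and ExecutorService.invokeAll plays the role of the gateway plus inner aggregator (it blocks until every item of the current batch has been handled).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SequentialOuterParallelInner {

    // Hypothetical stand-in for the "mapping" handler of the flow.
    static String mapping(String payload) {
        return payload.toUpperCase();
    }

    static List<List<String>> process(List<List<String>> batches, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<List<String>> results = new ArrayList<>();
            // Outer "split": batches are taken strictly one after another.
            for (List<String> batch : batches) {
                // Inner "split": one task per item, run in parallel on the pool.
                List<Callable<String>> tasks = new ArrayList<>();
                for (String item : batch) {
                    tasks.add(() -> mapping(item));
                }
                // invokeAll blocks until all tasks of this batch are complete,
                // so the next batch cannot start before this one is finished.
                List<String> mapped = new ArrayList<>();
                for (Future<String> future : pool.invokeAll(tasks)) {
                    mapped.add(future.get());
                }
                results.add(mapped);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<String>> input = List.of(
                List.of("a", "b", "c", "d"),
                List.of("e", "f", "g", "h"));
        System.out.println(process(input, 5));
    }
}
```

One difference from the flow: invokeAll returns its futures in submission order, so each inner list here keeps its original order, whereas the aggregator in the sub-flow collects items as they arrive, which is why the test only checks hasSameElementsAs.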

The output of the test might look like this:

Handling thread: taskExecutor-2 for: 2
Handling thread: taskExecutor-1 for: 1
Handling thread: taskExecutor-3 for: 3
Handling thread: taskExecutor-4 for: 4
Handling thread: taskExecutor-2 for: 6
Handling thread: taskExecutor-5 for: 5
Handling thread: taskExecutor-3 for: 7
Handling thread: taskExecutor-1 for: 8
[[2, 3, 1, 4], [6, 5, 7, 8]]

huangapple
  • Posted on February 24, 2023 at 00:35:53
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75547720.html