
Spring Batch - How to Split Work with Chunking or Similar

Question


I have a question regarding how best to pursue a Spring Batch chunking implementation for my needs. Currently, I have a working job where I read a collection from a database. This collection essentially maps data groupings to retrieval information, sort of like:

GROUPING    RETRIEVAL INSTRUCTIONS
GRP-01      <instructions for group 01>
GRP-02      <instructions for group 02>
..
..
GRP-N       <instructions for group N>
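For illustration, the CollectionDto used in the snippets below might carry this mapping roughly as sketched here; the field names are assumptions for illustration only and are not taken from the original post:

// Hypothetical shape of the row mapped by BeanPropertyRowMapper;
// the field names below are assumptions for illustration only.
public class CollectionDto {

    private String grouping;              // e.g. "GRP-01"
    private String retrievalInstructions; // instructions for that group

    public String getGrouping() { return grouping; }
    public void setGrouping(String grouping) { this.grouping = grouping; }

    public String getRetrievalInstructions() { return retrievalInstructions; }
    public void setRetrievalInstructions(String retrievalInstructions) {
        this.retrievalInstructions = retrievalInstructions;
    }
}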

Currently I have something similar to the following (some details left out for clarity):

public class BatchConfig {

    // ...

    @Bean
    public ItemReader<CollectionDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<CollectionDto>()
                .name("cursorItemReader")
                .dataSource(dataSource)
                .sql(GET_DATA)
                .rowMapper(new BeanPropertyRowMapper<>(CollectionDto.class))
                .build();
    }

    @Bean
    public ItemProcessor<CollectionDto, CollectionDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<CollectionDto> databaseCursorItemWriter() {
        return new GroupingWriter();
    }

    // ...

}

public class GroupingWriter implements ItemWriter<CollectionDto> {

    @Override
    public void write(List<? extends CollectionDto> list) throws Exception {
        for (CollectionDto group : list) {
            // <processing here one group at a time>
        }
    }
}

The problem I am encountering is that the nature of the systems I am interfacing with makes this too slow. So I would like to split the work (currently performed in the GroupingWriter above), probably by chunking, so that I could process each group in parallel. I was trying to figure out a way to do this group processing using chunking in a separate step, but I cannot figure out how to assign each item in the collection to a separate chunk. I would be grateful for any ideas. Thanks.
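One common way to give each group its own parallel unit of work in Spring Batch is a partitioned step: a Partitioner builds one ExecutionContext per group, and a TaskExecutor runs the worker step for every partition concurrently. The sketch below is only an outline of that idea; the partitioner class, the step names, and the "groupId" context key are hypothetical and not part of the original question.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// Hypothetical partitioner: one partition per group, so each group can be
// processed by its own worker step execution (and thread).
public class GroupPartitioner implements Partitioner {

    private final List<String> groupIds; // e.g. loaded up front with a JdbcTemplate query

    public GroupPartitioner(List<String> groupIds) {
        this.groupIds = groupIds;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (String groupId : groupIds) {
            ExecutionContext context = new ExecutionContext();
            context.putString("groupId", groupId); // worker components read this key
            partitions.put("partition-" + groupId, context);
        }
        return partitions;
    }
}

// In a @Configuration class such as BatchConfig: a manager step that fans the
// partitions out to a worker step in parallel.
@Bean
public Step managerStep(Step workerStep, GroupPartitioner groupPartitioner) {
    return stepBuilderFactory.get("managerStep")
            .partitioner("workerStep", groupPartitioner)
            .step(workerStep)
            .taskExecutor(new SimpleAsyncTaskExecutor("group-")) // one thread per partition
            .build();
}

The worker step would then use a step-scoped reader that pulls its group with something like @Value("#{stepExecutionContext['groupId']}"), so each partition only sees its own group's rows.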

Answer 1

Score: 1


Here is my Spring Batch configuration for chunk-oriented processing:

@Bean
public Job myJob() {
    return jobBuilders.get("myJob")
        .start(chunkStep())
        .build();
}

@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("my_step")
        .<InputData, OutputData>chunk(20).faultTolerant()
        .reader(databaseCursorItemReader())
        .processor(processor())
        .writer(databaseCursorItemWriter())
        .build();
}

In chunk-oriented processing, each individual item is read in from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval (20 in the example above), the entire chunk is written out via the ItemWriter and the transaction is committed.
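The chunk step above still runs on a single thread, so on its own it does not provide the parallelism the question asks for. One option, sketched below on the assumption that the reader and writer are thread-safe, is to attach a TaskExecutor to the same step so several chunks are processed concurrently. Note that JdbcCursorItemReader is not thread-safe; in practice a paging reader or a SynchronizedItemStreamReader wrapper would be needed.

// Sketch only: multi-threaded variant of the chunk step shown above.
// Assumes thread-safe reader/writer (not true of JdbcCursorItemReader).
@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("my_step")
        .<InputData, OutputData>chunk(20)
        .faultTolerant()
        .reader(databaseCursorItemReader())
        .processor(processor())
        .writer(databaseCursorItemWriter())
        .taskExecutor(new SimpleAsyncTaskExecutor("chunk-")) // chunks run in parallel
        .throttleLimit(4) // cap on concurrent chunk executions
        .build();
}

A multi-threaded step also gives up restartable reader state, which is one reason a partitioned step with one partition per group is often the cleaner fit for the use case described in the question.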
