Why is my Spring Batch multi-threaded step executing all reads before any processing?
Question
I'm attempting to write a Spring Batch process that converts millions of entries in a legacy DB, which has a sprawling schema, into a streamlined JSON format and publishes that JSON to GCP PubSub. To make this process as efficient as possible, I'm attempting to leverage a Spring Batch multi-threaded Step.
To test my process, I've started small, with a page size and chunk size of 5, a limit of 20 entries total to process, and a thread pool of just 1 thread. I'm stepping through the process to validate that it works as I expected, but it doesn't.
I expected that configuring my RepositoryItemReader with a page size of 5 would cause it to read just 5 records from the DB, and then process those records in a single chunk of 5 before reading the next 5. But that's not what's happening. Instead, since I have Hibernate show-sql enabled, I can see in the logs that the reader reads ALL 20 records before any processing starts.
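To make that expectation concrete, here is a minimal plain-Java sketch of the interleaving I had in mind for a single thread. It is not Spring Batch code: PagedSource, run, and the log strings are hypothetical names invented for illustration, and the loop is a simplified model of chunk-oriented processing (read items until the chunk is full, then process and write them).

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkLoopSketch {

    /** Hypothetical paged source: fetches a new page only when its buffer is used up. */
    static class PagedSource {
        private final int pageSize;
        private final int maxItems;
        private final List<String> events;
        private final List<Integer> buffer = new ArrayList<>();
        private int bufferPos = 0;
        private int nextId = 0;

        PagedSource(int pageSize, int maxItems, List<String> events) {
            this.pageSize = pageSize;
            this.maxItems = maxItems;
            this.events = events;
        }

        /** Returns the next item, or null once maxItems have been handed out. */
        Integer read() {
            if (bufferPos == buffer.size()) {
                if (nextId >= maxItems) {
                    return null;
                }
                buffer.clear();
                bufferPos = 0;
                int end = Math.min(nextId + pageSize, maxItems);
                events.add("fetch page " + nextId + ".." + (end - 1));
                while (nextId < end) {
                    buffer.add(nextId++);
                }
            }
            return buffer.get(bufferPos++);
        }
    }

    /** Runs a single-threaded read/process/write loop and returns the event log. */
    static List<String> run(int pageSize, int chunkSize, int maxItems) {
        List<String> events = new ArrayList<>();
        PagedSource source = new PagedSource(pageSize, maxItems, events);
        boolean exhausted = false;
        while (!exhausted) {
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize) {
                Integer item = source.read();
                if (item == null) {
                    exhausted = true;
                    break;
                }
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                break;
            }
            // Stand-in for the processor and writer handling one chunk.
            events.add("process+write " + chunk.size() + " items");
        }
        return events;
    }

    public static void main(String[] args) {
        run(5, 5, 20).forEach(System.out::println);
    }
}
```

With a page size of 5, a chunk size of 5, and a limit of 20, this model logs a page fetch followed by a process+write line, four times over. That alternating pattern is what I expected to see in the Hibernate SQL log.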
Why is my multi-threaded step performing ALL of its reading before executing any processing? Have I misconfigured it? Obviously I wouldn't want my Job trying to load millions of DTOs into memory before it starts processing anything...
Here's how I've configured my job:
@Configuration
public class ConversionBatchJobConfig {

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize, // pageSize and chunkSize both 5 for now
            @Value("#{jobParameters[limit]}") Integer limit) { // limit is 40
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); // A native query
        myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("ACTIVE");
        }});
        myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("update_date", Sort.Direction.ASC);
        }});
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        myDomainRepositoryReader.setSaveState(false);
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  ItemProcessor<DbProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                // ^ for now this returns true for everything until 20 failures
                .listener(new MyConversionSkipListener(processStatus))
                // ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
Answer 1
Score: 1
You need to check the value of hibernate.jdbc.fetch_size and set it accordingly.
The pageSize and fetchSize are different parameters. You can find more details on the difference here: https://stackoverflow.com/a/58058009/5019386. So in your case, if the fetchSize is bigger than the pageSize, then it's possible that more records are fetched than the page size.
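As a rough sketch of what "set it accordingly" could look like, the plain-Java snippet below builds a JPA property map in which hibernate.jdbc.fetch_size matches the reader's page size. The class and method names (FetchSizeSettings, jpaProperties) are hypothetical, and aligning the two values is an assumption made for illustration, not a confirmed fix for the behavior in the question.

```java
import java.util.HashMap;
import java.util.Map;

public class FetchSizeSettings {

    /**
     * Builds a JPA property map whose JDBC fetch size equals the given page size.
     * Matching the two values is an illustrative assumption, not a requirement.
     */
    static Map<String, Object> jpaProperties(int pageSize) {
        Map<String, Object> props = new HashMap<>();
        // Hibernate setting that controls the JDBC statement fetch size.
        props.put("hibernate.jdbc.fetch_size", pageSize);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(jpaProperties(5));
    }
}
```

In a Spring application, a map like this could be passed to LocalContainerEntityManagerFactoryBean.setJpaPropertyMap(...). With Spring Boot, the equivalent is the property spring.jpa.properties.hibernate.jdbc.fetch_size.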