Why is my Spring Batch multi-threaded step executing all reads before any processing?

I'm attempting to write a Spring Batch process for converting millions of entries in a legacy DB, with a sprawling schema, into a streamlined JSON format and publishing that JSON to GCP PubSub. In order to make this process as efficient as possible, I'm attempting to leverage a Spring-Batch multi-threaded Step.

To test my process, I've started small, with a page size and chunk size of 5, a limit of 20 entries total to process, and a thread-pool of just 1 thread. I'm attempting to step through the process to validate it's working as I expected - but it's not.

I expected that configuring my RepositoryItemReader with a page size of 5 would cause it to read just 5 records from the DB, and then process those records in a single chunk of 5 before reading the next 5. But that's not what's happening. Instead, since I have Hibernate show-sql enabled, I can see in the logs that the reader reads ALL 20 records before any processing starts.

Why is my multi-threaded step performing ALL of its reading before executing any processing? Have I misconfigured it? Obviously I wouldn't want my Job trying to load millions of DTOs into memory before it starts processing anything...
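
(For reference, chunk-oriented processing is expected to interleave reads, processing, and writes roughly like the simplified loop below. This is only an illustrative sketch, not Spring Batch internals: the class and method names are made up, and it omits transactions, fault tolerance, and the multi-threaded handoff.)

    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;

    // Greatly simplified sketch of the chunk-oriented loop described above; not Spring Batch internals.
    public class ChunkLoopSketch {

        static <I, O> void runChunkLoop(ItemReader<I> reader,
                                        ItemProcessor<I, O> processor,
                                        ItemWriter<O> writer,
                                        int chunkSize) throws Exception {
            boolean moreItems = true;
            while (moreItems) {
                List<I> chunk = new ArrayList<>();
                while (chunk.size() < chunkSize) {
                    // A paging reader such as RepositoryItemReader hands items out one at a time from
                    // its current page and only queries the next page once that page is exhausted.
                    I item = reader.read();
                    if (item == null) {
                        moreItems = false;
                        break;
                    }
                    chunk.add(item);
                }
                if (chunk.isEmpty()) {
                    break;
                }
                List<O> outputs = new ArrayList<>();
                for (I item : chunk) {
                    O out = processor.process(item); // process the items just read...
                    if (out != null) {
                        outputs.add(out);
                    }
                }
                writer.write(outputs); // ...and write them before the next chunk is read
            }
        }
    }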

Here's how I've configured my job:

    @Configuration
    public class ConversionBatchJobConfig {

        @Bean
        public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
            return new SimpleCompletionPolicy(chunkSize);
        }

        @Bean
        @StepScope
        public ItemStreamReader<DbProjection> dbReader(
                MyDomainRepository myDomainRepository,
                @Value("#{jobParameters[pageSize]}") Integer pageSize, // pageSize and chunkSize both 5 for now
                @Value("#{jobParameters[limit]}") Integer limit) {     // limit is 40
            RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
            myDomainRepositoryReader.setRepository(myDomainRepository);
            myDomainRepositoryReader.setMethodName("findActiveDbDomains"); // a native query
            myDomainRepositoryReader.setArguments(new ArrayList<Object>() {{
                add("ACTIVE");
            }});
            myDomainRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
                put("update_date", Sort.Direction.ASC);
            }});
            myDomainRepositoryReader.setPageSize(pageSize);
            myDomainRepositoryReader.setMaxItemCount(limit);
            myDomainRepositoryReader.setSaveState(false);
            return myDomainRepositoryReader;
        }

        @Bean
        @StepScope
        public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
            return new DbProjectionToJsonMessageConverter(dataRetrievalService);
        }

        @Bean
        @StepScope
        public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
            return new JsonMessageWriter(publisherService);
        }

        @Bean
        public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                      ItemStreamReader<DbProjection> dbReader,
                                      ItemProcessor<DbProjection, JsonMessage> dataConverter,
                                      ItemWriter<JsonMessage> jsonPublisher,
                                      StepBuilderFactory stepBuilderFactory,
                                      TaskExecutor conversionThreadPool,
                                      @Value("${conversion.failure.limit:20}") int maximumFailures) {
            return stepBuilderFactory.get("conversionProcess")
                    .<DbProjection, JsonMessage>chunk(processChunkSize)
                    .reader(dbReader)
                    .processor(dataConverter)
                    .writer(jsonPublisher)
                    .faultTolerant()
                    .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                    // ^ for now this returns true for everything until 20 failures
                    .listener(new MyConversionSkipListener(processStatus))
                    // ^ for now this just logs the error
                    .taskExecutor(conversionThreadPool)
                    .build();
        }

        @Bean
        public Job conversionJob(Step conversionProcess,
                                 JobBuilderFactory jobBuilderFactory) {
            return jobBuilderFactory.get("conversionJob")
                    .start(conversionProcess)
                    .build();
        }
    }
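
Two pieces referenced by this configuration are not shown in the post: the conversionThreadPool bean and how the pageSize/limit job parameters are supplied. The sketch below shows one plausible way they could look in a Spring Boot setup; the class name, thread-pool sizing, and the CommandLineRunner launch are assumptions, not the author's actual code.

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class ConversionJobSupportConfig {

        // Hypothetical: the conversionThreadPool bean is referenced by the step but not shown in the post.
        // A single-thread pool matches the test setup described in the question.
        @Bean
        public TaskExecutor conversionThreadPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(1);
            executor.setMaxPoolSize(1);
            executor.setThreadNamePrefix("conversion-");
            return executor;
        }

        // Hypothetical launch: shows how the pageSize/limit values bound in dbReader via
        // #{jobParameters[...]} could be supplied when the job is started.
        @Bean
        public CommandLineRunner launchConversionJob(JobLauncher jobLauncher, Job conversionJob) {
            return args -> {
                JobParameters params = new JobParametersBuilder()
                        .addLong("pageSize", 5L)
                        .addLong("limit", 20L)
                        .toJobParameters();
                jobLauncher.run(conversionJob, params);
            };
        }
    }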

Answer 1

Score: 1

You need to check the value of hibernate.jdbc.fetch_size and set it accordingly.

The pageSize and fetchSize are different parameters. You can find more details on the difference here: https://stackoverflow.com/a/58058009/5019386. So in your case, if the fetchSize is bigger than the pageSize, it's possible that more records are fetched from the database than the page size.
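
In a Spring Boot application, one way to apply this suggestion is to set spring.jpa.properties.hibernate.jdbc.fetch_size in application.properties, or to register a HibernatePropertiesCustomizer as sketched below. This assumes Spring Boot 2.x and reuses the commit.chunk.size property from the question; the bean and class names are illustrative only.

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.orm.jpa.HibernatePropertiesCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class HibernateFetchSizeConfig {

        // Sketch: keeps Hibernate's JDBC fetch size aligned with the reader's page size so a
        // page of 5 does not pull more rows per round trip. Names here are illustrative only.
        @Bean
        public HibernatePropertiesCustomizer fetchSizeCustomizer(
                @Value("${commit.chunk.size:5}") Integer fetchSize) {
            return hibernateProperties ->
                    hibernateProperties.put("hibernate.jdbc.fetch_size", fetchSize.toString());
        }
    }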
