Why is my Spring Batch multi-threaded step executing all reads before any processing?




I'm attempting to write a Spring Batch process that converts millions of entries in a legacy DB, with a sprawling schema, into a streamlined JSON format and publishes that JSON to GCP PubSub. To make this process as efficient as possible, I'm trying to use a Spring Batch multi-threaded step.

To test my process, I've started small: a page size and chunk size of 5, a limit of 20 entries in total, and a thread pool of just 1 thread. I'm stepping through the process to validate that it works as I expect, but it doesn't.

I expected that configuring my RepositoryItemReader with a page size of 5 would cause it to read just 5 records from the DB, process those records as a single chunk of 5, and only then read the next 5. That's not what happens. Because I have Hibernate's show-sql enabled, I can see in the logs that the reader reads ALL 20 records before any processing starts.
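The expected behaviour (read one chunk, process and write it, then read again) can be sketched as a small plain-Java toy model. This is not Spring Batch code: PagingReader and runStep are hypothetical stand-ins for RepositoryItemReader and the step's chunk loop, assuming the reader only queries a new page once the previous page is used up.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a chunk-oriented step (not Spring Batch code): a paging reader
// that queries the "database" only when its current page is used up, and a
// loop that reads chunkSize items and then processes/writes them as one chunk.
public class ChunkLoopSketch {

    // Hypothetical stand-in for a paging reader such as RepositoryItemReader.
    static class PagingReader {
        private final int pageSize;
        private final int maxItemCount;
        private final List<String> log;
        private List<Integer> page = new ArrayList<>();
        private int indexInPage = 0;
        private int pageNumber = 0;
        private int itemsRead = 0;

        PagingReader(int pageSize, int maxItemCount, List<String> log) {
            this.pageSize = pageSize;
            this.maxItemCount = maxItemCount;
            this.log = log;
        }

        // Returns the next item, or null once maxItemCount items have been read.
        Integer read() {
            if (itemsRead >= maxItemCount) {
                return null;
            }
            if (indexInPage >= page.size()) {
                // Query the next page only when the current one is exhausted.
                log.add("query page " + pageNumber);
                page = new ArrayList<>();
                for (int i = 0; i < pageSize; i++) {
                    page.add(pageNumber * pageSize + i);
                }
                pageNumber++;
                indexInPage = 0;
            }
            itemsRead++;
            return page.get(indexInPage++);
        }
    }

    // Runs the "read a chunk, then process and write it" loop and returns an event log.
    public static List<String> runStep(int pageSize, int chunkSize, int limit) {
        List<String> log = new ArrayList<>();
        PagingReader reader = new PagingReader(pageSize, limit, log);
        while (true) {
            List<Integer> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize) {
                Integer item = reader.read();
                if (item == null) {
                    break;
                }
                chunk.add(item);
            }
            if (chunk.isEmpty()) {
                break; // nothing left to read
            }
            // Processing and writing happen once per chunk, not after all reads.
            log.add("process+write " + chunk);
        }
        return log;
    }

    public static void main(String[] args) {
        // "query page N" lines alternate with "process+write [...]" lines.
        runStep(5, 5, 20).forEach(System.out::println);
    }
}
```

With pageSize and chunkSize both 5 and a limit of 20, this model issues one page query per chunk, so each query is followed by the processing of exactly that page.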

Why is my multi-threaded step performing ALL of its reading before any processing? Have I misconfigured it? Obviously I don't want my job trying to load millions of DTOs into memory before it starts processing anything.

Here's how I've configured my job:

```java
import java.util.Collections;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Sort;

// DbProjection, JsonMessage, MyDomainRepository, the converter/writer/skip classes
// and processStatus are application code that the question does not show.
@Configuration
public class ConversionBatchJobConfig {

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<DbProjection> dbReader(
            MyDomainRepository myDomainRepository,
            @Value("#{jobParameters[pageSize]}") Integer pageSize, // pageSize and chunkSize are both 5 for now
            @Value("#{jobParameters[limit]}") Integer limit) {     // limit is 40
        RepositoryItemReader<DbProjection> myDomainRepositoryReader = new RepositoryItemReader<>();
        myDomainRepositoryReader.setRepository(myDomainRepository);
        myDomainRepositoryReader.setMethodName("findActiveDbDomains"); // a native query
        myDomainRepositoryReader.setArguments(Collections.singletonList("ACTIVE"));
        myDomainRepositoryReader.setSort(Collections.singletonMap("update_date", Sort.Direction.ASC));
        myDomainRepositoryReader.setPageSize(pageSize);
        myDomainRepositoryReader.setMaxItemCount(limit);
        myDomainRepositoryReader.setSaveState(false);
        return myDomainRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<DbProjection, JsonMessage> dataConverter(DataRetrievalService dataRetrievalService) {
        return new DbProjectionToJsonMessageConverter(dataRetrievalService);
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService);
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<DbProjection> dbReader,
                                  ItemProcessor<DbProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<DbProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                // ^ for now this returns true for everything until 20 failures
                .listener(new MyConversionSkipListener(processStatus))
                // ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
```
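The .taskExecutor(conversionThreadPool) call is what makes this a multi-threaded step. As a rough mental model (a plain-Java sketch, not Spring Batch's actual implementation), each worker thread repeatedly reads a whole chunk from the shared reader and then processes and writes that chunk. In this model reads and processing still interleave chunk by chunk, and a one-thread pool produces the same order as a non-threaded step. SharedReader and runStep are hypothetical names, and the shared reader is assumed to be made thread-safe with synchronized. With several threads, a chunk can contain non-contiguous items, which is one reason multi-threaded steps are usually run with saveState(false), as in the reader above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Rough model of a multi-threaded chunk step (not Spring Batch's implementation):
// every worker thread loops "read a whole chunk, then process and write it",
// sharing one thread-safe reader.
public class MultiThreadedStepSketch {

    // Hypothetical shared reader; synchronized so two workers never get the same item.
    static class SharedReader {
        private final int limit;
        private int next = 0;

        SharedReader(int limit) {
            this.limit = limit;
        }

        synchronized Integer read() {
            return next < limit ? next++ : null;
        }
    }

    public static List<String> runStep(int threads, int chunkSize, int limit) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        SharedReader reader = new SharedReader(limit);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                while (true) {
                    // Read phase: fill one chunk from the shared reader.
                    List<Integer> chunk = new ArrayList<>();
                    while (chunk.size() < chunkSize) {
                        Integer item = reader.read();
                        if (item == null) {
                            break;
                        }
                        chunk.add(item);
                    }
                    if (chunk.isEmpty()) {
                        return; // reader exhausted: this worker is done
                    }
                    log.add("read " + chunk);
                    // Process/write phase for this chunk only.
                    log.add("process+write " + chunk);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // With one thread: read [0..4], process+write [0..4], read [5..9], ...
        runStep(1, 5, 20).forEach(System.out::println);
    }
}
```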


Score: 1



You need to check the value of hibernate.jdbc.fetch_size and set it accordingly.

The pageSize and fetchSize are different parameters: pageSize is the number of items the reader requests per page, while fetchSize is a hint to the JDBC driver about how many rows to retrieve from the database per round trip. You can find more details on the difference here: https://stackoverflow.com/a/58058009/5019386. So in your case, if the fetchSize is bigger than the pageSize, it's possible that more records are fetched than the page size.
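To make the distinction concrete, here is a toy model (not Hibernate and not any real JDBC driver) of a cursor that pulls rows from the server in batches of fetchSize while the reader consumes only pageSize of them. It assumes the result set is not itself capped at pageSize (for example by a LIMIT clause) and that the driver honours the fetch-size hint exactly; BufferedCursor and rowsTransferredForOnePage are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Hibernate or a real JDBC driver): a cursor that transfers rows
// from the "server" in batches of fetchSize while the caller consumes them one
// at a time, e.g. pageSize rows for one page of a paging reader.
public class FetchSizeSketch {

    static class BufferedCursor {
        private final int totalRows;
        private final int fetchSize;
        private final List<Integer> buffer = new ArrayList<>();
        private int transferred = 0; // rows pulled from the server so far

        BufferedCursor(int totalRows, int fetchSize) {
            this.totalRows = totalRows;
            this.fetchSize = fetchSize;
        }

        // Returns the next row, refilling the buffer with up to fetchSize rows when empty.
        Integer next() {
            if (buffer.isEmpty()) {
                int batch = Math.min(fetchSize, totalRows - transferred);
                for (int i = 0; i < batch; i++) {
                    buffer.add(transferred++);
                }
            }
            return buffer.isEmpty() ? null : buffer.remove(0);
        }

        int transferred() {
            return transferred;
        }
    }

    // Consumes one page of pageSize rows and reports how many rows were transferred.
    public static int rowsTransferredForOnePage(int totalRows, int pageSize, int fetchSize) {
        BufferedCursor cursor = new BufferedCursor(totalRows, fetchSize);
        for (int i = 0; i < pageSize && cursor.next() != null; i++) {
            // each row would be handed to the item reader here
        }
        return cursor.transferred();
    }

    public static void main(String[] args) {
        System.out.println(rowsTransferredForOnePage(20, 5, 5));  // prints 5
        System.out.println(rowsTransferredForOnePage(20, 5, 20)); // prints 20
    }
}
```

In this model, a fetch size equal to the page size transfers only the 5 rows the first page needs, while a fetch size of 20 transfers all 20 rows before the first page has been fully consumed, which is the kind of over-fetching described above.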

  • Posted on May 30, 2020 at 01:58:20
  • When reposting, please keep the link to this article: https://go.coder-hub.com/62092002.html



