SpringBatch Sharing Large Amounts of Data Between Steps

Question

I need to share relatively large amounts of data between job steps in a Spring Batch implementation. I am aware of StepExecutionContext and JobExecutionContext as mechanisms for this. However, I have read that these must be limited in size (less than 2500 characters), which is too small for my needs. In my novice Spring Batch implementation, my single-step job is as below:

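import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // No comma after the last selected column, otherwise the SQL is invalid.
    private static final String GET_DATA =
            "SELECT " +
            "stuffA, " +
            "stuffB " +
            "FROM STUFF_TABLE " +
            "ORDER BY stuffA ASC";

    // Streams rows from STUFF_TABLE and maps each one to a StuffDto.
    @Bean
    public ItemReader<StuffDto> databaseCursorItemReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<StuffDto>()
                .name("cursorItemReader")
                .dataSource(dataSource)
                .sql(GET_DATA)
                .rowMapper(new BeanPropertyRowMapper<>(StuffDto.class))
                .build();
    }

    // Declared as a bean, but not wired into the step below.
    @Bean
    ItemProcessor<StuffDto, StuffDto> databaseXmlItemProcessor() {
        return new QueryLoggingProcessor();
    }

    @Bean
    public ItemWriter<StuffDto> databaseCursorItemWriter() {
        return new LoggingItemWriter();
    }

    // Chunk-oriented step: reads one item at a time and hands it to the writer.
    @Bean
    public Step databaseCursorStep(@Qualifier("databaseCursorItemReader") ItemReader<StuffDto> reader,
                                   @Qualifier("databaseCursorItemWriter") ItemWriter<StuffDto> writer,
                                   StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("databaseCursorStep")
                .<StuffDto, StuffDto>chunk(1)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Job databaseCursorJob(@Qualifier("databaseCursorStep") Step exampleJobStep,
                                 JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("databaseCursorJob")
                .incrementer(new RunIdIncrementer())
                .flow(exampleJobStep)
                .end()
                .build();
    }
}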

This works in the sense that I can successfully read from the database and, in the write phase, pass the items to a LoggingItemWriter like this:

public class LoggingItemWriter implements ItemWriter<StuffDto> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingItemWriter.class);

    @Override
    public void write(List<? extends StuffDto> list) throws Exception {
        LOGGER.info("Writing stuff: {}", list);
    }
}

However, I need to make that StuffDto (or an equivalent) and its data available to a second step that would process it rather than just log it.

I would be grateful for any ideas on how that could be accomplished if you assume that the step and job contexts are too limited. Thanks.

Answer 1

Score: 1

If you do not want to write the data to the database or the filesystem, one way to achieve this is as follows:

  1. Create your own job context bean in your config class with the required properties and annotate it with @JobScope.
  2. Implement the org.springframework.batch.core.step.tasklet.Tasklet interface in your reader, processor and writer classes. If you want more control over the steps, you can also implement org.springframework.batch.core.StepExecutionListener alongside it.
  3. Inject your own context object with @Autowired and use its setter and getter methods to store and retrieve the data.

Sample Code:

Config.java

// The class declaration and its annotations are assumed; the original answer
// showed only the members below.
@Configuration
@EnableBatchProcessing
@EnableScheduling
public class Config {

    @Autowired
    private SampleProcessor processor;

    @Autowired
    private SampleReader reader;

    // SampleWriter is not shown; it is built the same way as SampleReader.
    @Autowired
    private SampleWriter writer;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Not declared in the original answer; assumed to be a JobLauncher bean.
    @Autowired
    private JobLauncher monitorJobLauncher;

    // One JobContext instance per job execution, shared by all of its steps.
    @Bean
    @JobScope
    public JobContext getJobContexts() {
        return new JobContext();
    }

    @Bean
    public Step reader() {
        return stepBuilderFactory.get("reader")
                .tasklet(reader)
                .build();
    }

    @Bean
    public Step processor() {
        return stepBuilderFactory.get("processor")
                .tasklet(processor)
                .build();
    }

    @Bean
    public Step writer() {
        return stepBuilderFactory.get("writer")
                .tasklet(writer)
                .build();
    }

    // Runs the three tasklet steps in order.
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .start(reader())
                .next(processor())
                .next(writer())
                .build();
    }

    // Starts the job every second; the "time" parameter differs on each run,
    // so every launch is treated as a new job instance.
    @Scheduled(fixedRate = 1000)
    public void startJob() throws Exception {
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(new Date()));
        JobParameters jobParameters = new JobParameters(confMap);
        monitorJobLauncher.run(testJob(), jobParameters);
    }
}

JobContext.java

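public class JobContext {
    private final List<StuffDto> dataToProcess = new ArrayList<>();
    private final List<StuffDto> dataToWrite = new ArrayList<>();
    public List<StuffDto> getDataToProcess() { return dataToProcess; }
    public List<StuffDto> getDataToWrite() { return dataToWrite; }
}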

SampleReader.java

@Component
public class SampleReader implements Tasklet, StepExecutionListener {

    @Autowired
    private JobContext context;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Logic that you need to perform before the execution of this step.
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Logic that you need to perform after the execution of this step.
        return null; // null leaves the step's exit status unchanged
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Whatever code is here will get executed for the reader.
        // Fetch the StuffDto objects from the database and add them to the
        // jobContext dataToProcess list.
        return RepeatStatus.FINISHED;
    }
}

SampleProcessor.java

@Component
public class SampleProcessor implements Tasklet {

    @Autowired
    private JobContext context;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Whatever code is here will get executed for the processor.
        // context.getDataToProcess();
        // Apply the business logic and set the data to write.
        return RepeatStatus.FINISHED;
    }
}

The writer class follows the same pattern.
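To make the data flow between the three tasklets easier to see, here is a minimal, framework-free sketch of the same idea. It is not Spring Batch code: SharedContext is a hypothetical stand-in for the job-scoped JobContext bean, the rows are plain strings instead of StuffDto, and the "processing" is an arbitrary upper-casing chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Framework-free sketch: three "steps" exchange data through one shared holder. */
public class InMemoryHandoffSketch {

    /** Hypothetical stand-in for the job-scoped JobContext bean. */
    static final class SharedContext {
        final List<String> dataToProcess = new ArrayList<>();
        final List<String> dataToWrite = new ArrayList<>();
    }

    /** "Reader" step: loads the source rows into the holder. */
    static void read(SharedContext ctx, List<String> sourceRows) {
        ctx.dataToProcess.addAll(sourceRows);
    }

    /** "Processor" step: transforms whatever the reader left behind. */
    static void process(SharedContext ctx) {
        for (String row : ctx.dataToProcess) {
            ctx.dataToWrite.add(row.toUpperCase(Locale.ROOT));
        }
    }

    /** "Writer" step: returns a copy of what would be persisted. */
    static List<String> write(SharedContext ctx) {
        return new ArrayList<>(ctx.dataToWrite);
    }

    /** Runs the steps in order with a fresh holder, as one job execution would. */
    static List<String> runJob(List<String> sourceRows) {
        SharedContext ctx = new SharedContext();
        read(ctx, sourceRows);
        process(ctx);
        return write(ctx);
    }

    public static void main(String[] args) {
        // prints [STUFFA, STUFFB]
        System.out.println(runJob(Arrays.asList("stuffA", "stuffB")));
    }
}
```

Creating a new holder inside runJob mirrors what @JobScope is meant to provide: each job execution gets its own instance, so data from one run does not leak into the next.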

Note: with this approach you have to write the database-related boilerplate code yourself. In return, you get more control over your logic and do not have to worry about the context size limit. All of the data is held in memory, so it becomes eligible for garbage collection once the job execution has finished. I hope this conveys the idea.
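If the data is too large to hold in memory, the filesystem route mentioned at the start of this answer is the usual fallback. The sketch below is a plain-Java illustration of that idea, not Spring Batch code: the first step spills its rows to a temporary file and shares only the file path, which easily fits within a small context. The Map stands in for the job's execution context, and the key name stagingFile, the class name and the one-row-per-line text format are all assumptions made for the example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch: pass bulk data between steps through a staging file. */
public class FileHandoffSketch {

    /** Key under which the staging file location is shared (hypothetical name). */
    static final String STAGING_KEY = "stagingFile";

    /** First step: write one row per line and publish only the file path. */
    static void exportStep(List<String> rows, Map<String, String> context) {
        try {
            Path staging = Files.createTempFile("stuff-", ".txt");
            Files.write(staging, rows, StandardCharsets.UTF_8);
            context.put(STAGING_KEY, staging.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Second step: stream the rows back one at a time, then delete the file. */
    static long importStep(Map<String, String> context, Consumer<String> handler) {
        Path staging = Paths.get(context.get(STAGING_KEY));
        long count = 0;
        try {
            try (BufferedReader reader = Files.newBufferedReader(staging, StandardCharsets.UTF_8)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    handler.accept(line); // only one row is in memory at a time
                    count++;
                }
            }
            Files.deleteIfExists(staging); // clean up once fully consumed
            return count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A line-based format only works if no row contains a line break; real data would need a proper serialization format such as CSV with quoting or JSON lines.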

To read more about Tasklet vs Chunk read this.

huangapple
  • Posted on August 25, 2020, 00:36:39
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63565151.html