英文:
Spring Batch Transactions in StepExecutionListener
问题
我有一个 Spring Batch 作业,它从 Web 服务读取数据,然后在处理器中进行一些增强处理,然后保存到数据库。如果有人针对相同的一组参数运行相同的作业两次,我希望在数据库中删除旧数据,然后作为此作业的一部分重新写入。
我已经在 StepExecutionListener 的 Before Step 方法中编写了删除逻辑。
我该如何使我的步骤具有事务性,以便如果作业中出现错误,删除操作会被回滚?
this.stepBuilderFactory.get("xStep")
.<Item, Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()
英文:
I have a spring batch job that reads data from a web service, does some enriching in a processor and then saves to DB. If someone runs the same job twice for same set of param I want to delete the old data in db and then re-write as part of this job.
I have written the delete logic in StepExecutionListener Before Step Method.
How can I make my step transactional so that if there is an error in the job the delete operation is rolledback?
this.stepBuilderFactory.get("xStep")
.<Item,Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()
答案1
得分: 0
我该如何使我的步骤具有事务性,以便如果作业中出现错误,删除操作会被回滚?
你可以将删除逻辑编写为作业项写入器的一部分,该写入器在由 Spring Batch 驱动的事务内被调用。如果出现任何原因导致事务回滚,你的删除操作也会被回滚。请注意,作业项写入器不仅用于插入数据,还可用于更新数据和删除数据(MongoItemWriter#setDelete 是一个示例)。
英文:
> How can I make my step transactional so that if there is an error in the job the delete operation is rolledback?
You can write the delete logic as part of the item writer which is called inside the transaction driven by Spring Batch. If the transaction is rolled back for any reason, your delete operation will be rolled back. Note that an item writer is not only used for inserting data, but can be used to update data and delete it as well (MongoItemWriter#setDelete is an example).
答案2
得分: 0
Job Parameters
任务参数
该任务不会启动,如果已经存在具有相同“标识”任务参数的实例。
该任务将以以下异常关闭:
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException
您可以创建一个作为随机生成值的任务参数,或者是一个日期。在您的监听器中,您可以验证前一个作业实例的任务参数,排除您的“唯一”任务参数。
Transactional Listener
将@Transactional注解添加到监听器中,以使其包装在事务中。
Example
示例
@Transactional
public class DataErasureListener implements StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Override
public void beforeStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();
int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);
if (jobInstanceCount == 1) {
//No prior run
return;
}
//The list of JobInstances are in descending order of creation. Grab the 2nd one.
JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();
//Compare prior job parameters excluding "unique" job parameters
currentJobParameters.remove("unique");
priorJobParameters.remove("unique");
if (currentJobParameters.equals(priorJobParameters)) {
//Delete old data
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
}
英文:
Job Parameters
The job won't begin if there is already an instance with the same "identifying" job parameters.
The job will shutdown with the following exception:
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException
You could create a job parameter that is a randomly generated value, or perhaps a date. In your listener, you could verify the job parameters of the previous job instance excluding your "unique" job parameter.
Transactional Listener
Add the @Transactional annotation to the listener to have it wrapped in a transaction.
Example
@Transactional
public class DataErasureListener implements StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Override
public void beforeStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();
int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);
if (jobInstanceCount == 1) {
//No prior run
return;
}
//The list of JobInstances are in descending order of creation. Grab the 2nd one.
JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();
//Compare prior job parameters excluding "unique" job parameters
currentJobParameters.remove("unique");
priorJobParameters.remove("unique");
if (currentJobParameters.equals(priorJobParameters)) {
//Delete old data
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论