How to set values in Spring Batch RetryContext
Question
I have the following step definition for a batch job:
  @Autowired
  RetryContextCache retryContextCache;
  @Bean
  public Step correctionStep(JpaTransactionManager transactionManager) {
    return new StepBuilder("correction-step", jobRepository)
        .<String, List<BookingInfo>>chunk(10, transactionManager)
        .reader(customItemReader())
        .processor(correctionProcessor())
        .writer(customItemWriter)
        .taskExecutor(correctionTaskExecutor())
        .faultTolerant()
        .retryPolicy(retryPolicy())
        .backOffPolicy(exponentialBackOffPolicy())
        .retryContextCache(retryContextCache)
        .skipPolicy(skipPolicy())
        .build();
  }
I have bean definitions for RetryContext and RetryContextCache:
  @Bean
  RetryContextCache retryContextCache() {
    return new MapRetryContextCache();
  }
  @Bean
  RetryContext retryContext() {
    return new RetryContextSupport(null);
  }
I then use them in the processor as follows:
@Component
public class CorrectionProcessor implements ItemProcessor<String, List<BookingInfo>> {
  @Autowired
  RetryContextCache retryContextCache;
  @Autowired
  RetryContext retryContext;
  @Override
  public List<BookingInfo> process(String bookingId) throws Exception {
    List<BookingInfo> list = new ArrayList<>();
    if (retryContextCache.containsKey(bookingId)) {
      // Retry path: try to read back the state saved by the previous attempt.
      list = (List<BookingInfo>) retryContextCache.get(bookingId).getAttribute(bookingId);
    } else {
      // fetch and populate list from database.
    }
    try {
      // do something with the list.
    } catch (Exception e) {
      // modify something in the list.
      retryContext.setAttribute(bookingId, list);
      retryContextCache.put(bookingId, retryContext);
      throw e;
    }
    return list;
  }
}
As you can see, I try to store some values in the retryContextCache before re-throwing the exception so that the retry mechanism kicks in.
When the retry happens, execution enters the if branch shown above, but retryContextCache.get(bookingId).getAttribute(bookingId) always returns null.
Am I setting the value in the retry context incorrectly? Why is this not working?
Answer 1
Score: 0
Here is how I resolved the issue.
I wanted to save the object state before the retry starts. However, the RetryContext only comes into scope after the error has been thrown, and it is managed by the retry framework, so I was not able to set the value in it from the processor.
Instead, I created a step-scoped bean to hold the state:
@Bean
@StepScope
public Map<String, List<BookingInfo>> objectStateMap() {
  // Value type matches the Map<String, List<BookingInfo>> injected into the listener.
  return new ConcurrentHashMap<>();
}
I then autowired this map wherever it was required.
In the processor's catch block, I write the modified objects to this map before re-throwing the error.
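For illustration, the catch-block step could look roughly like the sketch below. It is plain Java with no Spring types so that it runs on its own. The `BookingInfo` fields, the `attempts` counter, `failuresBeforeSuccess`, and the two helper methods are hypothetical names introduced only for this example, and the `for` loop in `main` merely stands in for the framework's retry mechanism. Removing the entry on success is also an assumption, not something described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java stand-in for the "save state before re-throwing" pattern.
class RetryStateDemo {

    // Hypothetical minimal BookingInfo; the real class is not shown in the post.
    static final class BookingInfo {
        final String bookingId;
        int attempts; // illustrative field that gets modified on failure

        BookingInfo(String bookingId) {
            this.bookingId = bookingId;
        }
    }

    static final class CorrectionProcessor {
        private final Map<String, List<BookingInfo>> objectStateMap;
        private final int failuresBeforeSuccess; // simulates a flaky operation

        CorrectionProcessor(Map<String, List<BookingInfo>> objectStateMap, int failuresBeforeSuccess) {
            this.objectStateMap = objectStateMap;
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        List<BookingInfo> process(String bookingId) throws Exception {
            // Reuse the state saved by a previous failed attempt, if there is one.
            List<BookingInfo> list = objectStateMap.get(bookingId);
            if (list == null) {
                list = loadFromDatabase(bookingId);
            }
            try {
                doWork(list);
                objectStateMap.remove(bookingId); // assumption: clean up once the item succeeds
                return list;
            } catch (Exception e) {
                // Modify the objects, save them, then re-throw so a retry can happen.
                list.get(0).attempts++;
                objectStateMap.put(bookingId, list);
                throw e;
            }
        }

        // Hypothetical placeholder for the real database lookup.
        private List<BookingInfo> loadFromDatabase(String bookingId) {
            List<BookingInfo> list = new ArrayList<>();
            list.add(new BookingInfo(bookingId));
            return list;
        }

        // Hypothetical placeholder for the real work; fails until enough attempts were recorded.
        private void doWork(List<BookingInfo> list) throws Exception {
            if (list.get(0).attempts < failuresBeforeSuccess) {
                throw new IllegalStateException("simulated failure");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<BookingInfo>> state = new ConcurrentHashMap<>();
        CorrectionProcessor processor = new CorrectionProcessor(state, 2);
        List<BookingInfo> result = null;
        // This loop only imitates a retry policy with three attempts.
        for (int attempt = 1; attempt <= 3 && result == null; attempt++) {
            try {
                result = processor.process("B-1");
            } catch (Exception e) {
                System.out.println("attempt " + attempt + " failed, state saved for retry");
            }
        }
        System.out.println("succeeded: " + (result != null));
    }
}
```

The point of the sketch is only that the map outlives a single call to `process`, so the second attempt sees what the first attempt stored.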
Finally, I created a SkipListener for my use case. When an item is skipped after a failure, the listener looks up the saved state for that item, performs whatever follow-up work is needed, and removes the entry from the map.
@Component
@Slf4j
public class CustomSkipListener {
  @Autowired
  Map<String, List<BookingInfo>> objectStateMap;
  @OnSkipInRead
  public void onSkipInRead(Throwable t) {
    log.error("Read has skipped because of error : " + t.getMessage());
  }
  @OnSkipInWrite
  public void onSkipInWrite(List<BookingInfo> item, Throwable t) {
    log.error("Write has skipped because of error : " + t.getMessage());
  }
  @OnSkipInProcess
  public void onSkipInProcess(String bookingId, Throwable t) {
    List<BookingInfo> bookingInfos = objectStateMap.get(bookingId);
    // do some tasks..
    objectStateMap.remove(bookingId);
  }
}
I then registered this listener on the fault-tolerant step of the main job.
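To make the retry-then-skip flow easier to follow, here is a minimal plain-Java sketch of it. It does not use Spring Batch at all: `SkipHandler`, `runWithRetryThenSkip`, `alwaysFailingWork`, and the `String` payloads are hypothetical stand-ins chosen for this example, and the real framework decides on retries and skips through its own policies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java imitation of "retry a few times, then skip and consume the saved state".
class SkipFlowDemo {

    // Hypothetical stand-in for the skip listener's process callback.
    static final class SkipHandler {
        private final Map<String, List<String>> objectStateMap;
        final List<String> handled = new ArrayList<>();

        SkipHandler(Map<String, List<String>> objectStateMap) {
            this.objectStateMap = objectStateMap;
        }

        void onSkipInProcess(String bookingId, Throwable t) {
            // Read the saved state and remove it from the map in one call.
            List<String> saved = objectStateMap.remove(bookingId);
            if (saved != null) {
                handled.addAll(saved); // placeholder for the real follow-up work
            }
        }
    }

    // Hypothetical work that never succeeds, so the skip path is always reached.
    static void alwaysFailingWork(String bookingId) throws Exception {
        throw new IllegalStateException("simulated failure for " + bookingId);
    }

    // Returns true if the work succeeded, false if the item was skipped.
    static boolean runWithRetryThenSkip(String bookingId, int retryLimit,
                                        Map<String, List<String>> state, SkipHandler skipHandler) {
        Exception last = null;
        for (int attempt = 1; attempt <= retryLimit; attempt++) {
            try {
                alwaysFailingWork(bookingId);
                return true;
            } catch (Exception e) {
                // Save what happened on this attempt before the next retry.
                state.computeIfAbsent(bookingId, k -> new ArrayList<>()).add("attempt-" + attempt);
                last = e;
            }
        }
        // Retries are exhausted: hand the item to the skip callback.
        skipHandler.onSkipInProcess(bookingId, last);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> state = new ConcurrentHashMap<>();
        SkipHandler handler = new SkipHandler(state);
        boolean ok = runWithRetryThenSkip("B-7", 3, state, handler);
        System.out.println("succeeded=" + ok + ", handled=" + handler.handled + ", leftover=" + state);
    }
}
```

Removing the entry inside the skip callback keeps the map from accumulating state for items that will never be processed again.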