How to set values in Spring Batch RetryContext

Question

I have the below step definition for a batch job:

    @Autowired
    RetryContextCache retryContextCache;

    @Bean
    public Step correctionStep(JpaTransactionManager transactionManager) {
        return new StepBuilder("correction-step", jobRepository)
                .<String, List<BookingInfo>>chunk(10, transactionManager)
                .reader(customItemReader())
                .processor(correctionProcessor())
                .writer(customItemWriter)
                .taskExecutor(correctionTaskExecutor())
                .faultTolerant()
                .retryPolicy(retryPolicy())
                .backOffPolicy(exponentialBackOffPolicy())
                .retryContextCache(retryContextCache)
                .skipPolicy(skipPolicy())
                .build();
    }

I have bean definitions for RetryContext and RetryContextCache:

    @Bean
    RetryContextCache retryContextCache() {
        return new MapRetryContextCache();
    }

    @Bean
    RetryContext retryContext() {
        return new RetryContextSupport(null);
    }

I use them in the processor as follows:

    @Component
    public class CorrectionProcessor implements ItemProcessor<String, List<BookingInfo>> {

        @Autowired
        RetryContextCache retryContextCache;

        @Autowired
        RetryContext retryContext;

        public List<BookingInfo> process(String bookingId) throws Exception {
            List<BookingInfo> list = new ArrayList<>();
            if (retryContextCache.containsKey(bookingId)) {
                list = (List<BookingInfo>) retryContextCache.get(bookingId).getAttribute(bookingId);
            } else {
                // fetch and populate list from database.
            }
            try {
                // do something with the list.
            } catch (Exception e) {
                // modify something in the list.
                retryContext.setAttribute(bookingId, list);
                retryContextCache.put(bookingId, retryContext);
                throw e;
            }
        }
    }

As you can see, I try to store some values in retryContextCache before re-throwing the exception so that the retry mechanism kicks in.

When the retry happens, execution enters the if branch shown in the code above, but retryContextCache.get(bookingId).getAttribute(bookingId) always returns null.

Am I setting the value in the retry context incorrectly? Why is this not working?

Answer 1

Score: 0

Here is how I resolved the issue.

I wanted to save the object's state before the retry started. However, the RetryContext only comes into scope after the error has been thrown, so I could not set the value in it.
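A plausible explanation for the null seen in the question is sketched below in plain Java, with no Spring types. It rests on an assumption about stateful retry that the original post does not confirm: once the exception propagates, the framework stores its own context in the cache under the item's key, replacing whatever the processor put there under the same bookingId. The names RetryCacheOverwriteSketch, Context, processorFails, frameworkRegistersFailure and readOnRetry are hypothetical stand-ins, not Spring APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the suspected overwrite; nothing here is a Spring class.
class RetryCacheOverwriteSketch {

    // Stand-in for a retry context: just a bag of attributes.
    static class Context {
        final Map<String, Object> attributes = new HashMap<>();
    }

    // Stand-in for the shared context cache, keyed by the item (bookingId).
    final Map<String, Context> cache = new HashMap<>();

    // What the processor in the question does in its catch block.
    void processorFails(String bookingId, List<String> state) {
        Context mine = new Context();
        mine.attributes.put(bookingId, state);
        cache.put(bookingId, mine);
    }

    // ASSUMPTION: after the exception propagates, the framework registers
    // its own context under the same key, replacing the processor's entry.
    void frameworkRegistersFailure(String bookingId) {
        cache.put(bookingId, new Context());
    }

    // What the processor reads at the start of the next attempt.
    Object readOnRetry(String bookingId) {
        Context ctx = cache.get(bookingId);
        return ctx == null ? null : ctx.attributes.get(bookingId);
    }

    public static void main(String[] args) {
        RetryCacheOverwriteSketch sketch = new RetryCacheOverwriteSketch();
        sketch.processorFails("B-1", List.of("modified"));
        sketch.frameworkRegistersFailure("B-1");
        // The key is still present, but the attribute is gone.
        System.out.println(sketch.cache.containsKey("B-1")); // true
        System.out.println(sketch.readOnRetry("B-1"));       // null
    }
}
```

If that assumption holds, it would match the symptom: containsKey is true on the retry, yet the attribute lookup returns null.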

So I created a bean annotated with @StepScope:

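    // Step-scoped map shared by the processor and the skip listener.
    // The value type is List<BookingInfo>, matching how the listener reads it.
    @Bean
    @StepScope
    public Map<String, List<BookingInfo>> objectStateMap() {
        return new ConcurrentHashMap<>();
    }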

Then I autowired this map wherever it was required.

In the catch block, I wrote the modified objects to this map before re-throwing the error.
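The save-before-rethrow step can be sketched in plain Java as follows. This is a minimal illustration, not the actual processor: StateBeforeRethrowSketch, its failure rule (fail until two modifications have been recorded) and the hand-written retry loop are all hypothetical, and List<String> stands in for List<BookingInfo>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StateBeforeRethrowSketch {

    // Plays the role of the step-scoped objectStateMap bean.
    static final Map<String, List<String>> objectStateMap = new ConcurrentHashMap<>();

    // Hypothetical processor: fails until two modifications have been saved.
    static List<String> process(String bookingId) {
        // Resume from saved state if a previous attempt left some behind.
        List<String> list = objectStateMap.get(bookingId);
        if (list == null) {
            list = new ArrayList<>(); // stands in for the database fetch
        }
        try {
            if (list.size() < 2) {
                throw new IllegalStateException("transient failure");
            }
            return list;
        } catch (IllegalStateException e) {
            list.add("attempt-" + (list.size() + 1)); // modify the state
            objectStateMap.put(bookingId, list);      // save before re-throwing
            throw e;
        }
    }

    // Minimal retry loop standing in for the framework's retry policy.
    // maxAttempts is expected to be at least 1.
    static List<String> processWithRetry(String bookingId, int maxAttempts) {
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return process(bookingId);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        System.out.println(processWithRetry("B-1", 3)); // [attempt-1, attempt-2]
    }
}
```

Each failed attempt leaves its changes in the map, so the next attempt starts from the modified list rather than from a fresh fetch.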

Next, I created a SkipListener for my use case. It receives each item that is skipped after a failure and performs the follow-up work for that item.

    @Component
    @Slf4j
    public class CustomSkipListener {

        @Autowired
        Map<String, List<BookingInfo>> objectStateMap;

        @OnSkipInRead
        public void onSkipInRead(Throwable t) {
            log.error("Read has skipped because of error : " + t.getMessage());
        }

        @OnSkipInWrite
        public void onSkipInWrite(List<BookingInfo> item, Throwable t) {
            log.error("Write has skipped because of error : " + t.getMessage());
        }

        @OnSkipInProcess
        public void onSkipInProcess(String bookingId, Throwable t) {
            List<BookingInfo> bookingInfos = objectStateMap.get(bookingId);
            // do some tasks..
            objectStateMap.remove(bookingId);
        }
    }
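One detail worth guarding against is a skipped item that never saved any state, for example when the failure happened before the catch block ran. A minimal plain-Java sketch of a null-safe lookup, assuming the same map layout; SkipCleanupSketch is a hypothetical stand-in for the listener and List<String> stands in for List<BookingInfo>:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SkipCleanupSketch {

    // Plays the role of the shared objectStateMap.
    final Map<String, List<String>> objectStateMap = new ConcurrentHashMap<>();

    // Mirrors onSkipInProcess: take the saved state for the skipped item
    // and drop it from the map in a single call.
    List<String> onSkipInProcess(String bookingId, Throwable t) {
        // remove() returns the previous value, or null if nothing was saved.
        List<String> saved = objectStateMap.remove(bookingId);
        return saved == null ? List.of() : saved;
    }

    public static void main(String[] args) {
        SkipCleanupSketch listener = new SkipCleanupSketch();
        listener.objectStateMap.put("B-1", List.of("partial"));
        System.out.println(listener.onSkipInProcess("B-1", new RuntimeException("boom"))); // [partial]
        System.out.println(listener.objectStateMap.isEmpty()); // true
    }
}
```

Using the return value of remove avoids a separate get followed by remove, which matters if several threads from the task executor touch the map.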

Finally, I registered this listener in the main job (skip listeners are attached to the fault-tolerant step).

huangapple
  • Posted on June 15, 2023 at 02:07:58
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76476432.html