春季批处理 SkipListener 在跳过的项目时未被调用

huangapple go评论77阅读模式
英文:

Spring Batch SkipListener is not called for skipped item

问题

在我的Spring Batch Job中,我有一个Step,其中一些错误会被跳过。运行后,我还可以从数据库中看到发生了跳过。

问题: 被跳过的项目从未经过skipListener

我已经验证如下:

  • skipListener中的断点从未触发。
  • 被跳过的错误没有被记录。
  • 被跳过的错误没有添加到我的自定义错误列表中。

我还有另一个非常相似的工作,其中skip监听器正常工作。我无法在skipListener或作业配置中找到任何重大差异。这两个作业甚至共享相同的skip策略。

我尝试为该步骤添加另一个监听器(ItemProcessListener),不知何故它完美地工作,尽管这两个监听器配置方式相同。

这种行为的原因可能是什么?


源代码

该步骤:

@Bean(name = Beans.STEP)
public Step step(
        @Qualifier(Beans.READER) MyBatisCursorItemReader<CustomItem> reader, 
        CustomItemProcessor processor,
        @Qualifier(Beans.WRITER) ItemWriter<CustomItem> writer,
        @Qualifier(Beans.SKIP_LISTENER) SkipListener<CustomItem, CustomItem> skipListener,
        @Qualifier(Beans.PROCESS_LISTENER) ItemProcessListener<CustomItem, CustomItem> processListener) {
            
    return stepBuilderFactory.get(Beans.STEP)
            .<CustomItem, CustomItem> chunk(1)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipPolicy(new CustomSkipPolicy())
            .listener(skipListener) // 不起作用
            .listener(processListener) // 起作用
            .build();
}  

处理监听器:
这在步骤中正常工作。

@Bean(name = Beans.PROCESS_LISTENER)
ItemProcessListener<CustomItem, CustomItem> listener() {
    return new ItemProcessListener<CustomItem, CustomItem> () {

        @Override
        public void beforeProcess(CustomItem item) {
            log.trace("before process");
            
        }

        @Override
        public void afterProcess(CustomItem item, CustomItem result) {
            log.trace("after process");
            
        }

        @Override
        public void onProcessError(CustomItem item, Exception e) {
            log.trace("on process error");
        }
    };
}

跳过监听器:
这些方法从未被调用。

@Bean(name = Beans.SKIP_LISTENER)
SkipListener<CustomItem, CustomItem> skipListener() {
    return new SkipListener<CustomItem, CustomItem>() {

        @Override
        public void onSkipInRead(Throwable t) {
            log.trace("skip on read");
            
        }

        @Override
        public void onSkipInWrite(CustomItem item, Throwable t) {
            log.trace("skip on write");
        }

        @Override
        public void onSkipInProcess(CustomItem item, Throwable t) {
            log.trace("skip on process");
        }
    };
}

跳过策略:
跳过策略对于跳过执行某些推断逻辑和记录操作。我省略了与skipListener无关的部分。

另外,我尝试用skip(RuntimeException.class)方法替换skipPolicy()配置步骤。然后,skipListener也不起作用。

public class CustomSkipPolicy extends GeneralBatchSkipPolicy {
    
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
        boolean isSkippable;
        
        // 在中间执行其他操作
        
        return isSkippable;
    }
}
英文:

In my Spring Batch Job, I have one Step where some of the errors are skipped. After the run, I can also see from the database that a skip has occurred.

Problem: the skipped items never pass through the skipListener.

I have verified this as follows:

  • the breakpoint in the skipListener is never triggered
  • the skipped error isn't logged
  • the skipped error isn't added to my custom error list

I also have another very similar job where the skip listener works correctly. I cannot find any significant difference in either the skip listeners or job configurations. The jobs even share the same skip policy.

I tried adding another listener (ItemProcessListener) to the step. Somehow that works perfectly, even though both listeners are configured the same way.

What could be the reason for this behaviour?


Source code

The step:

@Bean(name = Beans.STEP)
public Step step(
		@Qualifier(Beans.READER) MyBatisCursorItemReader&lt;CustomItem&gt; reader, 
		CustomItemProcessor processor,
		@Qualifier(Beans.WRITER) ItemWriter&lt;CustomItem&gt; writer,
		@Qualifier(Beans.SKIP_LISTENER) SkipListener&lt;CustomItem, CustomItem&gt; skipListener,
		@Qualifier(Beans.PROCESS_LISTENER) ItemProcessListener&lt;CustomItem, CustomItem&gt; processListener) {
			
	return stepBuilderFactory.get(Beans.STEP)
			.&lt;CustomItem, CustomItem&gt; chunk(1)
			.reader(reader)
			.processor(processor)
			.writer(writer)
			.faultTolerant()
 			.skipPolicy(new CustomSkipPolicy())
	    	.listener(skipListener) // doesn&#39;t work
			.listener(processListener) // does work
			.build();
}        

Process listener:
This works correctly in the step.

@Bean(name = Beans.PROCESS_LISTENER)
ItemProcessListener&lt;CustomItem, CustomItem&gt; listener() {
	return new ItemProcessListener&lt;CustomItem, CustomItem&gt; () {

		@Override
		public void beforeProcess(CustomItem item) {
			log.trace(&quot;before process&quot;);
			
		}

		@Override
		public void afterProcess(CustomItem item, CustomItem result) {
			log.trace(&quot;after process&quot;);
			
		}

		@Override
		public void onProcessError(CustomItem item, Exception e) {
			log.trace(&quot;on process error&quot;);
		}
	};
}

Skip listener:
These methods are never called.

@Bean(name = Beans.SKIP_LISTENER)
SkipListener&lt;CustomItem, CustomItem&gt; skipListener() {
	return new SkipListener&lt;CustomItem, CustomItem&gt;() {

		@Override
		public void onSkipInRead(Throwable t) {
			log.trace(&quot;skip on read&quot;);
			
		}

		@Override
		public void onSkipInWrite(CustomItem item, Throwable t) {
			log.trace(&quot;skip on write&quot;);
		}

		@Override
		public void onSkipInProcess(CustomItem item, Throwable t) {
			log.trace(&quot;skip on process&quot;);
		}
	};
}

Skip policy:
The skip policy does some deduction logic for skipping and also does logging. I have left out the non-relevant parts, because the deduction and logging aren't related to the skipListener in any way.

Also, I have tried configuring the step by replacing skipPolicy() with the skip(RuntimeException.class) method. Skip listener didn't work then either.

public class CustomSkipPolicy extends GeneralBatchSkipPolicy {
	
	@Override
	public boolean shouldSkip(Throwable t, int skipCount) {
		boolean isSkippable;
        
        // do stuff in between
       
        return isSkippable;
	}
}

答案1

得分: 0

我不太确定可能的根本原因是什么,但有一个解决方法来修复这个问题。

完全相同的问题在以下版本上重现,并且经过了解决方法的测试:

Spring Boot 版本 - 3.1.1 和 Spring Batch 版本 - 5.0.2

为了使 SkipListener 生效,创建一个 ItemWriteListener 和一个 ItemProcessListener,将它们的主体留空,并且在步骤配置中添加这两个监听器。请参阅下面的示例。

就是这样!!

SkipListener

@Component
@Qualifier(value = "mySkipListenerBean")
@StepScope
public class MySkipListener implements SkipListener<MyEntityA, MyEntityB> {

    @Override
    public void onSkipInRead(Throwable t) {
        // 在这里处理你的逻辑
    }

    @Override
    public void onSkipInWrite(MyEntityB myEntityB, Throwable t) {
        // 在这里处理你的逻辑
    }

    @Override
    public void onSkipInProcess(MyEntityA myEntityA, Throwable t) {
        // 在这里处理你的逻辑
    }
}

ProcessListener

@Component
@Qualifier(value = "myProcessListenerBean")
public class MyProcessListener implements ItemProcessListener<MyEntityA, MyEntityB> {

    @Override
    public void beforeProcess(MyEntityA myEntityA ) {}

    @Override
    public void afterProcess(MyEntityA myEntityA, MyEntityB myEntityB) {}

    @Override
    public void onProcessError(MyEntityA myEntityA, Exception e) {}
}

WriteListener

@Component
@Qualifier(value = "myWriteListenerBean")
public class MyWriteListener implements ItemWriteListener<MyEntityB> {

    @Override
    public void beforeWrite(Chunk<? extends MyEntityB> items) {}

    @Override
    public void afterWrite(Chunk<? extends MyEntityB> items) {}

    @Override
    public void onWriteError(Exception exception, Chunk<? extends MyEntityB> items) {}
}

步骤配置

@Component
public class MyStep {
	
	@Autowired
    @Qualifier(value = "myItemReaderBean")
    private ItemReader<MyEntityA> itemReader;

    @Autowired
    @Qualifier(value = "myItemProcessorBean")
    private ItemProcessor<MyEntityA, MyEntityB> itemProcessor;

    @Autowired
    @Qualifier(value = "myItemWriterBean")
    private ItemWriter<MyEntityB> itemWriter;

    @Autowired
    @Qualifier(value = "mySkipListenerBean")
    private SkipListener<MyEntityA, MyEntityB> skipListener;

    @Autowired
    @Qualifier(value = "myProcessListenerBean")
    private ItemProcessListener<MyEntityA, MyEntityB> itemProcessListener;

    @Autowired
    @Qualifier(value = "myWriteListenerBean")
    private ItemWriteListener<MyEntityB> itemWriteListener;
	
	@Bean(name = "myStepBean")
    public Step getMyStep(JobRepository jobRepository, PlatformTransactionManager transactionManager){
        return new StepBuilder("my-step", jobRepository)
                .<MyEntityA, MyEntityB>chunk(100, transactionManager)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .listener(skipListener)
                .listener(itemProcessListener)
                .listener(itemWriteListener)
                .build();
    }
}

我测试了多种情况来使步骤监听器生效,如下所示:

不生效:

.listener(skipListener)

不生效:

.listener(skipListener)
.listener(itemProcessListener)

不生效:

.listener(skipListener)
.listener(itemWriteListener)

生效:

.listener(skipListener)
.listener(itemProcessListener)
.listener(itemWriteListener)
英文:

I am not fully sure what could be the root cause, but there is a workaround to fix this issue.

The exact same issue reproduced and workaround fix tested on :

Spring Boot version - 3.1.1 & Spring Batch version - 5.0.2

To make the SkipListener work, create an ItemWriteListener and an ItemProcessListener, leave their body blank and add those two listeners as well in the step config. See the example below.

That's it !!

SkipListener

@Component
@Qualifier(value = &quot;mySkipListenerBean&quot;)
@StepScope
public class MySkipListener implements SkipListener&lt;MyEntityA, MyEntityB&gt; {

    @Override
    public void onSkipInRead(Throwable t) {
        //do your stuff here
    }

    @Override
    public void onSkipInWrite(MyEntityB myEntityB, Throwable t) {
        //do your stuff here
    }

    @Override
    public void onSkipInProcess(MyEntityA myEntityA, Throwable t) {
        //do your stuff here
    }
}

ProcessListener

@Component
@Qualifier(value = &quot;myProcessListenerBean&quot;)
public class MyProcessListener implements ItemProcessListener&lt;MyEntityA, MyEntityB&gt; {

    @Override
    public void beforeProcess(MyEntityA myEntityA ) {}

    @Override
    public void afterProcess(MyEntityA myEntityA, MyEntityB myEntityB) {}

    @Override
    public void onProcessError(MyEntityA myEntityA, Exception e) {}
}

WriteListener

@Component
@Qualifier(value = &quot;myWriteListenerBean&quot;)
public class MyWriteListener implements ItemWriteListener&lt;MyEntityB&gt; {

    @Override
    public void beforeWrite(Chunk&lt;? extends MyEntityB&gt; items) {}

    @Override
    public void afterWrite(Chunk&lt;? extends MyEntityB&gt; items) {}

    @Override
    public void onWriteError(Exception exception, Chunk&lt;? extends MyEntityB&gt; items) {}
}

Step Configuration

@Component
public class MyStep {
	
	@Autowired
    @Qualifier(value = &quot;myItemReaderBean&quot;)
    private ItemReader&lt;MyEntityA&gt; itemReader;

    @Autowired
    @Qualifier(value = &quot;myItemProcessorBean&quot;)
    private ItemProcessor&lt;MyEntityA, MyEntityB&gt; itemProcessor;

    @Autowired
    @Qualifier(value = &quot;myItemWriterBean&quot;)
    private ItemWriter&lt;MyEntityB&gt; itemWriter;

    @Autowired
    @Qualifier(value = &quot;mySkipListenerBean&quot;)
    private SkipListener&lt;MyEntityA, MyEntityB&gt; skipListener;

    @Autowired
    @Qualifier(value = &quot;myProcessListenerBean&quot;)
    private ItemProcessListener&lt;MyEntityA, MyEntityB&gt; itemProcessListener;

    @Autowired
    @Qualifier(value = &quot;myWriteListenerBean&quot;)
    private ItemWriteListener&lt;MyEntityB&gt; itemWriteListener;
	
	@Bean(name = &quot;myStepBean&quot;)
    public Step getMyStep(JobRepository jobRepository, PlatformTransactionManager transactionManager){
        return new StepBuilder(&quot;my-step&quot;, jobRepository)
                .&lt;MyEntityA, MyEntityB&gt;chunk(100, transactionManager)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .listener(skipListener)
                .listener(itemProcessListener)
                .listener(itemWriteListener)
                .build();
    }
}

I tested multiple scenarios to make the step listener work, See below :

Not Working :

.listener(skipListener)

Not Working :

.listener(skipListener)
.listener(itemProcessListener)

Not Working :

.listener(skipListener)
.listener(itemWriteListener)

Working :

.listener(skipListener)
.listener(itemProcessListener)
.listener(itemWriteListener)

huangapple
  • 本文由 发表于 2020年10月22日 14:57:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/64476857.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定