Spring Batch – ItemWriter is writing same object read by ItemReader but not the one returned after processing through ItemProcessor

huangapple go评论134阅读模式
英文:

Spring Batch - ItemWriter is writing same object read by ItemReader but not the one returned after processing through ItemProcessor

问题

我的情景是:JdbcPagingItemReader 从 Oracle 数据库中读取并返回名为 'Employee' 的对象。然后,将这个 'Employee' 对象传递给 Processor,以便从多个表中获取更多信息,然后返回 'AggregatedEmployee' 对象(它实际上扩展了 Employee)。我使用 KafkaItemWriter 将处理过的对象写入 Kafka,但是写入程序(writer)尝试写入 'Employee' 本身,而不是写入 'AggregatedEmployee'。

@Mahmoud Ben Hassine: 我看到您在 Spring Batch 上提供了许多建议。请分享一下您的想法。

ProcessorInterface 代码:

public interface PageProcessor<T> {
    <R extends Employee> R process(T page);
}

Step Bean 代码:

@Bean
protected Step step1(CompositeJdbcPagingItemReader<Employee> reader, KafkaItemWriter<String, AggregatedEmployee> writer) {
    return steps.get("step1")
                .<Employee, AggregatedEmployee>chunk(5)
                .reader(reader)
                .writer(writer)
                .build();
}

ProcessorInterface 的实现类代码:

public class EmployeeProcessor implements PageProcessor<Employee> {
    private NamedParameterJdbcTemplate jdbcTemplate;

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R extends Employee> R process(Employee page) {
        // ... 实现在这里
    }
}

KafkaItemWriter Bean:

@Bean
KafkaItemWriter<String, AggregatedEmployee> writer() {
    return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
            .kafkaTemplate(aggregatedEmployeekafkaTemplate)
            .itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
            .build();
}

编辑后以显示 processor:

public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
    private PageProcessor<T> pageProcessor;

    public void setPageProcessor(PageProcessor<T> pageProcessor) {
        this.pageProcessor = pageProcessor;
    }

当 Reader Bean 被创建时,processor 对象也会被创建并通过上面显示的 setter 设置到 reader 中,然后 EmployeeProcessor 中的处理逻辑也会被执行。

错误:

java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)
英文:

My scenario is: JdbcPagingItemReader is reading from Oracle db and returning Object let say 'Employee'. Then this 'Employee' object is passed to Processor to make another call to db to pull more information from multiple tables and return 'AggregatedEmployee' object (it actually extends Employee). I am using KafkaItemWriter to write the processed object to Kafka but rather than writing AggregatedEmployee, writer is trying to write 'Employee' itself.

@Mahmoud Ben Hassine: I have seen your lot of suggestions on Spring Batch.Please share your thoughts.

ProcessorInterface Code:

public interface PageProcessor&lt;T&gt; {
       &lt;R extends Employee&gt; R process(T page);
}

Step Bean Code:

@Bean
protected Step step1 (CompositeJdbcPagingItemReader &lt;Employee&gt; reader, KafkaItemWriter &lt;String, AggregatedEmployee&gt; writer) {
	return steps.get(&quot;step1&quot;)
				.&lt;Employee, AggregatedEmployee&gt;chunk(5).
				reader(reader).
				writer(writer).build();
	}

Implementation Class of ProcessorInterface Code:

public class EmployeeProcessor implements PageProcessor&lt;Employee&gt; {
	private NamedParameterJdbcTemplate jdbcTemplate;

    public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
	}

    @SuppressWarnings(&quot;unchecked&quot;)
	@Override
	public &lt;R extends Employee&gt; R process(Employee page) {
             ... implementation goes here
    }

KafkaItemWriter Bean:

    @Bean
	KafkaItemWriter&lt;String,AggregatedEmployee&gt; writer(){
		return new KafkaItemWriterBuilder&lt;String, AggregatedEmployee&gt;()
				.kafkaTemplate(aggregatedEmployeekafkaTemplate)
				.itemKeyMapper(aggregatedEmployee -&gt; String.valueOf(aggregatedEmployee.getEmployeeId()))
				.build();
	}

Edited to show processor:

public class CompositeJdbcPagingItemReader&lt;T&gt; extends JdbcPagingItemReader&lt;T&gt; {
	private PageProcessor&lt;T&gt; pageProcessor;

	public void setPageProcessor(PageProcessor&lt;T&gt; pageProcessor) {
		this.pageProcessor = pageProcessor;
	}

And when Reader bean is created, processor object is also created and set into reader through above shown setter and processor logic written in EmployeeProcessor is also getting executed.

Error:

java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader &#39;app&#39;)
	at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)

答案1

得分: 1

你没有在步骤中设置处理器:

@Bean
protected Step step1(CompositeJdbcPagingItemReader<Employee> reader, KafkaItemWriter<String, AggregatedEmployee> writer) {
    return steps.get("step1")
                .<Employee, AggregatedEmployee>chunk(5)
                .reader(reader)
                .processor(employeeProcessor()) // 在这里设置处理器
                .writer(writer)
                .build();
}

你需要设置处理器,在处理器中进行类型转换 Employee -> AggregatedEmployee

英文:

You did not set the processor on your step:

@Bean
protected Step step1 (CompositeJdbcPagingItemReader &lt;Employee&gt; reader, KafkaItemWriter &lt;String, AggregatedEmployee&gt; writer) {
	return steps.get(&quot;step1&quot;)
				.&lt;Employee, AggregatedEmployee&gt;chunk(5).
				reader(reader).
				writer(writer).build();
	}

You need to set the processor which will do the type conversion Employee -> AggregatedEmployee on your step.

huangapple
  • 本文由 发表于 2020年9月17日 23:42:01
  • 转载请务必保留本文链接:https://go.coder-hub.com/63941611.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定