Spring Batch - ItemWriter is writing same object read by ItemReader but not the one returned after processing through ItemProcessor
Question
My scenario: a JdbcPagingItemReader reads from an Oracle database and returns an object, say 'Employee'. This 'Employee' object is then passed to a processor, which makes further database calls to pull more information from multiple tables and returns an 'AggregatedEmployee' object (which extends Employee). I am using KafkaItemWriter to write the processed object to Kafka, but instead of writing the AggregatedEmployee, the writer tries to write the 'Employee' itself.
@Mahmoud Ben Hassine: I have seen many of your suggestions on Spring Batch. Please share your thoughts.
ProcessorInterface Code:
public interface PageProcessor<T> {
    <R extends Employee> R process(T page);
}
Step Bean Code:
@Bean
protected Step step1(CompositeJdbcPagingItemReader<Employee> reader,
                     KafkaItemWriter<String, AggregatedEmployee> writer) {
    return steps.get("step1")
            .<Employee, AggregatedEmployee>chunk(5)
            .reader(reader)
            .writer(writer)
            .build();
}
Implementation Class of ProcessorInterface Code:
public class EmployeeProcessor implements PageProcessor<Employee> {

    private NamedParameterJdbcTemplate jdbcTemplate;

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R extends Employee> R process(Employee page) {
        // ... implementation goes here
    }
}
KafkaItemWriter Bean:
@Bean
KafkaItemWriter<String, AggregatedEmployee> writer() {
    return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
            .kafkaTemplate(aggregatedEmployeekafkaTemplate)
            .itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
            .build();
}
Edited to show processor:
public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {

    private PageProcessor<T> pageProcessor;

    public void setPageProcessor(PageProcessor<T> pageProcessor) {
        this.pageProcessor = pageProcessor;
    }
When the reader bean is created, the processor object is also created and set on the reader through the setter shown above, and the processing logic written in EmployeeProcessor does get executed.
Error:
java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
    at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)
Answer 1
Score: 1
You did not set the processor on your step:
@Bean
protected Step step1(CompositeJdbcPagingItemReader<Employee> reader,
                     KafkaItemWriter<String, AggregatedEmployee> writer) {
    return steps.get("step1")
            .<Employee, AggregatedEmployee>chunk(5)
            .reader(reader)
            .writer(writer)
            .build();
}
You need to set a processor on your step (with .processor(...), between the reader and the writer) that does the type conversion Employee -> AggregatedEmployee.
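To illustrate why the missing processor surfaces as a ClassCastException only at write time, here is a minimal plain-Java sketch. It does not use Spring Batch: `ChunkSketch`, `runChunk`, and the simplified `Employee` / `AggregatedEmployee` classes are hypothetical stand-ins, and the loop is only an assumed approximation of a chunk-oriented step. The point it tries to show is that the `<Employee, AggregatedEmployee>` type arguments are erased at runtime, so without a processor the items reach the writer's key mapper unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ChunkSketch {

    // Hypothetical, simplified versions of the model classes from the question.
    static class Employee {
        private final int employeeId;
        Employee(int employeeId) { this.employeeId = employeeId; }
        int getEmployeeId() { return employeeId; }
    }

    static class AggregatedEmployee extends Employee {
        private final String department;
        AggregatedEmployee(int employeeId, String department) {
            super(employeeId);
            this.department = department;
        }
        String getDepartment() { return department; }
    }

    // Assumed stand-in for a chunk-oriented step: read, optionally process, then "write"
    // by applying a key mapper (as the Kafka writer in the question does).
    @SuppressWarnings("unchecked")
    static <I, O> List<String> runChunk(List<I> items,
                                        Function<I, O> processor,
                                        Function<O, String> keyMapper) {
        List<String> keys = new ArrayList<>();
        for (I item : items) {
            // With no processor the item passes through as-is. The cast is unchecked,
            // so nothing fails here even when the item is not really an O.
            O output = (processor != null) ? processor.apply(item) : (O) (Object) item;
            // The failure appears only here, when the key mapper expects an O.
            keys.add(keyMapper.apply(output));
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Employee> items = Arrays.asList(new Employee(1), new Employee(2));

        // Case 1: no processor configured, as in the step from the question.
        try {
            ChunkSketch.<Employee, AggregatedEmployee>runChunk(
                    items, null, a -> String.valueOf(a.getEmployeeId()));
        } catch (ClassCastException ex) {
            System.out.println("Without a processor: " + ex.getClass().getSimpleName());
        }

        // Case 2: a processor converts Employee -> AggregatedEmployee before writing.
        List<String> keys = ChunkSketch.<Employee, AggregatedEmployee>runChunk(
                items,
                emp -> new AggregatedEmployee(emp.getEmployeeId(), "unknown"),
                a -> String.valueOf(a.getEmployeeId()));
        System.out.println("With a processor, keys: " + keys);
    }
}
```

In the actual step, the equivalent of the second case is passing an `ItemProcessor<Employee, AggregatedEmployee>` to `.processor(...)`. Note that the `EmployeeProcessor` in the question implements the custom `PageProcessor` interface rather than `ItemProcessor`, so it would presumably need to be wrapped or adapted before it can be registered on the step.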