How to control chunk-processed items before writing them to the database?
Question
I have this situation in my job:
- Reader: reads a large CSV file.
- Processor: processes each line and checks the data with a database SELECT. If the data exists, the item is marked for update; otherwise it is marked for insert.
- Writer: a classifier routes each item to the insert writer or the update writer.
My commit-interval is 1000. If my file contains repeated items within the same chunk, how can I detect that the later occurrences should be marked for update instead of insert?
File example:
Item1Code|2023-05-01|02 --> not found, marked as insert
Item2Code|2023-05-02|03 --> not found, marked as insert
Item3Code|2023-05-03|03 --> not found, marked as insert
Item1Code|2023-05-04|03 --> not found, marked as insert (should be detected as existing, because it will be inserted by line 1)
Item4Code|2023-05-05|03 --> not found, marked as insert
...
commit here <<
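Why the duplicate slips through can be reproduced without Spring. The following is a minimal plain-Java sketch, assuming an in-memory `Set` stands in for the database table (the class and method names are hypothetical). It mirrors how a chunk-oriented step works: every item of the chunk is processed first, and only then is the whole chunk handed to the writer, so the lookup for the second `Item1Code` runs before the first one has been written.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation: a Set of item codes stands in for the database table.
class ChunkSimulation {
    final Set<String> database = new HashSet<>();

    // Processor phase: each item is classified using only the "database" lookup,
    // as in the question; nothing from this chunk has been written yet.
    List<String> processChunk(List<String> codes) {
        List<String> decisions = new ArrayList<>();
        for (String code : codes) {
            decisions.add(database.contains(code) ? "UPDATE" : "INSERT");
        }
        return decisions;
    }

    // Writer phase: runs once, after the whole chunk has been processed.
    void writeChunk(List<String> codes) {
        database.addAll(codes);
    }

    public static void main(String[] args) {
        ChunkSimulation step = new ChunkSimulation();
        List<String> chunk = List.of("Item1Code", "Item2Code", "Item3Code", "Item1Code", "Item4Code");
        System.out.println(step.processChunk(chunk)); // [INSERT, INSERT, INSERT, INSERT, INSERT]
        step.writeChunk(chunk);
    }
}
```

Only after `writeChunk` does a lookup for `Item1Code` return `UPDATE`, which is why a commit-interval of 1 appears to be the only fix when the database is the sole source of truth.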
The only solution I can think of is a commit-interval of 1. Are there any tools in Spring Batch for this type of problem?
Thanks in advance.
Answer 1
Score: 1
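There is no out-of-the-box tool for this specific problem. However, it can easily be implemented with an in-memory cache and the `ChunkListener` interface. The cache is cleared at the start of every chunk, so it only remembers the keys seen in the chunk that has not been written yet; items from earlier chunks are already committed, so the existing database lookup finds them.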
Code example of the component:
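```java
import java.util.HashSet;
import java.util.Set;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.stereotype.Component;

@Component
public class InsertedCache implements ChunkListener {
    // Keys seen in the current (not yet written) chunk; HashSet is not thread-safe
    private final Set<String> cache = new HashSet<>();
    @Override
    public void beforeChunk(ChunkContext context) {
        // Previous chunks are committed, so the database lookup already finds their items
        cache.clear();
    }

    // Set.add returns false when the key is already present, i.e. a duplicate in this chunk
    public boolean isInserted(String id) {
        return !cache.add(id);
    }
}
```

The component only receives `beforeChunk` callbacks if it is registered as a listener on the step, for example with `.listener(insertedCache)` on the step builder; declaring it as a `@Component` does not by itself attach it to the step. On Spring Batch 4.x, where `ChunkListener` does not yet provide default methods, `afterChunk` and `afterChunkError` must also be implemented (or the class can extend `ChunkListenerSupport`).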
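The trick relies on `Set.add` returning `false` when the element is already present. A hypothetical plain-Java stand-in for `InsertedCache` (no Spring types; `ChunkKeyCache` and `startChunk()` are illustrative names, with `startChunk()` playing the role of `beforeChunk`) makes the per-chunk behaviour easy to check in isolation:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical Spring-free stand-in for InsertedCache, for illustration only.
class ChunkKeyCache {
    private final Set<String> seen = new HashSet<>();

    // Equivalent of ChunkListener.beforeChunk: forget the keys of the previous chunk.
    void startChunk() {
        seen.clear();
    }

    // true if the key was already seen in the current chunk.
    boolean isInserted(String id) {
        return !seen.add(id);
    }

    public static void main(String[] args) {
        ChunkKeyCache cache = new ChunkKeyCache();
        cache.startChunk();
        System.out.println(cache.isInserted("Item1Code")); // false: first occurrence
        System.out.println(cache.isInserted("Item1Code")); // true: duplicate in the same chunk
        cache.startChunk();
        System.out.println(cache.isInserted("Item1Code")); // false: new chunk, cache was cleared
    }
}
```

Because the set is cleared at every chunk boundary, it never holds more than one chunk's worth of keys (at most 1000 with the question's commit-interval). Like the component above, this assumes a single-threaded step, since `HashSet` is not thread-safe.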
Usage example in the ItemProcessor:
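```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;

// The processor must itself be a Spring bean (e.g. declared with @Bean) for @Autowired to be applied
@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {
    @Autowired
    private InsertedCache insertedCache;

    @Override
    public Customer process(Customer customer) {
        // true when the same key already appeared earlier in the current chunk
        if (insertedCache.isInserted(customer.getName())) {
            log.info("already inserted customer: {}", customer);
        }
        return customer;
    }
}
```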
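For the question itself, the processor would combine both checks: an item is an update if its key is already in the database or was already seen earlier in the current chunk. Below is a plain-Java sketch of that decision, assuming a `Set` stands in for the database and an `Op` enum stands in for the insert/update flag the classifier reads; `DedupingChunkStep` and `Op` are hypothetical names. It replays the file example from the question across two chunks.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical marker the classifier would use to pick the insert or update writer.
enum Op { INSERT, UPDATE }

class DedupingChunkStep {
    final Set<String> database = new HashSet<>();            // stand-in for the table
    private final Set<String> seenInChunk = new HashSet<>(); // same role as InsertedCache

    // Processor: UPDATE if the key appeared earlier in this chunk OR is in the database.
    // The cache check runs first so every key is recorded, even when the database already has it.
    List<Op> processChunk(List<String> codes) {
        seenInChunk.clear(); // what beforeChunk does
        List<Op> ops = new ArrayList<>();
        for (String code : codes) {
            boolean duplicateInChunk = !seenInChunk.add(code);
            ops.add(duplicateInChunk || database.contains(code) ? Op.UPDATE : Op.INSERT);
        }
        return ops;
    }

    // Writer: only runs after the whole chunk has been processed.
    void writeChunk(List<String> codes) {
        database.addAll(codes);
    }

    public static void main(String[] args) {
        DedupingChunkStep step = new DedupingChunkStep();
        List<String> chunk1 = List.of("Item1Code", "Item2Code", "Item3Code", "Item1Code", "Item4Code");
        System.out.println(step.processChunk(chunk1)); // [INSERT, INSERT, INSERT, UPDATE, INSERT]
        step.writeChunk(chunk1);
        System.out.println(step.processChunk(List.of("Item2Code", "Item5Code"))); // [UPDATE, INSERT]
    }
}
```

One caveat worth verifying: when a classifier routes items to separate insert and update writers, the writes are typically grouped per writer, so the update for a duplicated key could run before its insert if an update-bound item appears earlier in the chunk. Checking the write order in your setup (or making the insert writer run first) avoids updating a row that does not exist yet.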