如何控制在将它们写入数据库之前处理块中的元素?

huangapple go评论62阅读模式
英文:

How control chunk processed elements before write them to database?

问题

I have this situation in my jt:

  • Reader: 读取一个大型csv文件
  • Processor: 处理行,使用数据库Select检查数据,如果数据存在,则标记为更新。如果不存在,则标记为插入。
  • Writer: 分类器将项目重定向到插入/更新的写入器。

我的commit-interval是1000。如果我的文件在相同的“chunk”间隔中包含重复的项目,如何检测这些项目应标记为更新而不是插入?

文件示例:

Item1Code|2023-05-01|02 --> 检测不存在,标记为插入
Item2Code|2023-05-02|03 --> 检测不存在,标记为插入
Item3Code|2023-05-03|03 --> 检测不存在,标记为插入
Item1Code|2023-05-04|03 --> 检测不存在,标记为插入(应该被检测为存在,因为它将在第1行插入)
Item4Code|2023-05-05|03 --> 检测不存在,标记为插入
...
在此提交 <<

我认为只有使用commit-interval 1才能解决这个问题。在Spring Batch中有一些工具可以解决这种类型的问题吗?

提前感谢。

英文:

I have this situation in my jt:

  • Reader: Read a big csv file
  • Processor: Process lines, check data using a database Select, if data exists, marks it to update. If not exists marks it to insert.
  • Writer: Classifier to redirect the item to insert/update Writers.

My commit-interval is 1000. If my file contains repeated items in the same "chunk" interval. ¿How can detect that these item should be marked as updated instead insert?

File example:

Item1Code|2023-05-01|02 --&gt; detect not exists, mark as insert
Item2Code|2023-05-02|03 --&gt; detect not exists, mark as insert
Item3Code|2023-05-03|03 --&gt; detect not exists, mark as insert
Item1Code|2023-05-04|03 --&gt; detect not exists, mark as insert (should be detected as exists because It will inserted in line1)
Item4Code|2023-05-05|03 --&gt; detect not exists, mark as insert
...
commit here &lt;&lt;

I only think this can be resolved using commit-interval 1. There are some tools in Spring batch to resolve this type of problem?

thanks in advance

答案1

得分: 1

没有针对这个特定问题的开箱即用的工具。然而,可以使用内存缓存和ChunkListener接口轻松实现。

组件的代码示例:

@Component
public class InsertedCache implements ChunkListener {
    private Set<String> cache = new HashSet<>();

    @Override
    public void beforeChunk(ChunkContext context) {
        cache.clear();
    }

    public boolean isInserted(String id) {
        return !cache.add(id);
    }
}

ItemProcessor中的使用示例:

@Slf4j
public class CustomerItemProcessor implements ItemProcessor {

    @Autowired
    private InsertedCache insertedCache;

    @Override
    public Object process(Object item) {
        if (item instanceof Customer) {
            Customer customer = (Customer) item;
            if (insertedCache.isInserted(customer.getName())) {
                log.info("already inserted customer: {}", customer);
            }
        }
        return item;
    }
}
英文:

There is no out-of-the-box tool for this specific problem. However it can be easy implemented using an in-memory cache and the ChunkListener interface.

Code example of the component:

@Component
public class InsertedCache implements ChunkListener {
    private Set&lt;String&gt; cache = new HashSet&lt;&gt;();

    @Override
    public void beforeChunk(ChunkContext context) {
        cache.clear();
    }

    public boolean isInserted(String id) {
        return !cache.add(id);
    }
}

Usage example in the ItemProcessor:

@Slf4j
public class CustomerItemProcessor implements ItemProcessor {

    @Autowired
    private InsertedCache insertedCache;

    @Override
    public Object process(Object item) {
        if (item instanceof Customer) {
            Customer customer = (Customer) item;
            if (insertedCache.isInserted(customer.getName())) {
                log.info(&quot;already iserted customer: {}&quot;, customer);
            }
        }
        return item;
    }
}

huangapple
  • 本文由 发表于 2023年5月17日 15:05:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/76269361.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定