英文:
Java - Concurrent processing with a common executor service instance
问题
我有n个工作线程,从Kinesis流中检索记录(这对于此问题不重要),然后将这些记录推送到执行器服务,记录将在执行器服务中处理并持久化到后端数据库。对于所有工作线程,都会使用相同的执行器服务实例。
现在有一种情况,任何给定的工作循环在处理记录时会停止,并阻塞,直到由它提交的所有记录完全处理完毕。这实际上意味着对于来自特定工作线程的记录,执行器服务中不应该有未处理/运行中的线程。
一个非常简单的实现示例如下:
-
工作类
public class Worker {
Worker(Listener listener){
this.listener = listener;
}
//定期调用以从Kinesis流中获取记录
public void processRecords(Record records) {
for (Record record : records) {
listener.handleRecord(record);
}
//如果经过了15分钟,运行以下代码。这是阻塞的。
listener.blockTillAllRecordsAreProcessed()
}
}
-
监听器类
public class Listener {
ExecutorService es;
// 相同的执行器服务在所有监听器之间共享。
Listener(ExecutorService es){
this.es = es;
}
public void handleRecord(Record record) {
//将记录提交给es并返回
// 非阻塞的
}
public boolean blockTillAllRecordsAreProcessed(){
// 这应该阻塞,直到所有记录都被处理
// 不清楚如何在共同的es中实现这一点
}
}
我唯一想到的方法是为每个工作线程创建一个本地执行器服务,并对每个批次执行类似于invokeAll
的操作,这会稍微更改实现,但可以完成工作。但我觉得应该有一种更好的方法来解决这个问题。
英文:
I have n number of worker threads that retrieve records from a kinesis stream (this is not important for this problem), which are then pushed on to an executor service where the records are processed and persisted to a backend database. This same executor service instance is used for all worker threads.
Now there is a scenario where any given worker loop stops processing records and blocks until all records that were submitted by it are processed completely. This essentially means that there should be no pending/running threads in the executor service for a record from that particular worker thread.
A very trivial example of the implementation is like this:
-
Worker class
public class Worker {
Worker(Listener listener){
this.listener = listener;
}
//called periodically to fetch records from a kinesis stream
public void processRecords(Record records) {
for (Record record : records) {
listener.handleRecord(record);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed()
}
}
-
Listener class
public class Listener {
ExecutorService es;
// same executor service is shared across all listeners.
Listener(ExecutorService es){
this.es = es;
}
public void handleRecord(Record record) {
//submit record to es and return
// non blocking
}
public boolean blockTillAllRecordsAreProcessed(){
// this should block until all records are processed
// no clue how to implement this with a common es
}
}
The only approach I could think of is to have a local executor service for each worker and do something like invokeAll
for each batch, which would be change the implementation slightly but get the job done. but I feel like there should be a better approach to tackle this problem.
答案1
得分: 1
你可以使用CountDownLatch类来实现如下阻塞操作:
public void processRecords(List<Record> records) {
CountDownLatch latch = new CountDownLatch(records.size());
for (Record record : records) {
listener.handleRecord(record, latch);
}
// 如果经过了15分钟,执行以下代码。这是一个阻塞操作。
listener.blockTillAllRecordsAreProcessed(latch);
}
public class Listener {
ExecutorService es;
...
public void handleRecord(Record record, CountDownLatch latch) {
// 将记录提交给es并返回,非阻塞
es.submit(() -> {
someSyncTask(record);
latch.countDown();
});
}
public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
System.out.println("等待处理完成....");
try {
// 当所有子任务都完成时,当前线程将收到通知
// 并从等待状态恢复。
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
了解更多信息,请参阅:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
英文:
You could use the CountdownLatch class to block as follows:
public void processRecords(List<Record> records) {
CountDownLatch latch = new CountDownLatch(records.size());
for (Record record : records) {
listener.handleRecord(record, latch);
}
//if 15 minutes has elapsed, run below code. This is blocking.
listener.blockTillAllRecordsAreProcessed(latch)
}
public class Listener {
ExecutorService es;
...
public void handleRecord(Record record, CountDownLatch latch) {
//submit record to es and return
// non blocking
es.submit(()->{
someSyncTask(record);
latch.countDown();
})
}
public boolean blockTillAllRecordsAreProcessed(CountDownLatch latch){
System.out.println("waiting for processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Read more here: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论