英文:
Listenable Future callback is heavily delayed
问题
我的Guava可监听的未来上的回调延迟了。我正在编写一个应用程序,它基本上有一个线程池,一旦"长时间运行的任务"完成,然后我有一个成功的回调,如下所示:
ListenableFuture<Boolean> listenableFuture = service.submit(() -> publish(eventName, record, producer));
Futures.addCallback(listenableFuture, new FutureCallback<>() {
@Override
public void onSuccess(Boolean result) {
System.out
.println(LocalTime.now() + " 任务成功完成,结果为: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(LocalTime.now() + " 任务失败,结果为: " + t.getMessage());
}
}, service);
我的服务初始化如下:
executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(targetAmount),
handler);
service = MoreExecutors.listeningDecorator(executorService);
在我执行的长时间运行的任务中,当网络请求成功时,我会打印出来。
private Boolean publish(String eventName, ProducerRecord rec, Producer producer) {
AtomicBoolean failed = new AtomicBoolean(false);
producer.send(rec, (metadata, exception) -> {
if (exception != null) {
failed.set(true);
} else {
System.out.println("网络请求成功");
}
});
if (failed.get() == true) {
return false;
}
return true;
}
问题
如果我将线程池设置为处理1百万个任务,那么在看到单个回调println语句被触发之前,我将看到1百万个 "网络请求成功"。为什么会这样?
英文:
My callbacks on my Guava listenable futures are delayed. I'm writing an application that essentially has a thread pool and once the "long running task" is complete, then I have a success callback like so:
ListenableFuture<Boolean> listenableFuture = service.submit(() -> publish(eventName, record, producer));
Futures.addCallback(listenableFuture, new FutureCallback<>() {
@Override
public void onSuccess(Boolean result) {
System.out
.println(LocalTime.now() + " Task completed successfully with result: " + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println(LocalTime.now() + " Task failed with result: " + t.getMessage());
}
}, service);
My initialization of the service looks like the following:
executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(targetAmount),
handler);
service = MoreExecutors.listeningDecorator(executorService);
Within my long-running task I execute, I print when the network request is successful.
private Boolean publish(String eventName, ProducerRecord rec, Producer producer) {
AtomicBoolean failed = new AtomicBoolean(false);
producer.send(rec, (metadata, exception) -> {
if (exception != null) {
failed.set(true);
} else {
System.out.println("The network request is successful");
}
});
if (failed.get() == true) {
return false;
}
return true;
}
Problem
If I set the threadpool to process 1MM things, I will see 1MM The network request is successful
before I see a single callback println statement get fired. Why is this the case?
答案1
得分: 1
根据 文档 中 Futures.addCallback 的说明,回调在执行器中运行。只有在所有其他排队的元素完成后,才会运行您的回调。要解决这个问题,您可以创建一个单独的执行器来执行这些回调,或者由于看起来您控制着池构造函数,您可以创建一个 优先级队列 来替代 LinkedBlockingQueue 作为任务队列,并对回调设置优先级。
英文:
As the documentation for Futures.addCallback states, the callback is run in the executor. Your callback is only going to be run after all of the other queued elements have completed. To fix this you could create a separate executor to just execute the callbacks, or since it seems you control the pool constructor you could create a priority queue instead of a LinkedBlockingQueue as the task queue and prioritize your callbacks.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论