英文:
How to enforce timeout and cancel async CompletableFuture Jobs
问题
// 提交异步任务
List<CompletableFuture<String>> futures = submitJobs(); // 使用CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
all.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture<String> f : futures) {
if(!f.isDone()) {
/*
来自Java文档:
@param mayInterruptIfRunning 此值在此实现中没有效果,因为不使用中断来控制处理。
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompletableFuture<String> fu : futures) {
if(!fu.isCancelled()) { // 这一步需要吗?
output.add(fu.join());
}
}
return output;
- 这样的实现会起作用吗?是否有更好的方法?
- 如何正确地取消Future?Java文档说,线程不能被中断?所以,如果我取消一个Future,并调用
join()
,是否会立即获取结果,因为线程不会被中断? - 在等待结束后使用
join()
还是get()
获取结果,哪种方式更推荐?
注意:以上只是您提供的代码的翻译部分,没有包括问题内容。如果您还有其他问题或需要进一步解答,请随时提问。
英文:
I am using Java 8, and I want to know the recommended way to enforce timeout on 3 async jobs that I would to execute async and retrieve the result from the future. Note that the timeout is the same for all 3 jobs. I also want to cancel the job if it goes beyond time limit.
I am thinking something like this:
// Submit jobs async
List<CompletableFuture<String>> futures = submitJobs(); // Uses CompletableFuture.supplyAsync
List<CompletableFuture<Void>> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
allFutures.get(100L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
for(CompletableFuture f : future) {
if(!f.isDone()) {
/*
From Java Doc:
@param mayInterruptIfRunning this value has no effect in this
* implementation because interrupts are not used to control
* processing.
*/
f.cancel(true);
}
}
}
List<String> output = new ArrayList<>();
for(CompeletableFuture fu : futures) {
if(!fu.isCancelled()) { // Is this needed?
output.add(fu.join());
}
}
return output;
- Will something like this work? Is there a better way?
- How to cancel the future properly? Java doc says, thread cannot be interrupted? So, if I were to cancel a future, and call
join()
, will I get the result immediately since the thread will not be interrupted? - Is it recommended to use join() or get() to get the result after waiting is over?
答案1
得分: 1
值得注意的是,在CompletableFuture上调用cancel方法实际上与在当前阶段上调用completeExceptionally方法相同。取消操作不会影响先前的阶段。话虽如此:
- 原则上,假设不需要上游取消(从伪代码的角度来看,上述代码有语法错误),类似这样的操作是有效的。
- CompletableFuture的取消不会中断当前线程。取消操作会立即导致所有下游阶段触发CancellationException(将中断执行流程)。
- 在调用方愿意无限期等待的情况下,'join'和'get'实际上是相同的。join会处理包装检查异常。如果调用方希望设置超时,将需要使用get。
包含一个部分来说明取消时的行为。请注意,在取消时下游进程将不会启动,但上游进程甚至在取消后仍会继续执行。
public static void main(String[] args) throws Exception
{
// ...(代码部分未翻译)
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
// ...(代码部分未翻译)
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
// ...(代码部分未翻译)
}
如果需要取消上游进程(例如:取消某个网络调用),将需要自定义处理。
英文:
It is worth noting that calling cancel on CompletableFuture is effectively the same as calling completeExceptionally on the current stage. The cancellation will not impact prior stages. With that said:
- In principle, something like this will work assuming upstream cancellation is not necessary (from a pseudocode perspective, the above has syntax errors).
- CompletableFuture cancellation will not interrupt the current thread. Cancellation will cause all downstream stages to be triggered immediately with a CancellationException (will short circuit the execution flow).
- 'join' and 'get' are effectively the same in the case where the caller is willing to wait indefinitely. Join handles wrapping the checked Exceptions for you. If the caller wants to timeout, get will be needed.
Including a segment to illustrate the behavior on cancellation. Note how downstream processes will not be started, but upstream processes continue even after cancellation.
public static void main(String[] args) throws Exception
{
int maxSleepTime = 1000;
Random random = new Random();
AtomicInteger value = new AtomicInteger();
List<String> calculatedValues = new ArrayList<>();
Supplier<String> process = () -> { try { Thread.sleep(random.nextInt(maxSleepTime)); System.out.println("Stage 1 Running!"); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.toString(value.getAndIncrement()); };
List<CompletableFuture<String>> stage1 = IntStream.range(0, 10).mapToObj(val -> CompletableFuture.supplyAsync(process)).collect(Collectors.toList());
List<CompletableFuture<String>> stage2 = stage1.stream().map(Test::appendNumber).collect(Collectors.toList());
List<CompletableFuture<String>> stage3 = stage2.stream().map(Test::printIfCancelled).collect(Collectors.toList());
CompletableFuture<Void> awaitAll = CompletableFuture.allOf(stage2.toArray(new CompletableFuture[0]));
try
{
/*Wait 1/2 the time, some should be complete. Some not complete -> TimeoutException*/
awaitAll.get(maxSleepTime / 2, TimeUnit.MILLISECONDS);
}
catch(TimeoutException ex)
{
for(CompletableFuture<String> toCancel : stage2)
{
boolean irrelevantValue = false;
if(!toCancel.isDone())
toCancel.cancel(irrelevantValue);
else
calculatedValues.add(toCancel.join());
}
}
System.out.println("All futures Cancelled! But some Stage 1's may still continue printing anyways.");
System.out.println("Values returned as of cancellation: " + calculatedValues);
Thread.sleep(maxSleepTime);
}
private static CompletableFuture<String> appendNumber(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 2 Running"); return "#" + val; });
}
private static CompletableFuture<String> printIfCancelled(CompletableFuture<String> baseFuture)
{
return baseFuture.thenApply(val -> { System.out.println("Stage 3 Running!"); return val; }).exceptionally(ex -> { System.out.println("Stage 3 Cancelled!"); return ex.getMessage(); });
}
If it is necessary to cancel the upstream process (ex: cancel some network call), custom handling will be needed.
答案2
得分: 0
调用 cancel 后,您将无法加入该 future,因为会产生异常。
终止计算的一种方法是让其引用该 future 并定期检查:如果已取消,则从内部中止计算。
如果计算是循环,可以在每次迭代时进行检查。
您是否需要它是 CompletableFuture?因为另一种方法是避免使用 CompletableFuture,改为使用简单的 Future 或者 FutureTask:如果使用 Executor 执行它,调用 future.cancel(true) 将在可能的情况下终止计算。
回答问题:"调用 join(),我会立即获得结果吗"。
不,您不会立即获得结果,它将挂起并等待完成计算:无法强制在更短的时间内完成长时间的计算。
您可以调用 future.complete(value) 并提供一个值,该值将作为默认结果被其他引用该 future 的线程使用。
英文:
After calling cancel you cannot join the furure, since you get an exception.
One way to terminate the computation is to let it have a reference to the future and check it periodically: if it was cancelled abort the computation from inside.
This can be done if the computaion is a loop where at each iteration you can do the check.
Do you need it to be a CompletableFuture? Cause another way is to avoid to use a CompleatableFuture, and use a simple Future or a FutureTask instead: if you execute it with an Executor calling future.cancel(true) will terminate the computation if possbile.
Answerring to the question: "call join(), will I get the result immediately".
No you will not get it immediately, it will hang and wait to complete the computation: there is no way to force a computation that takes a long time to complete in a shorter time.
You can call future.complete(value) providing a value to be used as default result by other threads that have a reference to that future.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论