Java – CompletableFutures – 如果出现异常,如何取消所有的 future?

huangapple go评论68阅读模式
英文:

Java - CompletableFutures - How can i cancel all futures if there are exceptions

问题

以下是您提供的代码的翻译部分:

我有一个方法包含在下面),用于返回一个```CompletableFuture```列表的值

该方法应该

 1. 能够在给定时间内超时
 2. 能够取消所有的futures如果有超过n个异常

第一个点运行良好确实在超时限制后中止。(之后我仍然需要调用```exectuorService.shutdownNow()```来返回给调用者)。我遇到的问题是第二个我试图完成的事情

假设我有一个包含20,000个futures的列表所有这些futures都会有一个异常那么为什么要让它们全部执行如果我看到异常太多那么我假设所有的futures都有问题我希望取消它们

此外我希望每个future都有一个单独的超时时间无论它可能需要多长时间但这也不起作用可能是由于下面概述的相同原因

似乎问题是因为当我调用```allDoneFuture.thenApply()```在这一点上它等待并让所有的futures都完成无论是成功还是异常只有在它们全部完成后它才会遍历每个future并获取其结果在那时当它们已经完成时取消有什么好处呢

如果有人能够向我展示如何满足这个具体的需求监视异常和单独的超时并根据这些取消所有其他任务”,我将不胜感激

谢谢

以下是我编写的方法

```java
/**
     * @param futures 一个CompletableFuture列表
     * @param timeout 在抛出异常之前允许futures运行的时间
     * @param timeUnit 超时的时间单位
     * @param allowedExceptions 我们容忍多少个futures抛出异常,
     * 注意:如果从futures中抛出异常,它将返回null,直到达到allowedExceptions阈值
     * */
    public static <T> List<T> extractFromFutures(List<CompletableFuture<T>> futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[futures.size()]));
        try {
            AtomicInteger exceptionCount = new AtomicInteger(0);
            return allDoneFuture.thenApply(v -> //当全部完成时
                    futures.stream().
                            map(future -> {
                                try {
                                    //如果我只能设置一个单独的超时时间
                                    return future.get(timeout, timeUnit);
                                } catch (Exception e) {
                                    future.cancel(true);
                                    int curExceptionCnt = exceptionCount.incrementAndGet();
                                    if(curExceptionCnt >= allowedExceptions){
                                        //我本来希望它会将异常抛给调用的try-catch,
                                        //然后取消所有的futures,但它没有这样做
                                        throw new RuntimeException(e);
                                    }
                                    else{
                                        return null;
                                    }
                                }
                            }).
                            collect(Collectors.<T>toList())
            ).get(timeout, timeUnit);
        } catch (Exception e) {
            allDoneFuture.cancel(true);
            throw new RuntimeException(e);
        }
    }

希望这有助于理解您的代码。

英文:

I have a method (included below) to return the values of a list of CompletableFutures.

The method is supposed to:

  1. be able to timeout after a given time.
  2. be able to cancel all futures if there are more than n amount of exceptions.

The first point works well and indeed bombs out after it passed the timeout limit. (I still need to call exectuorService.shutdownNow() afterwards to return to the caller). The problem I'm having is with the second thing I'm trying to accomplish.

Lets say i have a list of 20,000 futures and all of them will have an exception, then why let all of them execute, if I see that there are too many exceptions then i assume that something is wrong with all of the futures andI want to cancel them.

In addition i would love to have a timeout on each future individually how long it may take, but this also would'nt work, unassuming for the same reason outlined below.

It seems that the reason is, because when I call allDoneFuture.thenApply(), at this point it waits and lets all the futures complete, either successfully or exceptionally. Only after all of them completed does it go through each future and fetches its result. At that point what good does it do to cancel, when they have completed already.

I would much appreciate if someone can show me how to accomplish this specific need: "Monitor the exceptions, and the individual timeouts, and based on that cancel all others".

Thanks.

Below is the method I wrote:

/**
* @param futures a list of completable futures
* @param timeout how long to allow the futures to run before throwing exception
* @param timeUnit unit of timeout
* @param allowedExceptions how many of the futures do we tolerate exceptions,
* NOTE: if an exception is thrown from the futures it will return null, until it reaches the allowedExceptions threshold
* */
public static &lt;T&gt; List&lt;T&gt; extractFromFutures(List&lt;CompletableFuture&lt;T&gt;&gt; futures, int timeout, TimeUnit timeUnit, int allowedExceptions) {
CompletableFuture&lt;Void&gt; allDoneFuture = CompletableFuture
.allOf(futures.toArray(new CompletableFuture[futures.size()]));
try {
AtomicInteger exceptionCount = new AtomicInteger(0);
return allDoneFuture.thenApply(v -&gt;//when all are done
futures.stream().
map(future -&gt; {
try {
//if only I could set an individual timeout
return future.get(timeout, timeUnit);
} catch (Exception e) {
future.cancel(true);
int curExceptionCnt = exceptionCount.incrementAndGet();
if(curExceptionCnt &gt;= allowedExceptions){
//I would&#39;ve hoped that it will throw it to the calling try-catch 
//and then cancel all futures, but it doesn&#39;t
throw new RuntimeException(e);
}
else{
return null;
}
}
}).
collect(Collectors.&lt;T&gt;toList())
).get(timeout, timeUnit);
} catch (Exception e) {
allDoneFuture.cancel(true);
throw new RuntimeException(e);
}
}

答案1

得分: 1

取消所有在一定数量的异常之后剩余的所有Future,您可以在每个Future上调用exceptionally方法,并在其中递增异常计数,并在其中可能取消它们。

对于单个的超时,您可以创建一个类,该类包含具有其超时的Future,然后基于超时对它们进行排序,并使用超时减去已经流逝的时间调用get方法。

static class FutureWithTimeout<T> {
    CompletableFuture<T> f;
    long timeout;
    TimeUnit timeUnit;

    FutureWithTimeout(CompletableFuture<T> f, long timeout, TimeUnit timeUnit) {
        this.f = f;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }
}

public static <T> List<T> extractFromFutures(List<FutureWithTimeout<T>> futures, int allowedExceptions) {
    AtomicInteger exceptionCount = new AtomicInteger(0);
    futures.forEach(f -> f.f.exceptionally(t -> {
        if (exceptionCount.getAndIncrement() == allowedExceptions) {
            futures.forEach(c -> c.f.cancel(false));
        }
        return null;
    }));
    long t = System.nanoTime();
    return futures.stream()
        .sorted(Comparator.comparingLong(f -> f.timeUnit.toNanos(f.timeout)))
        .map(f -> {
            try {
                return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)),
                    TimeUnit.NANOSECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                f.f.cancel(false);
                return null;
            }
        })
        .collect(Collectors.toList());
}

请注意,这可能会以与传入顺序不同的顺序返回列表。如果您需要以相同的顺序返回它们,您可以将map().collect()更改为forEachOrdered,然后在排序后重新映射它们为它们的结果。

另外,请注意,对于CompletableFuturecancel方法的mayInterruptIfRunning参数没有效果,因此我将其更改为false

英文:

To cancel all of the remaining futures after a certain number of exceptions you can call exceptionally on each of them and increment the exception count and possibly cancel them inside of that.

For individual timeouts you could create a class that holds the future with its timeout then sort them based on the timeout and call get with the timeout minus the elapsed time.

static class FutureWithTimeout&lt;T&gt; {
CompletableFuture&lt;T&gt; f;
long timeout;
TimeUnit timeUnit;
FutureWithTimeout(CompletableFuture&lt;T&gt; f, long timeout, TimeUnit timeUnit) {
this.f = f;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
}
public static &lt;T&gt; List&lt;T&gt; extractFromFutures(List&lt;FutureWithTimeout&lt;T&gt;&gt; futures, int allowedExceptions) {
AtomicInteger exceptionCount = new AtomicInteger(0);
futures.forEach(f -&gt; f.f.exceptionally(t -&gt; {
if(exceptionCount.getAndIncrement() == allowedExceptions){
futures.forEach(c -&gt; c.f.cancel(false));
}
return null;
}));
long t = System.nanoTime();
return futures.stream()
.sorted(Comparator.comparingLong(f -&gt; f.timeUnit.toNanos(f.timeout)))
.map(f -&gt; {
try {
return f.f.get(Math.max(0, f.timeUnit.toNanos(f.timeout) - (System.nanoTime() - t)), 
TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
f.f.cancel(false);
return null;
}
})
.collect(Collectors.toList());
}

Note that this may return the list in a different order than it was passed in. If you need it in the same order then you could change the map().collect() to a forEachOrdered and then re map them into their results after without sorting.

Also the mayInterruptIfRunning parameter to cancel has no effect on CompletableFuture so I changed it to false.

答案2

得分: 0

CompletableFuture完全忽略了对cancel(true)的任何调用。我不知道为什么(可能是为了简化API),但这很糟糕。如果你想使未来真正可取消(可以手动检查中断,或者通过在锁上阻塞接受取消),那么你必须使用Future而不是CompletableFuture

英文:

CompletableFuture completely ignores any call to cancel(true). I don't know why (presumably to simplify the API), but it sucks. If you want to make futures actually cancelable (where you can either manually check for interruption, or accept cancellation by blocking on a lock), then you have to use Future, not CompletableFuture.

huangapple
  • 本文由 发表于 2020年8月14日 00:54:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/63399831.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定