有没有一种方法将CompletableFuture放入循环中?

huangapple go评论71阅读模式
英文:

Is there a way to put a CompletableFuture in a loop?

问题

下面代码的问题在于我必须等待所有三个任务完成。

如果第一个和第二个任务在200毫秒内完成,而第三个任务在2秒内完成,那么在加载下一个三个URL之前,我将不得不等待2秒。

理想情况下,我希望在每个任务完成时立即发送新的请求,并以某种方式延迟主线程,直到ArrayList为空为止。

简单来说,我希望每个CompletableFuture在旧任务完成时以某种循环方式运行。

(我经常在JavaScript中使用事件来做到这一点)

有人能想出我如何才能实现这一点吗?

private static void httpClientExample() {

    ArrayList<String> urls = new ArrayList<>(
            Arrays.asList(
                    "https://www.bing.com/",
                    "https://openjdk.java.net/",
                    "https://openjdk.java.net/",
                    "https://google.com/",
                    "https://github.com/",
                    "https://stackoverflow.com/"
            ));

    HttpClient httpClient = HttpClient.newHttpClient();

    urls.forEach(url -> {
        CompletableFuture<Void> task = httpClient.sendAsync(HttpRequest.newBuilder()
                .uri(URI.create(url))
                .build(), HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::uri).thenAccept(System.out::println);

        // Delay the main thread until this task is completed
        task.join();
    });

    System.out.println("Main Thread Completed");
}
英文:

The problem with the code below is that I have to wait for all three tasks to finish.

If the 1st and 2nd tasks complete in 200ms and the 3rd completes in 2s then I will have to wait 2s before I load the next three URLs.

Ideally I would send a new request as soon as each task finishes and delay the main thread somehow until the ArrayList was empty.

In simple terms I would like each completable future to run in a kind of loop that is triggered by the old task completing.

(I do this quite often in JavaScript using events)

Can anybody think how I might achieve this?

    private static void httpClientExample(){

    ArrayList&lt;String&gt; urls = new ArrayList&lt;&gt;(
            Arrays.asList(
                    &quot;https://www.bing.com/&quot;,
                    &quot;https://openjdk.java.net/&quot;,
                    &quot;https://openjdk.java.net/&quot;,
                    &quot;https://google.com/&quot;,
                    &quot;https://github.com/&quot;,
                    &quot;https://stackoverflow.com/&quot;
            ));

    HttpClient httpClient = HttpClient.newHttpClient();

    var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(0)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(1)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(2)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    // All tasks have to complete
    var all = CompletableFuture.allOf(task1, task2, task3).join();
    
    // Get the next 3 URLs

    System.out.println(&quot;Main Thread Completed&quot;);
}

答案1

得分: 4

让任务本身移除另一个待处理的 URL 并提交它,需要一个线程安全的队列。

也许更容易让主线程来做,例如:

var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque&lt;CompletableFuture&lt;?&gt;&gt;(3);
for(String url: urls) {
    while(pending.size() &gt;= 3 &amp;&amp; !pending.removeIf(CompletableFuture::isDone))
        CompletableFuture.anyOf(pending.toArray(CompletableFuture&lt;?&gt;[]::new)).join();

    pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture&lt;?&gt;[]::new)).join();

这将等待三个提交的任务中至少有一个完成(使用 anyOf/join)然后再提交下一个。当循环结束时,可能有最多三个仍在运行的任务。循环后的 allOf/join 将等待这些任务的完成,因此所有任务在之后都已完成。如果希望在已知所有任务已提交而无需等待其完成时,使发起线程继续进行,请删除最后一句。

英文:

Letting the job itself remove another pending URL and submit it, would require a thread safe queue.

It might be easier to let the main thread do it, e.g. like

var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque&lt;CompletableFuture&lt;?&gt;&gt;(3);
for(String url: urls) {
    while(pending.size() &gt;= 3 &amp;&amp; !pending.removeIf(CompletableFuture::isDone))
        CompletableFuture.anyOf(pending.toArray(CompletableFuture&lt;?&gt;[]::new)).join();

    pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture&lt;?&gt;[]::new)).join();

This will wait until at least one of the three submitted jobs has completed (using anyOf/join) before submitting the next one. When the loop ends, there might be up to three still running jobs. The subsequent allOf/join after the loop will wait for the completion of those jobs, so all jobs have been completed afterwards. When you want the initiator thread to proceed when it is known that all jobs have been submitted, without waiting for their completion, just remove the last statement.

答案2

得分: 1

如果您对最大并行调用数量没有要求,事情会变得更简单:

private static void httpClientExample() throws Exception {

  final ArrayList<String> urls = ...; //URL列表

  final HttpClient httpClient = HttpClient.newBuilder().executor(
                                    Executors.newFixedThreadPool(10)).build();

  final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
  for (String url : urls) {
    final CompletableFuture<Void> completableFuture = httpClient
        .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
            HttpResponse.BodyHandlers.ofString())
        .thenApply(HttpResponse::uri).thenAccept(System.out::println);
    allFutures.add(completableFuture);
  }

  CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])).get();
}
英文:

If you don't have a requirement on the maximum amount of parallel calls things become a lot easier:

private static void httpClientExample() throws Exception {

  final ArrayList&lt;String&gt; urls = ...; //list of urls 

  final HttpClient httpClient = HttpClient.newBuilder().executor(
                                    Executors.newFixedThreadPool(10)).build();

  final List&lt;CompletableFuture&lt;Void&gt;&gt; allFutures = new ArrayList&lt;&gt;();
  for (String url : urls) {
    final CompletableFuture&lt;Void&gt; completableFuture = httpClient
        .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
            HttpResponse.BodyHandlers.ofString())
        .thenApply(HttpResponse::uri).thenAccept(System.out::println);
    allFutures.add(completableFuture);
  }

  CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
}

huangapple
  • 本文由 发表于 2020年10月22日 03:05:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/64470108.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定