如何按照访问时间而不是创建时间来优先处理等待的CompletableFuture?

huangapple go评论80阅读模式
英文:

How to prioritise waiting CompletableFutures by access time instead of creation time?

问题

TL;DR: 当有多个CompletableFuture等待执行时,我如何优先处理感兴趣的那些?

我有一个包含 10,000 个 CompletableFuture 的列表(它们计算产品数据库内部报告的数据行):

List<Product> products = ...;

List<CompletableFuture<DataRow>> dataRows = products
    .stream()
    .map(p -> CompletableFuture.supplyAsync(() -> calculateDataRowForProduct(p), singleThreadedExecutor))
    .collect(Collectors.toList());

每个需要大约 50 毫秒 完成,所以整个过程需要 500 秒(它们都共享相同的数据库连接,因此不能并行运行)。

假设我想访问第 9000 个产品的数据行:

dataRows.get(9000).join()

问题是,所有这些CompletableFuture按它们被创建的顺序执行,而不是按它们被访问的顺序执行。这意味着我必须等待 450 秒 才能计算那些我目前不关心的东西,最终才能获取我想要的数据行。

问题
是否有任何方法可以更改这种行为,以便我访问的 Future 优先于我目前不关心的 Future?

初步想法

我注意到 ThreadPoolExecutor 使用 BlockingQueue<Runnable> 来排队等待可用线程的条目。

因此,我考虑使用 PriorityBlockingQueue 来在访问其CompletableFuture时更改Runnable的优先级,但是存在以下问题:

  • PriorityBlockingQueue 没有方法来重新设置现有元素的优先级,而且
  • 我需要找到从 CompletableFuture 到队列中相应 Runnable 条目的方法。

在我继续深入研究之前,你认为这听起来是否是正确的方法?其他人是否曾经有过这种要求?我尝试搜索过,但什么都没有找到。也许 CompletableFuture 不是正确的做法?

背景
我们有一个内部报告,每页显示 100 个产品。最初,我们为报告的所有数据行预先计算了数据,但如果有人拥有这么多产品,这将花费太长时间。

因此,第一个优化是将计算包装在一个记忆化的供应商中:

List<Supplier<DataRow>> dataRows = products
    .stream()
    .map(p -> Suppliers.memoize(() -> calculateDataRowForProduct(p)))
    .collect(Collectors.toList());

这意味着初始显示前 100 个条目现在只需要 5 秒,而不是 500 秒(这很棒),但当用户切换到下一页时,每个页面仍然需要 另外 5 秒

因此,想法是,当用户盯着第一页时,为什么不在后台预先计算下一页。这就是我上面的问题。

英文:

TL;DR: When several CompletableFutures are waiting to get executed, how can I prioritize those whose values i'm interested in?

I have a list of 10,000 CompletableFutures (which calculate the data rows for an internal report over the product database):

List&lt;Product&gt; products = ...;

List&lt;CompletableFuture&lt;DataRow&gt;&gt; dataRows = products
    .stream()
    .map(p -&gt; CompletableFuture.supplyAsync(() -&gt; calculateDataRowForProduct(p), singleThreadedExecutor))
    .collect(Collectors.toList());

Each takes around 50ms to complete, so the entire thing finishes in 500sec. (they all share the same DB connection, so cannot run in parallel).

Let's say I want to access the data row of the 9000th product:
dataRows.get(9000).join()

The problem is, all these CompletableFutures are executed in the order they have been created, not in the order they are accessed. Which means I have to wait 450sec for it to calculate stuff that at the moment I don't care about, to finally get to the data row I want.

Question:
Is there any way to change this behaviour, so that the Futures I try to access get priority over those I don't care about at the moment?

First thoughts:

I noticed that a ThreadPoolExecutor uses a BlockingQueue&lt;Runnable&gt; to queue up entries waiting for an available Thread.

So I thought about using a PriorityBlockingQueue, to change the priority of the Runnable when I access its CompletableFuture but:

  • PriorityBlockingQueue does not have a method to reprioritize an existing element, and
  • I need to figure out a way to get from the CompletableFuture to the corresponding Runnable entry in the queue.

Before I go further down this road, do you think this sounds like the correct approach. Do others ever had this kind of requirement? I tried to search for it, but found exactly nothing. Maybe CompletableFuture is not the correct way of doing this?

Background:
We have an internal report which displays 100 products per page. Initially we precalculated all DataRows for the report, which took way to long if someone has that many products.

So first optimization was to wrap the calculation in a memoized supplier:

List&lt;Supplier&lt;DataRow&gt;&gt; dataRows = products
    .stream()
    .map(p -&gt; Suppliers.memoize(() -&gt; calculateDataRowForProduct(p)))
    .collect(Collectors.toList());

This means that initial display of first 100 entries now takes 5sec instead of 500sec (which is great), but when the user switches to the next pages, it takes another 5sec for each single one of them.

So the idea is, while the user is staring at the first screen, why not precalculate the next pages in the background. Which leads me to my question above.

答案1

得分: 5

有趣的问题 如何按照访问时间而不是创建时间来优先处理等待的CompletableFuture?

一种方法是创建自定义的FutureTask类,以便动态更改任务的优先级。

在这里,DataRowProduct都被视为简单的String,只是为了简单起见。

import java.util.*;
import java.util.concurrent.*;

public class Testing {
    private static String calculateDataRowForProduct(String product) {
        try {
            // 虚拟操作。
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("计算完成:" + product);
        return "数据行为" + product;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        PriorityBlockingQueue<Runnable> customQueue = new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, customQueue);
        List<String> products = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            products.add("产品" + i);
        }
        Map<Integer, PrioritizedFutureTask<String>> taskIndexMap = new HashMap<>();
        for (int i = 0; i < products.size(); i++) {
            String product = products.get(i);
            Callable callable = () -> calculateDataRowForProduct(product);
            PrioritizedFutureTask<String> dataRowFutureTask = new PrioritizedFutureTask<>(callable, i);
            taskIndexMap.put(i, dataRowFutureTask);
            executor.execute(dataRowFutureTask);
        }

        List<Integer> accessOrder = new ArrayList<>();
        accessOrder.add(4);
        accessOrder.add(7);
        accessOrder.add(2);
        accessOrder.add(9);
        int priority = -1 * accessOrder.size();
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask taskAtIndex = taskIndexMap.get(nextIndex);
            assert (customQueue.remove(taskAtIndex));
            customQueue.offer(taskAtIndex.set_priority(priority++));
            // 现在此任务将位于线程池队列的前面。
            // 因此,这个任务将会在下一个执行。
        }
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask<String> dataRowFutureTask = taskIndexMap.get(nextIndex);
            String dataRow = dataRowFutureTask.get();
            System.out.println("索引 " + nextIndex + " 的数据行 = " + dataRow);
        }
    }
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {

    private Integer _priority = 0;
    private Callable<T> callable;

    public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
        super(callable);
        this.callable = callable;
        _priority = priority;
    }

    public Integer get_priority() {
        return _priority;
    }

    public PrioritizedFutureTask set_priority(Integer priority) {
        _priority = priority;
        return this;
    }

    @Override
    public int compareTo(@NotNull PrioritizedFutureTask<T> other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return get_priority().compareTo(other.get_priority());
    }
}

class CustomRunnableComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        return ((PrioritizedFutureTask) task1).compareTo((PrioritizedFutureTask) task2);
    }
}

输出:

计算完成:产品0
计算完成:产品4
索引 4 的数据行 = 数据行为产品4
计算完成:产品7
索引 7 的数据行 = 数据行为产品7
计算完成:产品2
索引 2 的数据行 = 数据行为产品2
计算完成:产品9
索引 9 的数据行 = 数据行为产品9
计算完成:产品1
计算完成:产品3
计算完成:产品5
计算完成:产品6
计算完成:产品8

在这里还有一个优化的空间。
customQueue.remove(taskAtIndex) 操作的时间复杂度是相对于队列的大小(或产品的总数)的 O(n)。如果产品数量不多(<= 10^5),可能不会产生太大的影响。但否则可能会导致性能问题。

一种解决方法是扩展BlockingPriorityQueue并实现从优先级队列中删除元素的功能,时间复杂度为 O(logn) 而不是 O(n)。可以通过在优先级队列结构内部保持一个哈希表来实现。这个哈希表将保持元素与其在底层数组中的索引(或重复情况下的索引)之间的数量关系。幸运的是,我之前在Python中已经实现了这样的堆结构。如果对这个优化有更多问题,最好单独提一个新问题来询问。

英文:

Interesting problem 如何按照访问时间而不是创建时间来优先处理等待的CompletableFuture?

One way is to roll out custom FutureTask class to facilitate changing priorities of tasks dynamically.

DataRow and Product are both taken as just String here for simplicity.

import java.util.*;
import java.util.concurrent.*;

public class Testing {
    private static String calculateDataRowForProduct(String product) {
        try {
            // Dummy operation.
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(&quot;Computation done for &quot; + product);
        return &quot;data row for &quot; + product;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        PriorityBlockingQueue&lt;Runnable&gt; customQueue = new PriorityBlockingQueue&lt;Runnable&gt;(1, new CustomRunnableComparator());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, customQueue);
        List&lt;String&gt; products = new ArrayList&lt;&gt;();
        for (int i = 0; i &lt; 10; i++) {
            products.add(&quot;product&quot; + i);
        }
        Map&lt;Integer, PrioritizedFutureTask&lt;String&gt;&gt; taskIndexMap = new HashMap&lt;&gt;();
        for (int i = 0; i &lt; products.size(); i++) {
            String product = products.get(i);
            Callable callable = () -&gt; calculateDataRowForProduct(product);
            PrioritizedFutureTask&lt;String&gt; dataRowFutureTask = new PrioritizedFutureTask&lt;&gt;(callable, i);
            taskIndexMap.put(i, dataRowFutureTask);
            executor.execute(dataRowFutureTask);
        }

        List&lt;Integer&gt; accessOrder = new ArrayList&lt;&gt;();
        accessOrder.add(4);
        accessOrder.add(7);
        accessOrder.add(2);
        accessOrder.add(9);
        int priority = -1 * accessOrder.size();
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask taskAtIndex = taskIndexMap.get(nextIndex);
            assert (customQueue.remove(taskAtIndex));
            customQueue.offer(taskAtIndex.set_priority(priority++));
            // Now this task will be at the front of the thread pool queue.
            // Hence this task will execute next.
        }
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask&lt;String&gt; dataRowFutureTask = taskIndexMap.get(nextIndex);
            String dataRow = dataRowFutureTask.get();
            System.out.println(&quot;Data row for index &quot; + nextIndex + &quot; = &quot; + dataRow);
        }
    }
}

class PrioritizedFutureTask&lt;T&gt; extends FutureTask&lt;T&gt; implements Comparable&lt;PrioritizedFutureTask&lt;T&gt;&gt; {

    private Integer _priority = 0;
    private Callable&lt;T&gt; callable;

    public PrioritizedFutureTask(Callable&lt;T&gt; callable, Integer priority) {
        super(callable);
        this.callable = callable;
        _priority = priority;
    }

    public Integer get_priority() {
        return _priority;
    }

    public PrioritizedFutureTask set_priority(Integer priority) {
        _priority = priority;
        return this;
    }

    @Override
    public int compareTo(@NotNull PrioritizedFutureTask&lt;T&gt; other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return get_priority().compareTo(other.get_priority());
    }
}

class CustomRunnableComparator implements Comparator&lt;Runnable&gt; {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        return ((PrioritizedFutureTask)task1).compareTo((PrioritizedFutureTask)task2);
    }
}

Output:

Computation done for product0
Computation done for product4
Data row for index 4 = data row for product4
Computation done for product7
Data row for index 7 = data row for product7
Computation done for product2
Data row for index 2 = data row for product2
Computation done for product9
Data row for index 9 = data row for product9
Computation done for product1
Computation done for product3
Computation done for product5
Computation done for product6
Computation done for product8

There is one more scope of optimization here.
The customQueue.remove(taskAtIndex) operation has O(n) time complexity with respect to the size of the queue (or the total number of products).
It might not affect much if the number of products is less (<= 10^5).
But it might result in a performance issue otherwise.

One solution to that is to extend BlockingPriorityQueue and roll out functionality to remove an element from a priority queue in O(logn) rather than O(n).
We can achieve that by keeping a hashmap inside the PriorityQueue structure. This hashmap will keep a count of elements vs the index (or indices in case of duplicates) of that element in the underlying array.
Fortunately, I had already implemented such a heap in Python sometime back.
If you have more questions on this optimization, its probably better to ask a new question altogether.

答案2

得分: 3

你可以在开始时避免提交所有任务给执行器,而是仅提交一个后台任务,当它完成后再提交下一个。如果你想要获取第9000行,立即提交它(如果尚未提交):

	static class FutureDataRow {
		CompletableFuture<DataRow> future;
		int index;
		List<FutureDataRow> list;
		Product product;
		
		FutureDataRow(List<FutureDataRow> list, Product product){
			this.list = list;
			index = list.size();
			list.add(this);
			this.product = product;
		}
		public DataRow get(){
			submit();
			return future.join();
		}
		private synchronized void submit(){
			if(future == null) future = CompletableFuture.supplyAsync(() -> 
				calculateDataRowForProduct(product), singleThreadedExecutor);
		}
		private void background(){
			submit();
			if(index >= list.size() - 1) return;
			future.whenComplete((dr, t) -> list.get(index + 1).background());
		}
	}
	
    ...

	List<FutureDataRow> dataRows = new ArrayList<>();
	products.forEach(p -> new FutureDataRow(dataRows, p));
	dataRows.get(0).background();

如果你希望你还可以在`get`方法内部提交下一行如果你预期他们之后会导航到下一页

---

如果你使用的是多线程执行器并且想要同时运行多个后台任务你可以修改`background`方法以查找列表中的下一个未提交任务并在当前后台任务完成时启动它

```java
	private synchronized boolean background(){
		if(future != null) return false;
		submit();
		future.whenComplete((dr, t) -> {
			for(int i = index + 1; i < list.size(); i++){
				if(list.get(i).background()) return;
			}
		});
		return true;
	}

你还需要在后台启动前n个任务,而不仅仅是第一个。

	int n = 8; //活动后台任务数
	for(int i = 0; i < dataRows.size() && n > 0; i++){
		if(dataRows.get(i).background()) n--;
	}
英文:

You could avoid submitting all of the tasks to the executor at the start, instead only submit one background task and when it finishes submit the next. If you want to get the 9000th row submit it immediately (if it has not already been submitted):

static class FutureDataRow {
CompletableFuture&lt;DataRow&gt; future;
int index;
List&lt;FutureDataRow&gt; list;
Product product;
FutureDataRow(List&lt;FutureDataRow&gt; list, Product product){
this.list = list;
index = list.size();
list.add(this);
this.product = product;
}
public DataRow get(){
submit();
return future.join();
}
private synchronized void submit(){
if(future == null) future = CompletableFuture.supplyAsync(() -&gt; 
calculateDataRowForProduct(product), singleThreadedExecutor);
}
private void background(){
submit();
if(index &gt;= list.size() - 1) return;
future.whenComplete((dr, t) -&gt; list.get(index + 1).background());
}
}
...
List&lt;FutureDataRow&gt; dataRows = new ArrayList&lt;&gt;();
products.forEach(p -&gt; new FutureDataRow(dataRows, p));
dataRows.get(0).background();

If you want you could also submit the next row inside the get method if you expect that they will navigate to the next page afterwards.


If you were instead using a multithreaded executor and you wanted to run multiple background tasks concurrently you could modify the background method to find the next unsubmitted task in the list and start it when the current background task has finished.

	private synchronized boolean background(){
if(future != null) return false;
submit();
future.whenComplete((dr, t) -&gt; {
for(int i = index + 1; i &lt; list.size(); i++){
if(list.get(i).background()) return;
}
});
return true;
}

You would also need to start the first n tasks in the background instead of just the first one.

	int n = 8; //number of active background tasks
for(int i = 0; i &lt; dataRows.size() &amp;&amp; n &gt; 0; i++){
if(dataRows.get(i).background()) n--;
}

答案3

得分: 3

为了回答我的问题...

对于我的问题,有一个令人惊讶的简单(也令人无聊)解决方案。我不知道为什么花了我三天的时间才找到它,我猜它需要正确的心态,只有在漫长宁静的沙滩上漫步,凝视着宁静的星期日傍晚的日落时,你才会拥有。

所以,嗯,写这个有点尴尬,但是当我需要获取某个值(比如第9000个产品)而未来尚未计算出该值时,我可以,而不是以某种方式强制未来尽快生成该值(通过进行所有这些重新排列和调度的魔术),我可以,嗯,我可以,... 简单地... 自己计算该值!是的!等等,什么?真的吗?

就像这样:if (!future.isDone()) {future.complete(supplier.get());}

我只需要在某个包装类中存储原始的Supplier以及CompletableFuture。这就是包装类,它工作得很好,只需要一个更好的名字:

public static class FuturizedMemoizedSupplier&lt;T&gt; implements Supplier&lt;T&gt; {
private CompletableFuture&lt;T&gt; future;
private Supplier&lt;T&gt; supplier;
public FuturizedSupplier(Supplier&lt;T&gt; supplier) {
this.supplier = supplier;
this.future = CompletableFuture.supplyAsync(supplier, singleThreadExecutor);
}
public T get() {
// 如果未来尚未完成,我们只需自己计算值,并将其设置到未来中
if (!future.isDone()) {
future.complete(supplier.get());
}
supplier = null;
return future.join();
}
}

现在,我认为这里有小概率发生竞争条件的可能性,这可能导致supplier被执行两次。但实际上,我不在乎,它无论如何都会产生相同的值。

事后想法
我不知道为什么我没有更早想到这个,我完全被固定在这个想法上,必须是CompletableFuture来计算值,必须在这些后台线程之一中运行,等等,嗯,这些都不重要,也不是任何要求。

我认为这整个问题都是一个经典的例子,即_问你真正想解决的问题_而不是_想出一个半成品的破解决方案,然后问如何修复它_。最后,我一点都不关心CompletableFuture或其任何功能,它只是我脑海中出现的最容易的方式来在后台运行某些东西。

谢谢你的帮助!

英文:

To answer my own question...

There is a surprisingly simple (and surprisingly boring) solution to my problem. I have no idea why it took me three days to find it, I guess it required the right mindset, that you only have when walking along an endless tranquilizing beach looking into the sunset on a quiet Sunday evening.

So, ah, it's a little bit embarrassing to write this, but when I need to fetch a certain value (say for 9000th product), and the future has not yet computed that value, I can, instead of somehow forcing the future to produce that value asap (by doing all this repriorisation and scheduling magic), I can, well, I can, ... simply ... compute that value myself! Yes! Wait, what? Seriously, that's it?

It's something like this: if (!future.isDone()) {future.complete(supplier.get());}

I just need to store the original Supplier alongside the CompletableFuture in some wrapper class. This is the wrapper class, which works like a charm, all it needs is a better name:

public static class FuturizedMemoizedSupplier&lt;T&gt; implements Supplier&lt;T&gt; {
private CompletableFuture&lt;T&gt; future;
private Supplier&lt;T&gt; supplier;
public FuturizedSupplier(Supplier&lt;T&gt; supplier) {
this.supplier = supplier;
this.future = CompletableFuture.supplyAsync(supplier, singleThreadExecutor);
}
public T get() {
// if the future is not yet completed, we just calculate the value ourselves, and set it into the future
if (!future.isDone()) {
future.complete(supplier.get());
}
supplier = null;
return future.join();
}
}

Now, I think, there is a small chance for a race condition here, which could lead to the supplier being executed twice. But actually, I don't care, it produces the same value anyway.

Afterthoughts:
I have no idea why I didn't think of this earlier, I was completely fixated on the idea, it has to be the CompletableFuture which calculates the value, and it has to run in one of these background threads, and whatnot, and, well, none of these mattered or were in any way a requirement.

I think this whole question is a classic example of Ask what problem you really want to solve instead of coming up with a half baked broken solution, and ask how to fix that. In the end, I didn't care about CompletableFuture or any of its features at all, it was just the easiest way that came to my mind to run something in the background.

Thanks for your help!

huangapple
  • 本文由 发表于 2020年8月8日 16:33:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/63313333.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定