How do I run something parallel in Java?

Question

I am trying to print all possible combinations within a range. For example, if my `lowerBound` is 3 and my `max` is 5, I want the following combinations: (5,4 - 5,3 - 4,3). I've implemented this with the `helper()` function found below.

Of course, if my `max` is very big, this is a lot of combinations and it will take a long time. That's why I'm trying to use a `ForkJoinPool`, so that the tasks run in parallel. For this I create a new `ForkJoinPool`. Then I loop over all possible values of r (where r is the amount of numbers in the combination, in the above example r=3). For every value of r I create a new `HelperCalculator`, which extends `RecursiveTask<Void>`. In there I recursively call the `helper()` function. Every time I call this, I create a new `HelperCalculator` and I use `.fork()` on that.

The problem is as follows. It is not correctly generating all possible combinations. It actually generates no combinations at all. I've tried adding `calculator.join()` after `calculator.fork()`, but that just goes on infinitely until I get an `OutOfMemory` error.

Obviously there is something I'm misunderstanding about the `ForkJoinPool`, but after trying for days I can no longer see what.

My main function:

            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
                for (int r = 1; r < 25; r++) {
                int lowerBound = 7;
                int[] data = new int[r];
                int max = 25;
                calculator = new HelperCalculator(data, 0, max, 0, s, n, lowerBound);
                pool.execute(calculator);
                calculator.join();
            }
            pool.shutdown();

The HelperCalculator class:

    protected Void compute() {
        helper(data, end, start, index, s, lowerBound);
        return null;
    }

    //Generate all possible combinations
    public void helper(int[] data, int end, int start, int index, int s, int lowerBound) {
        //If the array is filled, print it
        if (index == data.length) {
            System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            if(data[0] >= lowerBound) {
                HelperCalculator calculator = new HelperCalculator(data, end, start-1, index+1, s, n, lowerBound);
                calculator.fork();
                calculators.add(calculator);
                HelperCalculator calculator2 = new HelperCalculator(data, end, start-1, index, s, n, lowerBound);
                calculator2.fork();
                calculators.add(calculator2);
            }
        }
    }

How do I make every `HelperCalculator` run in parallel, so that there are 23 running at the same time using a `ForkJoinPool`? Or should I perhaps use a different solution?

I've tried calling join() and isDone() on the calculators list, but then it doesn't wait for it to finish properly and the program just exits.

Because someone doesn't understand the algorithm, here it is:

    public static void main(String[] args) {
        for(int r = 3; r > 0; r--) {
            int[] data = new int[r];
            helper(data, 0, 2, 0);
        }
    }

    public static void helper(int[] data, int end, int start, int index) {
        if (index == data.length) {
            System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            helper(data, end, start - 1, index + 1);
            helper(data, end, start - 1, index);
        }
    }

The output of this is:

[2, 1, 0]
[2, 1]
[2, 0]
[1, 0]
[2]
[1]
[0]

Answer 1

Score: 4

<details>
<summary>English:</summary>

Some of the tasks you are forking attempt to use the same array for evaluating different combinations. You can solve the issue by creating a distinct array for each task or by limiting the parallelism to those tasks which already have an array of their own, i.e. those with a different length.
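To illustrate the first option, here is a minimal sketch of the question's recursion in which every forked branch that writes to the array gets its own copy. The class name `ArrayCombinations`, the `run` helper, and the use of a `ConcurrentLinkedQueue` to collect results instead of printing are assumptions made for this example, not part of the original code.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical task class: same recursion as helper(), but no array is shared
// between two tasks that may run at the same time.
public class ArrayCombinations extends RecursiveAction {
    private final int[] data;
    private final int end, start, index;
    private final Queue<String> results; // thread-safe sink, used instead of System.out

    public ArrayCombinations(int[] data, int end, int start, int index, Queue<String> results) {
        this.data = data;
        this.end = end;
        this.start = start;
        this.index = index;
        this.results = results;
    }

    @Override
    protected void compute() {
        if (index == data.length) {
            results.add(Arrays.toString(data));
        } else if (start >= end) {
            // The branch that keeps `start` works on a private copy.
            int[] copy = data.clone();
            copy[index] = start;
            // The branch that skips `start` takes over this task's array,
            // which this task does not touch again.
            invokeAll(
                new ArrayCombinations(copy, end, start - 1, index + 1, results),
                new ArrayCombinations(data, end, start - 1, index, results));
            // invokeAll returns only after both subtasks have completed.
        }
    }

    // Collects all combinations of the numbers 0..n-1 for every length 1..n.
    public static Queue<String> run(int n) {
        Queue<String> results = new ConcurrentLinkedQueue<>();
        for (int r = n; r > 0; r--) {
            ForkJoinPool.commonPool().invoke(
                new ArrayCombinations(new int[r], 0, n - 1, 0, results));
        }
        return results;
    }

    public static void main(String[] args) {
        // The order of the lines may differ between runs.
        run(3).forEach(System.out::println);
    }
}
```

Because each task waits for its subtasks inside `invokeAll`, the completion of the root task implies that all combinations have been collected. Cloning an array per branch costs memory and time, which is part of the motivation for the array-free approach described next.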

But there's another possibility: don't use arrays at all. You can store combinations in `int` values, as each `int` value is a combination of bits. This not only saves a lot of memory, but you can also easily iterate over all possible combinations by just incrementing the value, as iterating over all `int` numbers also iterates over all possible bit combinations¹. The only thing we need to implement is generating the right string for a particular `int` value by interpreting the bits as numbers according to their position.

For a first attempt, we can take the easy way and use already existing classes:

    // uses java.util.BitSet and java.util.stream.Collectors
    public static void main(String[] args) {
        long t0 = System.nanoTime();
        combinations(10, 25);
        long t1 = System.nanoTime();
        System.out.println((t1 - t0)/1_000_000+" ms");
        System.out.flush();
    }
    static void combinations(int start, int end) {
        for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
            System.out.println(
                BitSet.valueOf(new long[]{i}).stream()
                      .mapToObj(b -> String.valueOf(b + start))
                      .collect(Collectors.joining(", ", "[", "]"))
            );
        }
    }

The method uses an exclusive end, so for your example, you have to call it like `combinations(0, 3)` and it will print

    [0]
    [1]
    [0, 1]
    [2]
    [0, 2]
    [1, 2]
    [0, 1, 2]
    3 ms

<sup>of course, timing may vary</sup>

For the `combinations(10, 25)` example above, it prints all combinations, followed by `3477 ms` on my machine. This sounds like an opportunity to optimize, but we should first think about which operations impose which costs.

Iterating over the combinations has been reduced to a trivial operation here. Creating the string is an order of magnitude more expensive. But this is still nothing compared to the actual printing, which includes a data transfer to the operating system and, depending on the system, the actual rendering may add to our time. Since this is done while holding a lock within `PrintStream`, all threads attempting to print at the same time would be blocked, making it a non-parallelizable operation.

Let's identify the fraction of the cost by creating a new `PrintStream`, disabling the auto-flush on line breaks and using an insanely large buffer, capable of holding the entire output:

    public static void main(String[] args) {
        System.setOut(new PrintStream(
            new BufferedOutputStream(new FileOutputStream(FileDescriptor.out),1<<20),false));
        long t0 = System.nanoTime();
        combinations(10, 25);
        long t1 = System.nanoTime();
        System.out.flush();
        long t2 = System.nanoTime();
        System.out.println((t1 - t0)/1_000_000+" ms");
        System.out.println((t2 - t0)/1_000_000+" ms");
        System.out.flush();
    }
    static void combinations(int start, int end) {
        for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
            System.out.println(
                BitSet.valueOf(new long[]{i}).stream()
                      .mapToObj(b -> String.valueOf(b + start))
                      .collect(Collectors.joining(", ", "[", "]"))
            );
        }
    }

On my machine, it prints something in the order of

    93 ms
    3340 ms

This shows that the code spent more than three seconds on the non-parallelizable printing and only about 100 milliseconds on the calculation. For completeness, the following code goes a level down for the `String` generation:

    static void combinations(int start, int end) {
        for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
            System.out.println(bits(i, start));
        }
    }
    static String bits(int bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for(;;) {
            int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if(bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }

which halves the calculation time on my machine, while having no noticeable impact on the total time, which shouldn't come as a surprise now.

---

But for educational purposes, ignoring the lack of potential acceleration, let's discuss how we would parallelize this operation.

The sequential code has already brought the task into a form which boils down to an iteration from a start value to an end value. Now, we rewrite this code as a `ForkJoinTask` (or suitable subclass) which represents an iteration with a start and end value. Then, we add the ability to split this operation into two, by splitting the range in the middle, so we get two tasks iterating over each half of the range. This can be repeated until we decide we have enough potentially parallel jobs and perform the current iteration locally. After the local processing we have to wait for the completion of any task we split off, to ensure that the completion of the root task implies the completion of all subtasks.

    import java.io.BufferedOutputStream;
    import java.io.FileDescriptor;
    import java.io.FileOutputStream;
    import java.io.PrintStream;
    import java.util.ArrayDeque;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    public class Combinations extends RecursiveAction {
        public static void main(String[] args) {
            System.setOut(new PrintStream(new BufferedOutputStream(
                new FileOutputStream(FileDescriptor.out),1<<20),false));
            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
            long t0 = System.nanoTime();
            Combinations job = Combinations.get(10, 25);
            pool.execute(job);
            job.join();
            long t1 = System.nanoTime();
            System.out.flush();
            long t2 = System.nanoTime();
            System.out.println((t1 - t0)/1_000_000+" ms");
            System.out.println((t2 - t0)/1_000_000+" ms");
            System.out.flush();
        }

        public static Combinations get(int min, int max) {
            return new Combinations(min, 1, (1 << (max - min)) - 1);
        }

        final int offset, from;
        int to;

        private Combinations(int offset, int from, int to) {
            this.offset = offset;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            ArrayDeque<Combinations> spawned = new ArrayDeque<>();
            while(getSurplusQueuedTaskCount() < 2) {
                int middle = (from + to) >>> 1;
                if(middle == from) break;
                Combinations forked = new Combinations(offset, middle, to);
                forked.fork();
                spawned.addLast(forked);
                to = middle - 1;
            }
            performLocal();
            for(;;) {
                Combinations forked = spawned.pollLast();
                if(forked == null) break;
                if(forked.tryUnfork()) forked.performLocal(); else forked.join();
            }
        }

        private void performLocal() {
            for(int i = from, stop = to; i <= stop; i++) {
                System.out.println(bits(i, offset));
            }
        }

        static String bits(int bits, int offset) {
            StringBuilder sb = new StringBuilder().append('[');
            for(;;) {
                int bit=Integer.lowestOneBit(bits), num=Integer.numberOfTrailingZeros(bit);
                sb.append(num + offset);
                bits -= bit;
                if(bits == 0) break;
                sb.append(", ");
            }
            return sb.append(']').toString();
        }
    }

[`getSurplusQueuedTaskCount()`][Surplus] provides us with a hint about the saturation of the worker threads, in other words, whether forking more jobs might be beneficial. The returned number is compared with a threshold that is typically a small number. The more heterogeneous the jobs, and hence the expected workload, the higher the threshold should be, to allow more work-stealing when some jobs complete earlier than others. In our case, the workload is expected to be very well balanced.

There are two ways of splitting. Examples often create two or more forked subtasks, followed by joining them. This may lead to a large number of tasks just waiting for others. The alternative is to fork a subtask and alter the current task to represent the other. Here, the forked task represents the `[middle, to]` range whereas the current task is modified to represent the `[from, middle - 1]` range.
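For comparison, the first way of splitting (create two subtasks, then wait for both) could look roughly like the sketch below. The class name `SplitInTwo`, the `THRESHOLD` value, the `all` helper, and collecting the strings in a `ConcurrentLinkedQueue` rather than printing them are assumptions chosen for this illustration; only `bits` is taken from the code above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical "split in two and join" variant of the Combinations task.
public class SplitInTwo extends RecursiveAction {
    // Assumed cut-off: ranges shorter than this are processed sequentially.
    static final int THRESHOLD = 1_000;

    final int offset, from, to;
    final Queue<String> out; // thread-safe sink for the generated strings

    public SplitInTwo(int offset, int from, int to, Queue<String> out) {
        this.offset = offset;
        this.from = from;
        this.to = to;
        this.out = out;
    }

    @Override
    protected void compute() {
        if (to - from < THRESHOLD) {
            for (int i = from; i <= to; i++) out.add(bits(i, offset));
            return;
        }
        int middle = (from + to) >>> 1;
        // This task does no work of its own; it only waits for its two halves.
        invokeAll(new SplitInTwo(offset, from, middle - 1, out),
                  new SplitInTwo(offset, middle, to, out));
    }

    static String bits(int bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for (;;) {
            int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if (bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }

    // Collects all combinations for the range [min, max), max exclusive.
    public static Queue<String> all(int min, int max) {
        Queue<String> out = new ConcurrentLinkedQueue<>();
        ForkJoinPool.commonPool().invoke(new SplitInTwo(min, 1, (1 << (max - min)) - 1, out));
        return out;
    }
}
```

In this form every inner node of the task tree is a task that merely waits, which is the overhead the fork-and-shrink variant avoids. The fixed threshold also ignores how busy the pool is, unlike the `getSurplusQueuedTaskCount()` check.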

After forking enough tasks, the remaining range is processed locally in the current thread. Then, the task will wait for all forked subtasks, with one optimization: it will [try to *unfork*] the subtasks, to process them locally if no other worker thread has stolen them yet.

This works smoothly, but unfortunately, as expected, it does not accelerate the operation, as the most expensive part is the printing.

---

¹ Using an `int` to represent all combinations reduces the supported range length to 31, but keep in mind that such a range length implies `2³¹ - 1` combinations, which is quite a lot to iterate over. If that still feels like a limitation, you may change the code to use `long` instead. The then-supported range length of 63, in other words `2⁶³ - 1` combinations, is enough to keep the computer busy until the end of the universe.
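As a rough sketch of that change, the string generation could be ported to `long` as shown here. The class name `LongBits` is made up for this example, and only the `bits` method is covered; the loops and range fields of the code above would need the same `int` to `long` change.

```java
public class LongBits {
    // Assumed long-based counterpart of the bits(int, int) method above.
    public static String bits(long bits, int offset) {
        StringBuilder sb = new StringBuilder().append('[');
        for (;;) {
            long bit = Long.lowestOneBit(bits);
            int num = Long.numberOfTrailingZeros(bit);
            sb.append(num + offset);
            bits -= bit;
            if (bits == 0) break;
            sb.append(", ");
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        System.out.println(bits(0b101L, 10));  // bits 0 and 2 set, offset 10
        System.out.println(bits(1L << 62, 0)); // highest bit usable for a range length of 63
    }
}
```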

[Surplus]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html#getSurplusQueuedTaskCount--
[try to *unfork*]: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinTask.html#tryUnfork--


</details>



huangapple
  • Published on April 8, 2020 at 20:06:20
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61100284.html