parallel threads need to reach a point, wait on another thread, and then resume - best approach?

Question

Here's the (simplified) scenario:

  • I have threads A, B, C running in parallel.
  • Once all 3 threads have reached Step 7 of their execution, they must wait. (Each thread will reach Step 7 at a different time, which can't be known in advance).
  • Thread D then kicks off and runs to completion.
  • Only after D completes can A, B, and C resume and run to completion.

What would be a good tool or design to approach this with?

The concurrency and semaphore examples I've looked at so far seem to only deal with 2 threads, or assume that the parallel threads are doing similar things but just sharing a var or message-passing. I haven't yet found anything that works with the scenario above. I'm continuing to look into it and will update with any findings.

A CountDownLatch might work if it didn't need to have the other threads exit. Or if it could work in reverse - make n threads wait until thread D exits.

I'm fairly new to concurrency (college classes do not give it anywhere near the time it needs), so bear with me. Dropping this link here in case anyone with a similar question stumbles on this thread: HowToDoInJava - Concurrency

Answer 1

Score: 2

I would use a Phaser

import java.util.concurrent.Phaser;

class Scratch {
    public static class ThreadABCWorker implements Runnable {
        private final String threadName;
        private final Phaser phaser;

        public ThreadABCWorker(String threadName, Phaser phaser) {
            this.threadName = threadName;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println("Thread " + threadName + " has started");
            // do steps 1-7 as part of phase 0
            phaser.arriveAndAwaitAdvance();
            // All the work for phase 1 is done in Thread D, so just arrive again and wait for D to do its thing
            phaser.arriveAndAwaitAdvance();
            System.out.println("Continue Thread" + threadName);
        }
    }

    public static void main(String[] args) {
        // Four registered parties: A, B, C and D
        var phaser = new Phaser(4);
        var threadA = new Thread(new ThreadABCWorker("A", phaser));
        var threadB = new Thread(new ThreadABCWorker("B", phaser));
        var threadC = new Thread(new ThreadABCWorker("C", phaser));
        var threadD = new Thread(() -> {
            phaser.arriveAndAwaitAdvance(); // D shouldn't start doing its thing until phase 1
            System.out.println("Thread D has started");

            try {
                System.out.println("sleep 100");
                Thread.sleep(100);
                System.out.println("Thread D has finished");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                // All done (or interrupted): let the other threads continue either way,
                // otherwise A, B and C would wait forever
                phaser.arriveAndDeregister();
            }
        });
        threadA.start();
        threadB.start();
        threadC.start();
        threadD.start();
    }
}

Example output:

Thread A has started
Thread C has started
Thread B has started
Thread D has started
sleep 100
Thread D has finished
Continue ThreadB
Continue ThreadA
Continue ThreadC
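
To make the ordering guarantee of the Phaser approach easier to check, here is a minimal sketch under some assumptions: the class name `PhaserGateDemo`, the method `runOnce`, and the event labels are hypothetical names chosen for illustration, and the "work" of each thread is reduced to recording an event in a shared list. It is not the answer's code, just a compact variant of the same two-phase idea.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

class PhaserGateDemo {

    // Runs the A/B/C + D scenario once and returns the events in the order they were recorded.
    static List<String> runOnce() {
        List<String> events = new CopyOnWriteArrayList<>();
        Phaser phaser = new Phaser(4); // three workers plus D

        Runnable worker = () -> {
            events.add("step7");            // steps 1-7 would run here
            phaser.arriveAndAwaitAdvance(); // end of phase 0: all workers reached step 7
            phaser.arriveAndAwaitAdvance(); // end of phase 1: D has finished
            events.add("resumed");          // the remaining steps would run here
        };
        Runnable d = () -> {
            phaser.arriveAndAwaitAdvance(); // block until phase 0 is over
            try {
                events.add("D-start");
                events.add("D-done");       // D's real work would go between these two
            } finally {
                phaser.arriveAndDeregister(); // always release the workers
            }
        };

        Thread[] threads = {
            new Thread(worker), new Thread(worker), new Thread(worker), new Thread(d)
        };
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo threads", e);
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [step7, step7, step7, D-start, D-done, resumed, resumed, resumed]
        System.out.println(runOnce());
    }
}
```

Because each worker records "step7" before it arrives, and D records its events only after the phase has advanced, the three "step7" entries always come first and the three "resumed" entries always come last, whatever the thread scheduling.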

Answer 2

Score: 1

Here's what I came up with using CountDownLatch itself. A latch doesn't have to be counted down only when a thread completes; a thread can count down partway through its run and then keep going.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Sample class that utilizes very simple thread creation/execution.
 * <p>
 * DISCLAIMER: This class isn't meant to show all cross-cutting concerns. It just focuses on the task presented.
 * Naming conventions, access modifiers, etc. may not be optimal.
 */
public class ATech {

    private static long startThreadTime;
    // Counted down once by each of A, B and C when they reach the wait point; D awaits it
    private static final CountDownLatch primaryCountDownLatch = new CountDownLatch(3);
    // Counted down once by D when it finishes; A, B and C await it
    private static final CountDownLatch secondaryCountDownLatch = new CountDownLatch(1);

    public static void main(String[] args) {
        List<Thread> threads = Arrays.asList(
                new Thread(new ThreadType1("A", 1, 5)),
                new Thread(new ThreadType1("B", 5, 1)),
                new Thread(new ThreadType1("C", 10, 10)),
                new Thread(new ThreadType2("D", 5))
        );

        startThreadTime = System.currentTimeMillis();
        System.out.println("Starting threads at (about) time 0");

        threads.forEach(Thread::start);

        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println("All threads completed at time " + (System.currentTimeMillis() - startThreadTime));
    }

    static class ThreadType1 implements Runnable {

        int execTimePreWait;
        int execTimePostWait;
        String name;

        public ThreadType1(String name, int executionTimePreWaitInSeconds, int executionTimePostWaitInSeconds) {
            this.execTimePreWait = executionTimePreWaitInSeconds;
            this.execTimePostWait = executionTimePostWaitInSeconds;
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("Execution thread " + name + ". Waiting for " + execTimePreWait + " seconds");

            try {
                Thread.sleep(execTimePreWait * 1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // Done doing whatever we were doing, now we let the other thread know we're done (for now)
            System.out.println("Thread " + name + " completed at time "
                    + (System.currentTimeMillis() - startThreadTime) + ". Waiting for latch");

            primaryCountDownLatch.countDown();

            try {
                // Block until D signals that it has finished
                secondaryCountDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(
                    "Thread " + name + " awoken again at time " + (System.currentTimeMillis() - startThreadTime));
            System.out.println("Thread " + name + " will sleep for " + execTimePostWait + " seconds");

            try {
                Thread.sleep(execTimePostWait * 1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(
                    "Thread " + name + " completed fully at time " + (System.currentTimeMillis() - startThreadTime));
        }
    }

    static class ThreadType2 implements Runnable {

        String name;
        int execTime;

        public ThreadType2(String name, int executionTimeInSeconds) {
            this.name = name;
            this.execTime = executionTimeInSeconds;
        }

        @Override
        public void run() {
            System.out.println("Thread " + name + " waiting for other threads to complete");
            try {
                // Block until A, B and C have all reached their wait point
                primaryCountDownLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println("Thread " + name + " woke up at time " + (System.currentTimeMillis() - startThreadTime));
            System.out.println("Thread " + name + " will work for " + execTime + " seconds");

            try {
                Thread.sleep(execTime * 1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println("Thread " + name + " completed at time " + (System.currentTimeMillis() - startThreadTime));
            // Release A, B and C
            secondaryCountDownLatch.countDown();
        }
    }
}

Sample output:

Starting threads at (about) time 0
Execution thread A. Waiting for 1 seconds
Execution thread C. Waiting for 10 seconds
Execution thread B. Waiting for 5 seconds
Thread D waiting for other threads to complete
Thread A completed at time 1033. Waiting for latch
Thread B completed at time 5034. Waiting for latch
Thread C completed at time 10034. Waiting for latch
Thread D woke up at time 10034
Thread D will work for 5 seconds
Thread D completed at time 15034
Thread A awoken again at time 15034
Thread C awoken again at time 15034
Thread A will sleep for 5 seconds
Thread B awoken again at time 15034
Thread B will sleep for 1 seconds
Thread C will sleep for 10 seconds
Thread B completed fully at time 16035
Thread A completed fully at time 20034
Thread C completed fully at time 25034
All threads completed at time 25034
Process finished with exit code 0
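
One thing the two-latch version leaves open is what happens if D never finishes: A, B and C would block in `await()` indefinitely. The sketch below is one possible way to guard against that, using the timed overload `CountDownLatch.await(long, TimeUnit)`. The class name `TwoLatchDemo`, the method `runOnce`, the event labels, and the 5-second timeout are all assumptions made for illustration and are not part of the answer above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TwoLatchDemo {

    // Runs workerCount workers plus one D thread and returns the recorded events in order.
    static List<String> runOnce(int workerCount) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch workersReady = new CountDownLatch(workerCount); // counted down by each worker
        CountDownLatch dFinished = new CountDownLatch(1);              // counted down once by D

        Runnable worker = () -> {
            events.add("step7");       // steps 1-7 would run here
            workersReady.countDown();  // tell D this worker has reached the wait point
            try {
                // Timed wait (5 seconds is an arbitrary choice): give up rather than block forever.
                if (dFinished.await(5, TimeUnit.SECONDS)) {
                    events.add("resumed");
                } else {
                    events.add("timed-out");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable d = () -> {
            try {
                workersReady.await();  // wait until every worker has reached step 7
                events.add("D-start");
                events.add("D-done");  // D's real work would go between these two
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                dFinished.countDown(); // release the workers even if D was interrupted
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            threads.add(new Thread(worker));
        }
        threads.add(new Thread(d));
        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo threads", e);
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [step7, step7, step7, D-start, D-done, resumed, resumed, resumed]
        System.out.println(runOnce(3));
    }
}
```

Counting down `dFinished` in a `finally` block is a design choice: it favors letting the workers continue over keeping them parked when D fails. If the workers must not proceed after a failed D, a shared flag checked after `await` would be needed instead.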

huangapple
  • Posted on September 2, 2020, 08:08:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63696969.html