并行线程需要达到一个点,等待另一个线程,然后恢复 – 最佳方法?

huangapple go评论94阅读模式
英文:

parallel threads need to reach a point, wait on another thread, and then resume - best approach?

问题

以下是翻译好的部分:

这是(简化的)情境:

  • 我有线程 A、B、C 在并行运行。
  • 一旦这 3 个线程都达到了它们执行的第 7 步,它们必须等待。(每个线程将在不同的时间达到第 7 步,事先无法知道)。
  • 然后线程 D 启动并运行直到完成。
  • 只有在 D 完成后,A、B 和 C 才能恢复并运行直到完成。

在这种情况下,使用什么工具或设计方法比较好?

到目前为止,我查阅的并发和信号量示例似乎只涉及 2 个线程,或者假设并行线程在做类似的事情,只是共享一个变量或进行消息传递。我还没有找到适用于上述情况的任何方法。我将继续研究,并会根据任何发现进行更新。

如果 CountDownLatch 不需要使其他线程退出,那么它可能适用。或者,如果可以反向使用,让 n 个线程等待,直到线程 D 退出。

我在并发方面相对较新(大学课程没有为它提供足够的时间),所以请多多包涵。在这里附上一个链接,以防有类似问题的人看到这个帖子:HowToDoInJava - Concurrency

英文:

Here's the (simplified) scenario:

  • I have threads A, B, C running in parallel.
  • Once all 3 threads have reached Step 7 of their execution, they must wait. (Each thread will reach Step 7 at a different time, which can't be known in advance).
  • Thread D then kicks off and runs to completion.
  • Only after D completes can A, B, and C resume and run to completion.

What would be a good tool or design to approach this with?

The concurrency and semaphore examples I've looked at so far seem to only deal with 2 threads, or assume that the parallel threads are doing similar things but just sharing a var or message-passing. I haven't yet found anything that works with the scenario above. I'm continuing to look into it and will update with any findings.

A CountDownLatch might work if it didn't need to have the other threads exit. Or if it could work in reverse - make n threads wait until thread D exits.

I'm fairly new to concurrency (college classes do not give it anywhere near the time it needs), so bear with me. Dropping this link here in case anyone with a similar question stumbles on this thread: HowToDoInJava - Concurrency

答案1

得分: 2

  1. import java.util.concurrent.Phaser;
  2. class Scratch {
  3. public static class ThreadABCWorker implements Runnable {
  4. String threadName;
  5. Phaser phaser;
  6. public ThreadABCWorker(String threadName, Phaser phaser) {
  7. this.threadName = threadName;
  8. this.phaser = phaser;
  9. }
  10. @Override
  11. public void run() {
  12. System.out.println("Thread " + threadName + " has started");
  13. // do steps 1-7 as part of phase 0
  14. phaser.arriveAndAwaitAdvance();
  15. // All the work for phase 1 is done in Thread D, so just arrive again and wait for D to do its thing
  16. phaser.arriveAndAwaitAdvance();
  17. System.out.println("Continue Thread" + threadName);
  18. }
  19. }
  20. public static void main(String[] args) {
  21. var phaser = new Phaser(4);
  22. var threadA = new Thread(new ThreadABCWorker("A", phaser));
  23. var threadB = new Thread(new ThreadABCWorker("B", phaser));
  24. var threadC = new Thread(new ThreadABCWorker("C", phaser));
  25. var threadD = new Thread(() -> {
  26. phaser.arriveAndAwaitAdvance(); // D shouldn't start doing its thing until phase 1
  27. System.out.println("Thread D has started");
  28. try {
  29. System.out.println("sleep 100");
  30. Thread.sleep(100);
  31. System.out.println("Thread D has finished");
  32. phaser.arriveAndDeregister(); // All done, let the other threads continue
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. threadA.start();
  38. threadB.start();
  39. threadC.start();
  40. threadD.start();
  41. }
  42. }

示例输出:

  1. Thread A has started
  2. Thread C has started
  3. Thread B has started
  4. Thread D has started
  5. sleep 100
  6. Thread D has finished
  7. Continue ThreadB
  8. Continue ThreadA
  9. Continue ThreadC
英文:

I would use a Phaser

  1. import java.util.concurrent.Phaser;
  2. class Scratch {
  3. public static class ThreadABCWorker implements Runnable {
  4. String threadName;
  5. Phaser phaser;
  6. public ThreadABCWorker(String threadName, Phaser phaser) {
  7. this.threadName = threadName;
  8. this.phaser = phaser;
  9. }
  10. @Override
  11. public void run() {
  12. System.out.println("Thread " + threadName + " has started");
  13. // do steps 1-7 as part of phase 0
  14. phaser.arriveAndAwaitAdvance();
  15. // All the work for phase 1 is done in Thread D, so just arrive again and wait for D to do its thing
  16. phaser.arriveAndAwaitAdvance();
  17. System.out.println("Continue Thread" + threadName);
  18. }
  19. }
  20. public static void main(String[] args) {
  21. var phaser = new Phaser(4);
  22. var threadA = new Thread(new ThreadABCWorker("A", phaser));
  23. var threadB = new Thread(new ThreadABCWorker("B", phaser));
  24. var threadC = new Thread(new ThreadABCWorker("C", phaser));
  25. var threadD = new Thread(() -> {
  26. phaser.arriveAndAwaitAdvance(); // D shouldn't start doing its thing until phase 1
  27. System.out.println("Thread D has started");
  28. try {
  29. System.out.println("sleep 100");
  30. Thread.sleep(100);
  31. System.out.println("Thread D has finished");
  32. phaser.arriveAndDeregister(); // All done, let ths other threads continue
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. threadA.start();
  38. threadB.start();
  39. threadC.start();
  40. threadD.start();
  41. }
  42. }

Example output:

  1. Thread A has started
  2. Thread C has started
  3. Thread B has started
  4. Thread D has started
  5. sleep 100
  6. Thread D has finished
  7. Continue ThreadB
  8. Continue ThreadA
  9. Continue ThreadC

答案2

得分: 1

这是我使用CountDownLatch本身想出的方法。它不一定只在线程完成时进行倒数。

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.CountDownLatch;
  4. public class ATech {
  5. private static long startThreadTime;
  6. private static CountDownLatch primaryCountDownLatch = new CountDownLatch(3);
  7. private static CountDownLatch secondaryCountDownLatch = new CountDownLatch(1);
  8. public static void main(String[] args) {
  9. List<Thread> threads = Arrays.asList(
  10. new Thread(new ThreadType1("A", 1, 5)),
  11. new Thread(new ThreadType1("B", 5, 1)),
  12. new Thread(new ThreadType1("C", 10, 10)),
  13. new Thread(new ThreadType2("D", 5))
  14. );
  15. startThreadTime = System.currentTimeMillis();
  16. System.out.println("在时间0左右启动线程");
  17. threads.forEach(Thread::start);
  18. try {
  19. for (Thread thread : threads) {
  20. thread.join();
  21. }
  22. } catch (InterruptedException e) {
  23. throw new RuntimeException(e);
  24. }
  25. System.out.println("所有线程在时间 " + (System.currentTimeMillis() - startThreadTime) + " 完成");
  26. }
  27. static class ThreadType1 implements Runnable {
  28. public ThreadType1(String name, int executionTimePreWaitInSeconds, int executionTimePostWaitInSeconds) {
  29. this.execTimePreWait = executionTimePreWaitInSeconds;
  30. this.execTimePostWait = executionTimePostWaitInSeconds;
  31. this.name = name;
  32. }
  33. int execTimePreWait;
  34. int execTimePostWait;
  35. String name;
  36. @Override
  37. public void run() {
  38. System.out.println("执行线程 " + name + "。等待 " + execTimePreWait + " 秒");
  39. try {
  40. Thread.sleep(execTimePreWait * 1000);
  41. } catch (InterruptedException e) {
  42. throw new RuntimeException(e);
  43. }
  44. // 完成我们正在做的任何事情,现在通知另一个线程我们完成了(暂时)
  45. System.out.println("线程 " + name + " 在时间 " + (System
  46. .currentTimeMillis() - startThreadTime) + " 完成。等待 latch");
  47. primaryCountDownLatch.countDown();
  48. try {
  49. secondaryCountDownLatch.await();
  50. } catch (InterruptedException e) {
  51. throw new RuntimeException(e);
  52. }
  53. System.out.println(
  54. "线程 " + name + " 在时间 " + (System.currentTimeMillis() - startThreadTime) + " 再次唤醒");
  55. System.out.println("线程 " + name + " 将休眠 " + execTimePostWait + " 秒");
  56. try {
  57. Thread.sleep(execTimePostWait * 1000);
  58. } catch (InterruptedException e) {
  59. throw new RuntimeException(e);
  60. }
  61. System.out.println(
  62. "线程 " + name + " 在时间 " + (System.currentTimeMillis() - startThreadTime) + " 完全完成");
  63. }
  64. }
  65. static class ThreadType2 implements Runnable {
  66. String name;
  67. int execTime;
  68. public ThreadType2(String name, int executionTimeInSeconds) {
  69. this.name = name;
  70. this.execTime = executionTimeInSeconds;
  71. }
  72. @Override
  73. public void run() {
  74. System.out.println("线程 " + name + " 等待其他线程完成");
  75. try {
  76. primaryCountDownLatch.await();
  77. } catch (InterruptedException e) {
  78. throw new RuntimeException(e);
  79. }
  80. System.out.println("线程 " + name + " 在时间 " + (System.currentTimeMillis() - startThreadTime) + " 醒来");
  81. System.out.println("线程 " + name + " 将工作 " + execTime + " 秒");
  82. try {
  83. Thread.sleep(execTime * 1000);
  84. } catch (InterruptedException e) {
  85. throw new RuntimeException(e);
  86. }
  87. System.out.println("线程 " + name + " 在时间 " + (System.currentTimeMillis() - startThreadTime) + " 完成");
  88. secondaryCountDownLatch.countDown();
  89. }
  90. }
  91. }

示例输出:

  1. 在时间0左右启动线程
  2. 执行线程 A。等待 1
  3. 执行线程 C。等待 10
  4. 执行线程 B。等待 5
  5. 线程 D 等待其他线程完成
  6. 线程 A 在时间 1033 完成。等待 latch
  7. 线程 B 在时间 5034 完成。等待 latch
  8. 线程 C 在时间 10034 完成。等待 latch
  9. 线程 D 在时间 10034 醒来
  10. 线程 D 将工作 5
  11. 线程 D 在时间 15034 完成
  12. 线程 A 在时间 15034 再次唤醒
  13. 线程 C 在时间 15034 再次唤醒
  14. 线程 A 将休眠 5
  15. 线程 B 再次唤醒在时间 15034
  16. 线程 B 将休眠 1
  17. 线程 C 将休眠 10
  18. 线程 B 在时间 16035 完全完成
  19. 线程 A 在时间 20034 完全完成
  20. 线程 C 在时间 25034 完全完成
  21. 所有线程在时间 25034 完成

进程以退出代码 0 结束。

英文:

Here's what I came up with using the CountDownLatch itself. It doesn't necessarily have to countdown only when threads complete.

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import java.util.concurrent.CountDownLatch;
  4. /**
  5. * Sample class that utilizes very simple thread creation/execution.
  6. * &lt;p&gt;
  7. * DISCLAIMER: This class isn&#39;t meant to show all cross-cutting concerns. It just focuses on the task presented.
  8. * Naming conventions, access modifiers, etc. may not be optimal.
  9. */
  10. public class ATech {
  11. private static long startThreadTime;
  12. private static CountDownLatch primaryCountDownLatch = new CountDownLatch(3);
  13. private static CountDownLatch secondaryCountDownLatch = new CountDownLatch(1);
  14. public static void main(String[] args) {
  15. List&lt;Thread&gt; threads = Arrays.asList(
  16. new Thread(new ThreadType1(&quot;A&quot;, 1, 5)),
  17. new Thread(new ThreadType1(&quot;B&quot;, 5, 1)),
  18. new Thread(new ThreadType1(&quot;C&quot;, 10, 10)),
  19. new Thread(new ThreadType2(&quot;D&quot;, 5))
  20. );
  21. startThreadTime = System.currentTimeMillis();
  22. System.out.println(&quot;Starting threads at (about) time 0&quot;);
  23. threads.forEach(Thread::start);
  24. try {
  25. for (Thread thread : threads) {
  26. thread.join();
  27. }
  28. } catch (InterruptedException e) {
  29. throw new RuntimeException(e);
  30. }
  31. System.out.println(&quot;All threads completed at time &quot; + (System.currentTimeMillis() - startThreadTime));
  32. }
  33. static class ThreadType1 implements Runnable {
  34. public ThreadType1(String name, int executionTimePreWaitInSeconds, int executionTimePostWaitInSeconds) {
  35. this.execTimePreWait = executionTimePreWaitInSeconds;
  36. this.execTimePostWait = executionTimePostWaitInSeconds;
  37. this.name = name;
  38. }
  39. int execTimePreWait;
  40. int execTimePostWait;
  41. String name;
  42. @Override
  43. public void run() {
  44. System.out.println(&quot;Execution thread &quot; + name + &quot;. Waiting for &quot; + execTimePreWait + &quot; seconds&quot;);
  45. try {
  46. Thread.sleep(execTimePreWait * 1000);
  47. } catch (InterruptedException e) {
  48. throw new RuntimeException(e);
  49. }
  50. // Done doing whatever we were doing, now we let the other thread now we&#39;re done (for now)
  51. System.out.println(&quot;Thread &quot; + name + &quot; completed at time &quot; + (System
  52. .currentTimeMillis() - startThreadTime) + &quot;. Waiting for latch&quot;);
  53. primaryCountDownLatch.countDown();
  54. try {
  55. secondaryCountDownLatch.await();
  56. } catch (InterruptedException e) {
  57. throw new RuntimeException(e);
  58. }
  59. System.out.println(
  60. &quot;Thread &quot; + name + &quot; awoken again at time &quot; + (System.currentTimeMillis() - startThreadTime));
  61. System.out.println(&quot;Thread &quot; + name + &quot; will sleep for &quot; + execTimePostWait + &quot; seconds&quot;);
  62. try {
  63. Thread.sleep(execTimePostWait * 1000);
  64. } catch (InterruptedException e) {
  65. throw new RuntimeException(e);
  66. }
  67. System.out.println(
  68. &quot;Thread &quot; + name + &quot; completed fully at time &quot; + (System.currentTimeMillis() - startThreadTime));
  69. }
  70. }
  71. static class ThreadType2 implements Runnable {
  72. String name;
  73. int execTime;
  74. public ThreadType2(String name, int executionTimeInSeconds) {
  75. this.name = name;
  76. this.execTime = executionTimeInSeconds;
  77. }
  78. @Override
  79. public void run() {
  80. System.out.println(&quot;Thread &quot; + name + &quot; waiting for other threads to complete&quot;);
  81. try {
  82. primaryCountDownLatch.await();
  83. } catch (InterruptedException e) {
  84. throw new RuntimeException(e);
  85. }
  86. System.out.println(&quot;Thread &quot; + name + &quot; woke up at time &quot; + (System.currentTimeMillis() - startThreadTime));
  87. System.out.println(&quot;Thread &quot; + name + &quot; will work for &quot; + execTime + &quot; seconds&quot;);
  88. try {
  89. Thread.sleep(execTime * 1000);
  90. } catch (InterruptedException e) {
  91. throw new RuntimeException(e);
  92. }
  93. System.out.println(&quot;Thread &quot; + name + &quot; completed at time &quot; + (System.currentTimeMillis() - startThreadTime));
  94. secondaryCountDownLatch.countDown();
  95. }
  96. }
  97. }

Sample output:

  1. Starting threads at (about) time 0
  2. Execution thread A. Waiting for 1 seconds
  3. Execution thread C. Waiting for 10 seconds
  4. Execution thread B. Waiting for 5 seconds
  5. Thread D waiting for other threads to complete
  6. Thread A completed at time 1033. Waiting for latch
  7. Thread B completed at time 5034. Waiting for latch
  8. Thread C completed at time 10034. Waiting for latch
  9. Thread D woke up at time 10034
  10. Thread D will work for 5 seconds
  11. Thread D completed at time 15034
  12. Thread A awoken again at time 15034
  13. Thread C awoken again at time 15034
  14. Thread A will sleep for 5 seconds
  15. Thread B awoken again at time 15034
  16. Thread B will sleep for 1 seconds
  17. Thread C will sleep for 10 seconds
  18. Thread B completed fully at time 16035
  19. Thread A completed fully at time 20034
  20. Thread C completed fully at time 25034
  21. All threads completed at time 25034
  22. Process finished with exit code 0

huangapple
  • 本文由 发表于 2020年9月2日 08:08:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/63696969.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定