Java Concurrency in Practice “Listing 12.5. Producer-consumer test program for BoundedBuffer.”: understanding CyclicBarrier await


Question


I am reading Java Concurrency in Practice and came across the following code snippet.
Listing 12.5: https://jcip.net/listings/PutTakeTest.java

// Listing 12.5. Producer-consumer test program for BoundedBuffer.
package net.jcip.examples;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import junit.framework.TestCase;

/**
 * PutTakeTest
 * <p/>
 * Producer-consumer test program for BoundedBuffer
 *
 * @author Brian Goetz and Tim Peierls
 */
public class PutTakeTest extends TestCase {
    protected static final ExecutorService pool = Executors.newCachedThreadPool();
    protected CyclicBarrier barrier;
    protected final SemaphoreBoundedBuffer<Integer> bb;
    protected final int nTrials, nPairs;
    protected final AtomicInteger putSum = new AtomicInteger(0);
    protected final AtomicInteger takeSum = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        new PutTakeTest(10, 10, 100000).test(); // sample parameters
        pool.shutdown();
    }

    public PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        this.barrier = new CyclicBarrier(npairs * 2 + 1);
    }

    void test() {
        try {
            for (int i = 0; i < nPairs; i++) {
                pool.execute(new Producer());
                pool.execute(new Consumer());
            }
            barrier.await(); // wait for all threads to be ready
            barrier.await(); // wait for all threads to finish
            assertEquals(putSum.get(), takeSum.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static int xorShift(int y) {
        y ^= (y << 6);
        y ^= (y >>> 21);
        y ^= (y << 7);
        return y;
    }

    class Producer implements Runnable {
        public void run() {
            try {
                int seed = (this.hashCode() ^ (int) System.nanoTime());
                int sum = 0;
                barrier.await();
                for (int i = nTrials; i > 0; --i) {
                    bb.put(seed);
                    sum += seed;
                    seed = xorShift(seed);
                }
                putSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    class Consumer implements Runnable {
        public void run() {
            try {
                barrier.await();
                int sum = 0;
                for (int i = nTrials; i > 0; --i) {
                    sum += bb.take();
                }
                takeSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

I am finding it hard to understand how the cyclic barrier is awaited a second time in the main thread and in the runnable threads. As I understand it, a CyclicBarrier blocks each thread that calls await until await has been called on all the threads and the number of waiting threads matches the value passed to the constructor. When the barrier is awaited for the first time on the threads, the await count in the cyclic barrier will be half of the required value of (npairs * 2 + 1). How, then, does control proceed to the putSum and takeSum calculations in the producers and consumers, and to the subsequent statements on the main thread?

Apologies in advance if this question sounds naive.

Answer 1

Score: 0


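The main thread starts npairs producers and npairs consumers before it calls await on the barrier. Each producer and each consumer also calls await, so together with the main thread that makes npairs * 2 + 1 waiting parties, which is exactly the count passed to the CyclicBarrier constructor. The first await therefore releases every thread at once, not half of them. Because the barrier is cyclic, it resets as soon as it trips, so the second round of await calls works the same way: the main thread blocks in its second await until every producer and consumer has finished its loop, updated putSum or takeSum, and called await again.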
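
The two-phase pattern can be tried in isolation with a smaller sketch. This is not the book's code: the class name BarrierPhasesDemo, the method runPhases, and the counters trips and workDone are hypothetical names made up for illustration. It assumes only the standard java.util.concurrent API and uses the CyclicBarrier constructor that takes a barrier action to count how often the barrier trips.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JCiP listings.
public class BarrierPhasesDemo {

    // Runs nWorkers workers through the same two-await pattern as PutTakeTest
    // and returns how many times the barrier tripped.
    public static int runPhases(int nWorkers) throws Exception {
        AtomicInteger trips = new AtomicInteger(0);
        AtomicInteger workDone = new AtomicInteger(0);

        // Parties = all workers + the coordinating (main) thread.
        // The barrier action runs once each time the barrier trips.
        CyclicBarrier barrier = new CyclicBarrier(nWorkers + 1, trips::incrementAndGet);

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < nWorkers; i++) {
                pool.execute(() -> {
                    try {
                        barrier.await();            // phase 1: wait until everyone is ready
                        workDone.incrementAndGet(); // stands in for the put/take loop
                        barrier.await();            // phase 2: signal that this worker is done
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            barrier.await(); // released together with all workers; barrier then resets
            barrier.await(); // blocks until every worker reaches its second await
            if (workDone.get() != nWorkers) {
                throw new IllegalStateException("some workers did not finish");
            }
            return trips.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("barrier tripped " + runPhases(4) + " times");
    }
}
```

With four workers the barrier has five parties, and it trips exactly twice: once when all five threads have arrived at their first await, and once more when all five have arrived at their second. No thread can reach its second await before the first trip, so the two rounds never mix.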

huangapple
  • Posted on September 27, 2020, 02:02:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64080960.html