Java Concurrency in Practice, "Listing 12.5. Producer-consumer test program for BoundedBuffer": understanding CyclicBarrier await
Question
I am reading Java Concurrency in Practice and came across the following code snippet.
Listing 12.5: https://jcip.net/listings/PutTakeTest.java
// Listing 12.5. Producer-consumer test program for BoundedBuffer.
package net.jcip.examples;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import junit.framework.TestCase;

/**
 * PutTakeTest
 * <p/>
 * Producer-consumer test program for BoundedBuffer
 *
 * @author Brian Goetz and Tim Peierls
 */
public class PutTakeTest extends TestCase {
    protected static final ExecutorService pool = Executors.newCachedThreadPool();
    protected CyclicBarrier barrier;
    protected final SemaphoreBoundedBuffer<Integer> bb;
    protected final int nTrials, nPairs;
    protected final AtomicInteger putSum = new AtomicInteger(0);
    protected final AtomicInteger takeSum = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        new PutTakeTest(10, 10, 100000).test(); // sample parameters
        pool.shutdown();
    }

    public PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        this.barrier = new CyclicBarrier(npairs * 2 + 1);
    }

    void test() {
        try {
            for (int i = 0; i < nPairs; i++) {
                pool.execute(new Producer());
                pool.execute(new Consumer());
            }
            barrier.await(); // wait for all threads to be ready
            barrier.await(); // wait for all threads to finish
            assertEquals(putSum.get(), takeSum.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static int xorShift(int y) {
        y ^= (y << 6);
        y ^= (y >>> 21);
        y ^= (y << 7);
        return y;
    }

    class Producer implements Runnable {
        public void run() {
            try {
                int seed = (this.hashCode() ^ (int) System.nanoTime());
                int sum = 0;
                barrier.await();
                for (int i = nTrials; i > 0; --i) {
                    bb.put(seed);
                    sum += seed;
                    seed = xorShift(seed);
                }
                putSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    class Consumer implements Runnable {
        public void run() {
            try {
                barrier.await();
                int sum = 0;
                for (int i = nTrials; i > 0; --i) {
                    sum += bb.take();
                }
                takeSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
I am finding it hard to understand how the cyclic barrier is awaited a second time in the main thread and in the runnable threads. As I understand it, a CyclicBarrier blocks each calling thread until await has been invoked by as many threads as the count passed to the constructor. When the barrier is awaited the first time on the worker threads, I would expect the await count in the cyclic barrier to be only half of the required value of (npairs * 2 + 1). How, then, does control reach the putSum and takeSum calculations in the producers and consumers, and then continue on the main thread?
Apologies in advance if this question sounds naive.
Answer 1
Score: 0
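The main thread starts npairs producers and npairs consumers before calling await on the barrier. Each of the producers and consumers calls await too, so together with the main thread that makes npairs * 2 + 1 callers, which is exactly the count the barrier was created with, and all threads are allowed through. A CyclicBarrier resets itself once it trips, so the second round of await calls works the same way: the main thread blocks in its second await until every producer and consumer has finished its loop and called await again.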
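To see the two rounds in isolation, here is a minimal sketch. It is not from the book: the class name BarrierRoundsDemo and the method runTwoRounds are made up for illustration, and the buffer is replaced by a simple counter. It uses one barrier with nWorkers + 1 parties, the same way the listing uses npairs * 2 + 1.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JCiP listings.
public class BarrierRoundsDemo {

    // Runs nWorkers tasks plus the calling thread through two barrier rounds.
    // Returns the number of workers that had finished their "work" by the time
    // the second round released the calling thread.
    static int runTwoRounds(int nWorkers) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // One party per worker, plus one for the calling (main) thread.
        CyclicBarrier barrier = new CyclicBarrier(nWorkers + 1);
        AtomicInteger finished = new AtomicInteger(0);
        try {
            for (int i = 0; i < nWorkers; i++) {
                pool.execute(() -> {
                    try {
                        barrier.await();            // round 1: start together
                        finished.incrementAndGet(); // stand-in for the put/take loop
                        barrier.await();            // round 2: report completion
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
            barrier.await(); // round 1: the calling thread is the last missing party
            barrier.await(); // round 2: blocks until every worker has arrived again
            return finished.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTwoRounds(20));
    }
}
```

The calling thread's second await cannot return until every worker has reached its own second await, and each worker increments the counter before doing so, which is why the listing can safely compare putSum and takeSum right after the second await.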