阻塞队列无法在同步的生产者/消费者方法中工作的要点是什么?

huangapple go评论74阅读模式
英文:

What is the point of BlockingQueue not being able to work in synchronized Producer/Consumer methods?

问题

当我第一次阅读有关接口 BlockingQueue 的时候,我了解到:如果队列中没有更多空间,生产者会阻塞队列中的任何其他 put() 调用。相反地,如果没有要取出的项,它会阻塞方法 take()。我认为它在内部的工作方式与 wait() 和 notify() 相同。例如,当没有更多元素可供读取时,内部会调用 wait(),直到生产者再添加一个元素并调用 notify()。或者至少在“旧的生产者/消费者模式”中是这样的。但是在阻塞队列中它并不是这样工作的。它是怎样的呢?它的要点是什么?我感到非常惊讶!

我将进行演示:

public class Testing {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);

    synchronized void write() throws InterruptedException {
        for (int i = 0; i < 6; i++) {
            blockingQueue.put(i);
            System.out.println("Added " + i);
            Thread.sleep(1000);
        }
    }

    synchronized void read() throws InterruptedException {
        for (int i = 0; i < 6; i++) {
            System.out.println("Took: " + blockingQueue.take());
            Thread.sleep(3000);
        }
    }
}

class Test1 {
    public static void main(String[] args) {
        Testing testing = new Testing();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    testing.write();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    testing.read();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

输出:

Added 0
Added 1
Added 2
'program hangs'.
我的问题是,如果 take() 和 put() 内部不使用 wait() 或 notify(),它们是如何进行阻塞的呢?它们是否有一些循环,快速地消耗 CPU 资源?坦率地说,我感到困惑。
英文:

When I first read about interface BlockingQueue I read that: Producer blocks any more put() calls in a queue if it has no more space. And the opposite, it blocks method take(), if there are no items to take. I thought that it internally works same as wait() and notify(). For example, when there are no more elements to read internally wait() is called until Producer adds one more and calls notify()..or that's what we would do in 'old producer/consumer pattern. BUT IT DOESN'T WORK LIKE THAT IN BLOCKING QUEUE. How? What is the point? I am honestly surprised!

I will demonstrate:

public class Testing {
BlockingQueue&lt;Integer&gt; blockingQueue = new ArrayBlockingQueue&lt;&gt;(3);
synchronized void write() throws InterruptedException {
for (int i = 0; i &lt; 6; i++) {
blockingQueue.put(i);
System.out.println(&quot;Added &quot; + i);
Thread.sleep(1000);
}
}
synchronized void read() throws InterruptedException {
for (int i = 0; i &lt; 6; i++) {
System.out.println(&quot;Took: &quot; + blockingQueue.take());
Thread.sleep(3000);
}
}
}
class Test1 {
public static void main(String[] args) {
Testing testing = new Testing();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.write();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

OUTPUT:

Added 0
Added 1
Added 2

'program hangs'.

My questions is how does take() and put() BLOCK if they don't use wait() or notify() internally? Do they have some while loops that burns CPU circles fast? I am frankly confused.

答案1

得分: 2

这是ArrayBlockingQueue#put方法的当前实现:

/**
 * 将指定的元素插入到队列的尾部,如果队列已满,则等待空间可用。
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

你会注意到,它在代码中使用了notFull.await();,其中notFull是一个Condition

关于Condition的文档中如下说明:

> Condition将对象监视器方法(waitnotifynotifyAll)拆分为不同的对象,通过与任意Lock实现结合,实现每个对象具有多个等待集的效果。在锁替代同步方法和语句的情况下,条件替代了对象监视器方法的使用。

英文:

Here's the current implementation of ArrayBlockingQueue#put:

/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

You'll see that, instead of using wait() and notify(), it invokes notFull.await(); where notFull is a Condition.

The documentation of Condition states the following:

> Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

答案2

得分: 0

如果您查看下面的代码,您将了解如何使用BlockingQueue接口解决生产者/消费者问题。

在这里,您可以看到生产者和消费者共享同一个队列。

从主类中,您启动了生产者和消费者两个线程。

class Producer implements Runnable {
    protected BlockingQueue blockingQueue = null;

    public Producer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            try {
                blockingQueue.put(i);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Added " + i);
        }
    }
}

class Consumer implements Runnable {

    protected BlockingQueue blockingQueue = null;

    public Consumer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 6; i++) {
            try {
                System.out.println("Took: " + blockingQueue.take());
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Test1 {
    public static void main(String[] args) throws InterruptedException {

        BlockingQueue queue = new ArrayBlockingQueue(3);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

这段代码将输出类似于:

Took: 0
Added 0
Added 1
Added 2
Took: 1
Added 3
Added 4
Took: 2
Added 5
Took: 3
Took: 4
Took: 5
英文:

If you go through below code, you will get an idea that how producer/consumer problem will get resolve using BlokingQueue interface.

Here you are able to see that same queue has been shared by Producer and Consumer.

And from main class you are starting both thread Producer and Consumer.

class Producer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i &lt; 6; i++) {
try {
blockingQueue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(&quot;Added &quot; + i);
}
}
}
class Consumer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i &lt; 6; i++) {
try {
System.out.println(&quot;Took: &quot; + blockingQueue.take());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Test1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}

This code will print output like

Took: 0
Added 0
Added 1
Added 2
Took: 1
Added 3
Added 4
Took: 2
Added 5
Took: 3
Took: 4
Took: 5

答案3

得分: 0

1. Why did your code example using BlockingQueue get to 'program hangs'?

1.1 Conceptually
首先,如果我们可以暂时忽略实现层面的细节,比如'wait()'、'notify()'等,从概念上讲,JAVA中BlockingQueue的所有实现都按照规范工作,就像你所说的:

"如果队列中没有更多空间,生产者会阻止在队列中进行更多的put()调用。相反,如果没有要获取的项,它会阻止方法take()。"

因此,从概念上讲,你的代码示例之所以会出现程序挂起的情况,是因为:

1.1.1.
调用(同步的)write()方法的线程首先运行并独自运行,只有在此线程中的'testing.write()'返回后,调用(同步的)read()方法的第二个线程才有机会运行——这是同一对象中'synchronized'方法的本质。

1.1.2.
现在,在你的示例中,从概念上讲,'testing.write()'永远不会返回,在那个循环中,它会将前3个元素放入队列中,然后在第二个线程消耗/获取其中一些元素之前,它会在某种程度上'自旋等待',以便可以继续放入更多元素,但由于前面提到的1.1.1的原因,这永远不会发生。

1.2 Programmatically

1.2.1.
(对于生产者)在ArrayBlockingQueue#put中,我在1.1.2中提到的'自旋等待'表现为:

while (count == items.length) notFull.await();

1.2.2.
(对于消费者)在ArrayBlockingQueue#take中,它调用dequeue(),而dequeue()又调用notFull.signal(),这将结束1.2.1中的'自旋等待'。

2. 现在,回到你原帖的标题'BlockingQueue不能在同步的生产者/消费者方法中工作的意义是什么?'

2.1.
如果我从字面意义上理解这个问题,那么答案可能是*‘在JAVA中存在方便的BlockingQueue功能,不仅可以在同步的方法/块中使用它们’*,也就是说,它们当然可以存在于任何'synchronized'结构之外,并且可以促进原始的生产者/消费者实现。

2.2.
然而,如果你的意思是进一步询问 - 为什么JAVA的BlockQueue实现不能在同步的方法/块中轻松/顺利/平稳地工作?

那将是一个不同的问题,一个有效且有趣的问题,我也偶然在思考这个问题。

具体来说,参见这篇帖子以获取更多信息(请注意,在这篇帖子中,消费者线程因为队列为空并持有独占锁而“挂起”,与你的情况不同,在你的情况中,生产者线程因为队列已满并持有独占锁而“挂起”;但问题的核心应该是相同的)

英文:

(I'm sure some or all parts of my answer could be something that you have already understood, in that case, please just consider it as a clarification :)).

1. Why did your code example using BlockingQueue get to ‘program hangs’?

1.1 Conceptually
First of all, if we can leave out the implementation level detail such as ‘wait()’, ‘notify()’, etc for a second, conceptually, all implementation in JAVA of BlockingQueue do work to the specification, i.e. like you said:

> ‘Producer blocks any more put() calls in a queue if it has no more
> space. And the opposite, it blocks method take(), if there are no
> items to take.’

So, conceptually, the reason that your code example hangs is because

1.1.1.
the thread calling the (synchronized) write() runs first and alone, and not until ‘testing.write()’ returns in this thread, the 2nd thread calling the (synchronized) read() will ever have a chance to run — this is the essence of ‘synchronized’ methods in the same object.

1.1.2.
Now, in your example, conceptually, ‘testing.write()’ will never return, in that for loop, it will ‘put’ the first 3 elements onto the queue and then kinda ‘spin wait’ for the 2nd thread to consume/’take’ some of these elements so it can ‘put’ more, but that will never happen due to aforementioned reason in 1.1.1

1.2 Programmatically

1.2.1.
(For producer) In ArrayBlockingQueue#put, the ‘spin wait’ I mentioned in 1.1.2 took form of

while (count == items.length) notFull.await();

1.2.2.
(For consumer) In ArrayBlockingQueue#take, it calls dequeue(), which in turn calls notFull.signal(), which will end the ‘spin wait’ in 1.2.1

2.Now, back to your original post’s title ‘What is the point of BlockingQueue not being able to work in synchronized Producer/Consumer methods?’.

2.1.
If I take the literal meaning of this question, then an answer could be ‘there are reasons for a convenient BlockingQueue facility to exist in JAVA other than using them in synchronized methods/blocks’, i.e. they can certainly live outside of any ‘synchronized’ structure and facilitate a vanilla producer/consumer implementation.

2.2.
However, if you meant to inquire one step further - Why can’t JAVA BlockQueue implementations work easily/nicely/smoothly in synchronized methods/blocks?

That will be a different question, a valid and interesting one that I am also incidentally puzzling about.

Specifically, see this post for further information (note that in this post, the consumer thread ‘hangs’ because of EMPTY queue and its possession of the exclusive lock, as opposed to your case where the producer thread ‘hangs’ because of FULL queue and its possession of the exclusive lock; but the core of the problems should be the same)

huangapple
  • 本文由 发表于 2020年8月17日 04:47:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/63441787.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定