Java – BlockingQueue freezes multithread application

Question

I'm building an application with two kinds of tasks: one writes a value to a LinkedBlockingQueue, the other reads from it. I use a ScheduledExecutorService to run these operations after a delay of a few seconds.
The problem is that my application freezes on the take method of BlockingQueue, and I can't understand why.

This is the shared resource:

class Res{
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}

This is the reader:

Semaphore semaphore = new Semaphore(1); // intended to keep readers from occupying two pool threads at once
Runnable reader = ()->{
    try {
        semaphore.acquire();
        System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
        semaphore.release();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Writer:

Runnable writer = ()->{
    res.q.add("hi");
};

Full code:

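import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class Res{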
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        Res res = new Res();
        Semaphore semaphore = new Semaphore(1); // intended to keep readers from occupying two pool threads at once
        Runnable reader = ()->{
            try {
                semaphore.acquire();
                System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable writer = ()->{
            res.q.add("hi");
        };
        Random rnd = new Random();

        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(writer,time, TimeUnit.SECONDS);
        }
        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(reader,time, TimeUnit.SECONDS);
        }

        executorService.shutdown();
    }
}

It should print twenty lines of the form "hi [number]", but it freezes partway through.
For example, this is the output of my latest run:

hi 1
hi 2
hi 3
hi 4
hi 5

I found that if I increase the thread count to newScheduledThreadPool(20) it starts working, but how can I make it work with two threads? Thanks!

Answer 1

Score: 2

Your code is a bit hard to follow, but it is still clear what is going on. Because of Executors.newScheduledThreadPool(2);, at most two tasks can run at the same time. When the application freezes, both pool threads are running reader tasks.

Thread-1 entered the try block and acquired the semaphore permit via semaphore.acquire();, but the queue was empty, so it blocks on res.q.take(). The next thread, Thread-2, is also running a reader, but it cannot acquire a permit because Thread-1 already holds it, so it blocks on semaphore.acquire();. Since both pool threads are now blocked, there is no free thread left to run a writer that would put something in the queue and unblock Thread-1 (so that res.q.take() could return).
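The sequence described above can be reproduced in a small, terminating sketch. This is not the asker's code: the class name StarvationDemo, the method run, and the pollMillis parameter are made up for illustration, a plain fixed pool stands in for the scheduled one, and take() is replaced by poll with a timeout so the program ends instead of hanging. Under those assumptions, the reader that gets the permit first should time out, because the writer is stuck in the pool's work queue behind both readers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
class StarvationDemo {

    // Submits two readers and then one writer to a two-thread pool and
    // returns what each reader observed.
    static String[] run(long pollMillis) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Semaphore semaphore = new Semaphore(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Callable<String> reader = () -> {
            semaphore.acquire();
            try {
                // poll with a timeout instead of take(), so the demo terminates
                String value = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                return value == null ? "timed out" : value;
            } finally {
                semaphore.release();
            }
        };
        Runnable writer = () -> queue.add("hi");

        Future<String> first = pool.submit(reader);   // occupies pool thread 1
        Future<String> second = pool.submit(reader);  // occupies pool thread 2
        pool.submit(writer);                          // waits for a free thread
        pool.shutdown();
        return new String[] { first.get(), second.get() };
    }

    public static void main(String[] args) throws Exception {
        for (String result : run(500)) {
            System.out.println(result);
        }
    }
}
```

With take() in place of poll, the first reader would never give up its thread, the writer would never run, and the program would hang, which is the freeze described in the question.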

Adding more worker threads only postpones the problem: with enough readers scheduled before the writers, you could end up in the same situation again.
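One possible way to stay at two threads is to keep the blocking take() off the pool that runs the writers. The following is only a sketch under assumptions, not something the answer itself proposes: the class name TwoThreadQueueDemo, the method run, and its parameters are hypothetical, delays are in milliseconds rather than seconds to keep the run short, and a single long-lived reader replaces the twenty scheduled reader tasks (so the semaphore is no longer needed).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
class TwoThreadQueueDemo {

    // Schedules `messages` writers on one thread and consumes them on another.
    static List<String> run(int messages, int maxDelayMillis) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Thread 1: runs only writers, so a blocked reader can never starve them.
        ScheduledExecutorService writers = Executors.newSingleThreadScheduledExecutor();
        // Thread 2: a dedicated reader that is allowed to block on take().
        ExecutorService readerPool = Executors.newSingleThreadExecutor();

        Runnable writer = () -> queue.add("hi");
        Random rnd = new Random();
        for (int i = 0; i < messages; i++) {
            writers.schedule(writer, rnd.nextInt(maxDelayMillis) + 1, TimeUnit.MILLISECONDS);
        }

        Future<List<String>> result = readerPool.submit(() -> {
            List<String> lines = new ArrayList<>();
            for (int n = 1; n <= messages; n++) {
                lines.add(queue.take() + " " + n); // blocks until a writer has run
            }
            return lines;
        });

        // Already scheduled delayed tasks still run after shutdown() by default.
        writers.shutdown();
        readerPool.shutdown();
        return result.get(maxDelayMillis + 5000L, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        for (String line : run(20, 2000)) {
            System.out.println(line);
        }
    }
}
```

The design choice here is separation rather than more threads: writers and the blocking reader never compete for the same worker, so the queue is always able to fill. If readers must remain scheduled tasks on a shared pool, replacing take() with a timed poll is another option, at the cost of handling the case where nothing arrives in time.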

Posted by huangapple on April 8, 2020, 04:16:09
Original link: https://go.coder-hub.com/61088676.html