How does Thread.interrupt affect ArrayBlockingQueue::put in Java?
Question
I am using an ArrayBlockingQueue to transport work items from a Supplier to multiple workers. I am also using Thread.interrupt to signal the Supplier to stop supplying new work items.
However, my process of interrupting the Supplier and then clearing the queue doesn't reliably clear my work queue.
The function I use to clear the queue looks as follows:

    private static void clear(Thread supplier) {
        supplier.interrupt();
        queue.clear();
    }

while my Supplier looks like this:

    try {
        while (true) {
            Runnable r = () -> { /* Ignored for now */ };
            queue.put(r);
        }
    } catch (InterruptedException iex) {
    }

When asked for the interaction between Thread.interrupt and ArrayBlockingQueue::put, the Javadoc says the following:

    public void put(E e)
             throws InterruptedException

    Inserts the specified element at the tail of this queue,
    waiting for space to become available if the queue is full.
    (...)
    Throws:
    InterruptedException - if interrupted while waiting

Well, that leaves a lot of questions. Sure, if the Supplier is waiting because the queue is full, and gets interrupted while waiting, it will throw the Exception.
But what if the queue is full and the Supplier is already carrying the 'interrupted' flag? Will it wait? Will it throw the Exception? What if the queue isn't full, and the Supplier gets interrupted while in this function? Will it throw the Exception? Will it modify the queue? What if the queue isn't full and the Supplier already carries the 'interrupted' flag?
Compare this with ReentrantLock::lockInterruptibly. There the documentation says:

    public void lockInterruptibly()
                           throws InterruptedException

    Acquires the lock unless the current thread is interrupted.
    (...)
    If the current thread:
      - has its interrupted status set on entry to this method; or
      - is interrupted while acquiring the lock,
    then InterruptedException is thrown and the current thread's interrupted status is cleared.
    In this implementation, as this method is an explicit interruption point,
    preference is given to responding to the interrupt over normal or reentrant acquisition of the lock.
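
As a minimal sketch of what the quoted documentation describes: a thread that already carries the interrupted flag is refused the lock on entry, and the flag is cleared by the exception. The class and method names here are made up purely for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptiblyDemo {

    /** Returns true if lockInterruptibly() refused the lock because the flag was already set. */
    static boolean throwsWhenFlagAlreadySet() {
        ReentrantLock lock = new ReentrantLock();
        Thread.currentThread().interrupt();      // set the flag before entering
        try {
            lock.lockInterruptibly();
            lock.unlock();                       // only reached if no exception was thrown
            return false;
        } catch (InterruptedException ex) {
            // The lock was not acquired and the interrupted status has been cleared.
            return !lock.isLocked() && !Thread.currentThread().isInterrupted();
        } finally {
            Thread.interrupted();                // never leak the flag to the caller
        }
    }

    public static void main(String[] args) {
        System.out.println("refused on entry: " + throwsWhenFlagAlreadySet());
    }
}
```

This only covers the "flag set on entry" case of an uncontended lock; it says nothing yet about what happens once the thread is already waiting inside put.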
Not wanting to give up here, I started to look into the implementation of ArrayBlockingQueue. This is interesting to me because ArrayBlockingQueue uses a ReentrantLock internally, and we have already seen that the documentation for ReentrantLock is much more detailed. Even better, ArrayBlockingQueue::put uses the already cited ReentrantLock::lockInterruptibly, which is perfect because I am going to interrupt the Thread that is calling put. And ArrayBlockingQueue::clear calls ReentrantLock::lock on the same lock.
So if I am not mistaken, with the lock guarding the ArrayBlockingQueue, there is only one way there could still be an item in the queue after calling clear: if the Supplier somehow entered queue.put, and thereby ReentrantLock::lockInterruptibly, after the Worker has finished its queue.clear. But the worker first calls supplier.interrupt(), so the supplier should already have its interrupted flag set, and according to the documentation of ReentrantLock that should give me the InterruptedException instead of acquiring the lock, making it impossible to modify the queue and instead ending the supply function.
To test this, I wrote the following example class:

    import java.util.*;
    import java.util.stream.*;
    import java.util.concurrent.*;

    public class ABQ {
        private static void work() {
            while(true) {
                try {
                    queue.take().run();
                } catch (InterruptedException iex) {
                    // Top level interrupt -> Do nothing
                }
            }
        }

        private static void clear(Thread caller, int level) {
            caller.interrupt();
            queue.clear();
            tripped.set(level);
        }

        private static ArrayBlockingQueue<Runnable> queue;
        private static ThreadLocal<Integer> tripped = ThreadLocal.withInitial(() -> -1);

        public static void main(String[] args) {
            queue = new ArrayBlockingQueue<Runnable>(5);
            Thread[] workers = new Thread[5];
            for (int i=0; i < 5; ++i) {
                workers[i] = new Thread(ABQ::work);
                workers[i].start();
            }
            final Thread caller = Thread.currentThread();
            final Random rdm = new Random();
            for (int i=0; ;i++) {
                final int finalI = i;
                try {
                    Thread.sleep(2000);
                    System.err.println("2s sleep over");
                } catch (InterruptedException iex) {
                    continue;
                }
                try {
                    while (true) {
                        Runnable r = () -> {
                            int waitTime;
                            boolean success;
                            if (tripped.get() >= finalI) {
                                throw new RuntimeException("Assertion failed, work for "+finalI+" but already tripped "+tripped.get());
                            }
                            synchronized (rdm) {
                                waitTime = rdm.nextInt(1000) + 200;
                                success = rdm.nextBoolean();
                            }
                            try {
                                Thread.sleep(waitTime);
                                if (success) {
                                    clear(caller, finalI);
                                }
                            } catch (InterruptedException iex) {
                                // Inner interrupt -> also ignore
                            }
                        };
                        queue.put(r);
                    }
                } catch (InterruptedException iex) {
                    System.err.println("EOL");
                }
            }
        }
    }

This repeatedly fills the queue with work items until it gets interrupted, and then proceeds to the next batch.
Each worker is aware of the highest batch it has ended and will complain if it finds another item for this batch. And on my machine, all of the workers will fail on this assertion.
So my process of clearing the queue is not guaranteed to work. Why? And how do I make it work?
Answer 1
Score: 2
When a thread is going to wait for a condition, it releases the lock and reacquires it when it is signaled. This is the only way another thread could change the condition to “fulfilled”, as that other thread must be able to acquire the lock to change the condition.
Reacquiring the lock after waiting for a condition is different from initially acquiring the lock via lockInterruptibly().
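
To make the two interruption points visible, here is a simplified sketch of how a put on a lock-and-condition based bounded queue is typically structured. The class SketchBoundedQueue and its members are hypothetical stand-ins, not the JDK source; they only mirror the general shape of acquiring the lock interruptibly and then waiting on a "not full" condition.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified stand-in for a bounded blocking queue; not the JDK implementation. */
class SketchBoundedQueue<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;

    SketchBoundedQueue(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        // Interruption point 1: throws right away if the flag is already set on entry.
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                // Interruption point 2: await() releases the lock while waiting and
                // reacquires it (non-interruptibly) before it returns or throws.
                // Depending on whether a signal or the interrupt is processed first,
                // it may return normally with the flag re-set instead of throwing.
                notFull.await();
            }
            items.addLast(e);
        } finally {
            lock.unlock();
        }
    }

    void clear() {
        lock.lock();
        try {
            items.clear();
            notFull.signalAll();   // wakes waiting producers, which then insert
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try { return items.size(); } finally { lock.unlock(); }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchBoundedQueue<String> q = new SketchBoundedQueue<>(1);
        Thread.currentThread().interrupt();
        try {
            q.put("x");
        } catch (InterruptedException ex) {
            System.out.println("rejected on entry, size = " + q.size());
        }
        q.put("y");                // the exception cleared the flag, so this succeeds
        System.out.println("size = " + q.size());
    }
}
```

The point of the sketch is that a producer already parked in await() is not covered by the lockInterruptibly() guarantee: a clear() that signals it can let it insert its element after the queue was emptied.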
We can easily demonstrate both scenarios:
- Interrupted before entering put:

    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    System.out.println("adding a non-full queue with interrupted state");
    Thread.currentThread().interrupt();
    try {
        queue.put("first string");
    } catch(InterruptedException ex) {
        System.out.println("got " + ex);
    }
    queue.put("second string");
    System.out.println("adding a full queue with interrupted state");
    Thread.currentThread().interrupt();
    try {
        queue.put("third string");
    } catch(InterruptedException ex) {
        System.out.println("got " + ex);
    }
    System.out.println("queue: " + queue);

    adding a non-full queue with interrupted state
    got java.lang.InterruptedException
    adding a full queue with interrupted state
    got java.lang.InterruptedException
    queue: [second string]

- Interrupted while waiting for a full queue to become “not full”:

    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
    Thread main = Thread.currentThread();
    new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
        for(StackTraceElement ste: main.getStackTrace())
            System.out.println("\tat "+ste);
        main.interrupt();
        queue.clear();
    }).start();
    for(int i = 0; i < 6; i++) {
        queue.put("item " + i);
    }
    System.out.println("queue: " + queue);
    System.out.println(Thread.currentThread().isInterrupted()? "interrupted": "not interrupted");

        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
        at ABQ.main(ABQ.java:38)
    queue: [item 5]
    interrupted

  Though, sometimes I get

        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
        at ABQ.main(ABQ.java:38)
    Exception in thread "main" java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
        at ABQ.main(ABQ.java:38)

When you eliminate the wait for a condition, e.g. replace queue.put(r); with while(!queue.offer(r)) Thread.sleep(10); you get the intended ordering of interrupt detection and queue processing. But I don’t recommend it.
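
A sketch of that polling variant, wrapped in a helper for illustration (the class PollingPut and the method putByPolling are hypothetical names, not part of any library). Note that offer(E) does not throw InterruptedException, so in this loop an interrupt is only noticed inside Thread.sleep.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PollingPut {

    /** Hypothetical helper: polls offer() instead of blocking inside put(). */
    static <E> void putByPolling(BlockingQueue<E> queue, E item) throws InterruptedException {
        while (!queue.offer(item)) {
            // The only interruption point of this loop; throws if the flag is set.
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");                  // the queue is now full
        Thread.currentThread().interrupt();    // flag set before the attempt
        try {
            putByPolling(queue, "second");
        } catch (InterruptedException ex) {
            System.out.println("interrupted, queue: " + queue);
        }
    }
}
```

Polling trades the condition wait for added latency and wake-ups, which is one reason to treat it as a demonstration rather than a fix.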
Your overall design is suspicious, as the described logic only works for one consumer thread. In your example code, worker threads processing a smaller i may clear the queue while the producer is already in a later iteration. Maybe a Phaser or CyclicBarrier would be a better fit for your actual task than dealing with interruption manually.