英文:
How to poll() and remove item from ArrayBlockingQueue without memory allocation
问题
以下是您要的翻译内容:
我正在尝试轮询 ArrayBlockingQueue 以获取项目:
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()){
closeAppender();
break;
}
if (!TASK_BUFFER.isEmpty() && !TASK_ADDRESS.isEmpty() && !TASK_MARKER.isEmpty()){
buffer = TASK_BUFFER.poll();
remoteAdd = TASK_ADDRESS.poll();
marker = TASK_MARKER.poll();
// 如果缓冲区有数组,则获取数组
// 否则,获取 direct buffer 到 direct arr
// buffer.get(byteArr, 0, buffer.remaining());
// reset every value to "null" value
// remaining = buffer.remaining();
// runTask();
BUFFER_POOL.returnBuffer(buffer);
}
}
LOGGER.info(mainMarker,"任务调度线程已终止...");
}
然而,当我运行 jvisualvm 时,我可以看到每当我“调度”一个任务并轮询队列时,它会分配字节。我尝试注释和取消注释代码的其他部分,但只有轮询和调度会影响分配的字节。
我已经检查了这个链接,但我不是要遍历整个数组。我要一次获取一个项目进行处理,然后将其删除。应该如何正确处理这个问题?
编辑:我有一个单独的线程,用于“生成” ByteBuffer、InetSocketAddress 和 Marker 的实例,并将它们放入 ArrayBlockingQueue 中。我还检查了 poll() 和 dequeue() 的源代码,似乎没有任何会导致字节分配的操作。如果我在主线程中创建对象并将其添加到队列,那么如果此线程仅用于处理队列值,则不应分配字节。
构造函数如下:
private final BlockingQueue<ByteBuffer> TASK_BUFFER;
private final BlockingQueue<InetSocketAddress> TASK_ADDRESS;
private final BlockingQueue<Marker> TASK_MARKER;
private final BufferPool BUFFER_POOL;
private final ExcerptAppender APPENDER;
public TaskScheduler(BufferPool bufferPool, ChronicleQueue chronicleQueue){
TASK_BUFFER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
TASK_ADDRESS = new ArrayBlockingQueue<>(bufferPool.getCapacity());
TASK_MARKER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
BUFFER_POOL = bufferPool;
APPENDER = chronicleQueue.acquireAppender();
}
并且这将生成的值放入队列:
public void scheduleTask(final ByteBuffer buffer, final InetSocketAddress remoteAdd, final Marker marker) throws InterruptedException {
TASK_BUFFER.put(buffer);
TASK_ADDRESS.put(remoteAdd);
TASK_MARKER.put(marker);
}
英文:
I am trying to poll an ArrayBlockingQueue to get the item:
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()){
closeAppender();
break;
}
if (!TASK_BUFFER.isEmpty() && !TASK_ADDRESS.isEmpty() && !TASK_MARKER.isEmpty()){
buffer = TASK_BUFFER.poll();
remoteAdd = TASK_ADDRESS.poll();
marker = TASK_MARKER.poll();
// if (buffer.hasArray()){ //if byte buffer is used just get array
// byteArr = buffer.array();
// }else{
// byteArr = directArr; //if direct buffer is used, we get into direct arr
//// Arrays.fill(byteArr, (byte)0); //reset every value to "null" value
// buffer.get(byteArr, 0, buffer.remaining());
// }
// remaining = buffer.remaining();
// runTask();
BUFFER_POOL.returnBuffer(buffer);
}
}
LOGGER.info(mainMarker,"Task Scheduler Thread has terminated...");
}
However, when I run jvisualvm, I can see that whenever I "schedule" a task and poll the queue, it is allocating bytes. I have tried commenting and uncommenting the other parts of the code but only the polling and scheduling will affect the allocated bytes.
I have checked this link but I am not looking to iterate the whole array. I am looking to get one item at a time to process then remove it. What is the proper way to approach this?
edit: I have a separate thread that "produces" instances of ByteBuffer, InetSocketAddress and Marker and puts it inside the arrayblockingqueues. I also just checked the source code for poll() and dequeue() it doesn't seem that it does anything that should warrant the allocation of bytes. If I am creating the objects in the main thread, and adding it to queue, it shouldnt allocate bytes if this thread is only used for processing the queue values
The constructor looks like this:
private final BlockingQueue<ByteBuffer> TASK_BUFFER;
private final BlockingQueue<InetSocketAddress> TASK_ADDRESS;
private final BlockingQueue<Marker> TASK_MARKER;
private final BufferPool BUFFER_POOL;
private final ExcerptAppender APPENDER;
public TaskScheduler(BufferPool bufferPool, ChronicleQueue chronicleQueue){
TASK_BUFFER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
TASK_ADDRESS = new ArrayBlockingQueue<>(bufferPool.getCapacity());
TASK_MARKER = new ArrayBlockingQueue<>(bufferPool.getCapacity());
BUFFER_POOL = bufferPool;
APPENDER = chronicleQueue.acquireAppender();
}
and this puts the produced values inside the queue:
public void scheduleTask(final ByteBuffer buffer, final InetSocketAddress remoteAdd, final Marker marker) throws InterruptedException {
TASK_BUFFER.put(buffer);
TASK_ADDRESS.put(remoteAdd);
TASK_MARKER.put(marker);
}
答案1
得分: 2
每当尝试获取ReentrantLock
时,都可能会发生内存分配。当没有争用或在自旋阶段释放锁时,lock()
方法将在无需分配时返回。但当线程添加到等待线程队列时,将为其分配一个节点对象。
这适用于所有在底层使用ReentrantLock
的类,如ArrayBlockingQueue
,但也适用于一般情况下在幕后使用AbstractQueued[Long]Synchronizer
的所有并发类。
所以你的代码看起来像是尝试的过早优化,但反而适得其反。为了避免创建一个简单的对象来保存属于任务的ByteBuffer
、InetSocketAddress
和Marker
,你需要使用三个阻塞队列,不仅需要三次poll
调用,还需要保护它们,所以在最坏情况下,你最多需要六次分配,而不是当你使用一个包含所有三个值的单一队列时只需要一次poll()
和null
检查。
但整个循环实际上是一个轮询循环,反复调用poll()
而不放弃CPU。它创建了它(显然)本应该避免的额外开销。只要有新元素时才调用take()
,可以避免这种情况。如果没有新元素,它会将线程排队,但正如前面所说,当锁存在争用时,线程在队列中排队,而在紧密循环中反复调用poll()
最终会导致争用。另一方面,当有新元素时,take()
会完全像poll()
一样工作。
英文:
Whenever an attempt to acquire a ReentrantLock
is made, there’s potentiall a memory allocation. When there’s no contention or the lock is released during the spinning phase, the lock()
method will return without allocations. But when the thread is added to the queue of waiting threads, a node object is allocated for that.
This applies to all classes using a ReentrantLock
under the hood, like ArrayBlockingQueue
, but also to all concurrency classes using AbstractQueued[Long]Synchronizer
behind the scenes in general.
So your code looks like an attempted premature optimization that backfired. To avoid creating a simple object holding the ByteBuffer
, InetSocketAddress
, and Marker
belonging to a task, you have three blocking queues requiring not only three poll
calls but also to protect them with three isEmpty()
checks, so you end up with up to six allocations in the worst case, instead of the one you had with a single poll()
and null
check when you used a single queue of objects holding all three values.
But the entire loop is literally a polling loop, repeatedly calling poll()
without ever giving up the CPU. It’s creating the very overhead it (apparently) was supposed to avoid. Just calling take()
, to only return when there is a new element would avoid this. This would queue the thread if there is no new element, but as said, a thread gets queued anyway when there’s contention at the lock and repeatedly calling poll()
in a tight loop is a recipe to get contention sooner or later. On the other hand, take()
will work exactly the same as poll()
when there is a new element.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论