BlockingQueue poll method returns null with multiple threads

Question


We have implemented multithreading in our application using a BlockingQueue, the executor framework, and Future.

Whenever a user asks for some data, we submit a task to the executor framework. The task connects to the database, queries the data, and streams it back.

We have a method that reads this data and writes it to a ServletOutputStream.


public long writeData(ServletOutputStream sos, BlockingQueue<T> blockingQueue, Future<Boolean> future) {

    // Declared outside the try block so the return statement can see it.
    long counter = 0;

    try (final JsonWriter writer = new JsonWriter(new OutputStreamWriter(sos, UTF_8))) {

        while (true) {
            // poll() returns null immediately when the queue is empty.
            // A plain null check is used because a lambda cannot increment a local variable.
            T entityObj = blockingQueue.poll();
            if (entityObj != null) {
                gson.toJson(entityObj, entityObj.getClass(), writer);
                counter++;
            }

            if (blockingQueue.isEmpty() && future.isDone() && future.get()) {
                if (counter == 0) {
                    log.error("data not read properly");
                }
                break;
            }
        }
    } catch (Exception e) {
        log.error(e);
    }
    return counter;
}

While blockingQueue.poll() is executing, the repository thread sometimes has not loaded any data yet, so poll() returns null. By the time the following if block runs, the queue is empty and the future is done, so control leaves the while loop.

No response is written to the stream. Is there any way to handle this behavior? It doesn't happen when there are a lot of records.

Answer 1

Score: 3


I think that writeData is operating as a worker that waits for data to show up on the BlockingQueue and then "writes" it. Your problem is that sometimes it takes the thread that is inserting data into the BlockingQueue (code not shown here) a while to load the data from wherever it comes from, causing your writeData worker to think that it's done and exit.

The best solution is to write some "end of stream" object to the BlockingQueue to signal when there is no more data, rather than relying on the absence of data in the queue.

You don't know exactly what type of object is in a BlockingQueue<T> (it's just T), so there are a couple of ways to do this.

The first way is to have whatever code provides T also provide an instance of T that will represent the end of the stream that you can check with either equals or even the equality operator ==. Whether or not this is practical will depend on what kinds of T you have.

The other option is to make it a BlockingQueue<Optional<T>> and have the other thread write Optional.empty() to the queue when there is no more data.
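
As a minimal sketch of the Optional-based marker, assuming a single producer and String rows: the class and method names below (EndOfStreamDemo, produce, consume, run) are hypothetical and are not part of the asker's code, and the Thread.sleep call only simulates a slow database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class EndOfStreamDemo {

    // Hypothetical producer: stands in for the repository task from the question.
    static void produce(List<String> rows, BlockingQueue<Optional<String>> queue) {
        try {
            for (String row : rows) {
                Thread.sleep(20); // simulate a slow data source
                queue.put(Optional.of(row));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always publish the marker, even on failure, so the consumer can stop.
            // offer() never blocks on an unbounded LinkedBlockingQueue.
            queue.offer(Optional.empty());
        }
    }

    // Consumer: blocks in take() until an item or the marker arrives.
    // It never looks at isEmpty(), so a slow producer cannot end the loop early.
    static List<String> consume(BlockingQueue<Optional<String>> queue) {
        List<String> written = new ArrayList<>();
        try {
            while (true) {
                Optional<String> item = queue.take();
                if (!item.isPresent()) {
                    break; // end-of-stream marker
                }
                written.add(item.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }

    // Wires one producer task to the consumer running on the calling thread.
    static List<String> run(List<String> rows) {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> produce(rows, queue));
            return consume(queue);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape, the consumer's exit condition depends only on what the producer explicitly put on the queue, not on the timing between the two threads. Publishing the marker in a finally block is a design choice made here so the consumer is released even when the producer fails.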

In either case you can then call poll() with a timeout value and take appropriate action if it times out (failing, checking if the thread that is inserting into the queue is still alive, etc.).
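
One possible reading of "take appropriate action if it times out" is sketched below, assuming the Optional-based marker and access to the producer's Future. TimedPollConsumer and drain are hypothetical names, and treating "producer done, queue empty, no marker seen" as an error is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class TimedPollConsumer {

    // Drains the queue until the end-of-stream marker (Optional.empty()) arrives.
    // On each timeout it checks whether the producer has already finished.
    static <T> List<T> drain(BlockingQueue<Optional<T>> queue,
                             Future<?> producer,
                             long timeoutMillis) {
        List<T> received = new ArrayList<>();
        try {
            while (true) {
                Optional<T> item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    // Timed out. isDone() is evaluated before isEmpty(): once the
                    // producer is done nothing more can arrive, so an empty queue
                    // at that point means the marker was never sent.
                    if (producer.isDone() && queue.isEmpty()) {
                        throw new IllegalStateException(
                                "producer finished without an end-of-stream marker");
                    }
                    continue; // producer still running, or items arrived late: keep waiting
                }
                if (!item.isPresent()) {
                    return received; // end-of-stream marker
                }
                received.add(item.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", e);
        }
    }
}
```

The timeout bounds how long the consumer can hang if the producer dies without publishing the marker; the right value depends on how long a single query step may legitimately take.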

Answer 2

Score: 1


> While blockingQueue.poll() is executing, the repository thread sometimes has not loaded any data yet, so poll() returns null. By the time the following if block runs, the queue is empty and the future is done, so control leaves the while loop.

You have a race condition in your code which is complicating matters:

if (blockingQueue.isEmpty() && /* race here */ future.isDone() && future.get()) {

It is possible that blockingQueue.isEmpty() returns true and the job then adds its last element and finishes before the future.isDone() call happens, causing the code to quit prematurely with an element left in the queue.

An "end of stream" object that @Willis mentioned is a good option but a simple solution would be to refactor your code like:

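// Read the future's state first, then poll.
boolean futureDone = future.isDone();
T entity = blockingQueue.poll();
if (entity == null) {
   if (futureDone) {
      // The job was already done before this poll, so nothing more can arrive.
      break;
   }
} else {
   // process entity
}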

This ensures that you always check whether the future is done before taking the last item from the blocking queue, which removes the race. You might have to poll one more time, but that's fine.

Btw, if this code is really spinning, you should put some sort of timeout in poll to slow it down and not eat a CPU:

entity = blockingQueue.poll(50, TimeUnit.MILLISECONDS);
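
Putting the reordered check and the timed poll together, a self-contained sketch might look like the following. FutureThenPoll and drain are hypothetical names, the received list stands in for writing to the servlet stream, and the 50 ms timeout is simply the value suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureThenPoll {

    // Collects every item the producer enqueues, exiting only when a poll
    // comes back empty AND the producer was already done before that poll.
    static <T> List<T> drain(BlockingQueue<T> queue, Future<?> producer) {
        List<T> received = new ArrayList<>();
        try {
            while (true) {
                // Snapshot the future's state BEFORE polling. If it was done here,
                // every item is already in the queue, so a null poll means "drained".
                boolean futureDone = producer.isDone();
                T entity = queue.poll(50, TimeUnit.MILLISECONDS);
                if (entity != null) {
                    received.add(entity); // stand-in for writing to the output stream
                } else if (futureDone) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

This sketch only checks that the producer finished, not whether it succeeded; the original code's future.get() result (and any exception it throws) would still need to be handled after the loop.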

huangapple
  • Posted on July 24, 2020, 03:13:41
  • Link to this article: https://go.coder-hub.com/63061559.html