Reading from a blocking queue with multiple threads

Question

I have a producer-consumer model built on a blocking queue: 4 producer threads read files from a directory and put them on the queue, and 4 consumer threads read from the queue.

My problem is that only one consumer ever reads from the BlockingQueue; the other 3 consumer threads read nothing:

    final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
    CompletableFuture<Void> completableFutureProducer = produceUrls(files, queue, checker);

    // The producer code is omitted; it works fine, with all 4 threads
    // writing to the blocking queue. Here is the consumer code.
    private CompletableFuture<Validator> consumeData(
            final Response checker,
            final CompletableFuture<Void> urls
    ) {
        return CompletableFuture.supplyAsync(checker, 4)
                .whenComplete((result, err) -> {
                    if (err != null) {
                        LOG.error("consuming url worker failed!", err);
                        urls.cancel(true);
                    }
                });
    }

    completableFutureProducer.join();
    completableFutureConsumer.join();

This is my code. Can someone tell me what I am doing wrong, or help with the correct code? Why is only one consumer reading from the blocking queue?

Here is the Response class, which reads from the blocking queue:

    @Slf4j
    public final class Response implements Supplier<Check> {
        private final BlockingQueue<byte[]> data;
        private final AtomicBoolean producersComplete;
        private final Calendar calendar = Calendar.getInstance();

        public Response(
                final BlockingQueue<byte[]> data
        ) {
            this.data = data;
            producersComplete = new AtomicBoolean();
        }

        public void notifyProducersDone() {
            producersComplete.set(true);
        }

        @Override
        public Check get() {
            try {
                Check check = null;
                try {
                    while (!data.isEmpty() || !producersComplete.get()) {
                        final byte[] item = data.poll(1, TimeUnit.SECONDS);
                        if (item != null) {
                            LOG.info("{}", new String(item));
                            // I see only one thread printing the result here.
                            check = validateData(item);
                        }
                    }
                } catch (InterruptedException | IOException e) {
                    Thread.currentThread().interrupt();
                    throw new WriteException("Exception occurred while data validation", e);
                }
                return check;
            } finally {
                LOG.info("Done reading data from BlockingQueue");
            }
        }
    }

Answer 1

Score: 2

It's hard to diagnose from this alone, but checking data.isEmpty() is probably not correct, because the queue may be empty only temporarily and receive more items later. Your threads might therefore exit as soon as they encounter a temporarily empty queue.

Instead, you can exit when the producers are done AND the poll returned an empty result. That way the threads exit only when there are truly no more items to process.

It's a bit odd, though, that you return only the result of the last item. Are you sure this is what you want?

EDIT: I've done something very similar recently. Here is a class that reads from a file, transforms the lines in a multi-threaded way, then writes to a different file (the order of the lines is preserved).
It also uses a BlockingQueue. It's very similar to your code, but it doesn't check queue.isEmpty(), for the reason given above. It works fine for me.
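
The exit rule suggested in this answer (stop only when the producers are done and a poll came back empty) could look roughly like the sketch below. This is a minimal illustration and not the class the answer refers to: the class name DrainUntilDoneSketch, the run method, the pool sizes, the queue capacity and the 100 ms timeout are all assumptions made for the example. The sketch reads the "done" flag before polling, so an empty poll that follows a set flag means no producer can add anything later.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names and sizes are illustrative assumptions.
final class DrainUntilDoneSketch {

    /** Starts the producers and consumers and returns how many items were consumed. */
    static int run(int producers, int consumers, int itemsPerProducer) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(16);
        AtomicBoolean producersDone = new AtomicBoolean();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);
        try {
            // One task per consumer, so every consumer thread runs its own loop.
            for (int c = 0; c < consumers; c++) {
                consumerPool.execute(() -> {
                    try {
                        while (true) {
                            // Read the flag BEFORE polling: if it was already set
                            // and the poll still times out, nothing more can arrive.
                            boolean done = producersDone.get();
                            byte[] item = queue.poll(100, TimeUnit.MILLISECONDS);
                            if (item != null) {
                                processed.incrementAndGet(); // stand-in for real work
                            } else if (done) {
                                break;
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (int p = 0; p < producers; p++) {
                producerPool.execute(() -> {
                    try {
                        for (int i = 0; i < itemsPerProducer; i++) {
                            queue.put(new byte[] {1});
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            producerPool.shutdown();
            producerPool.awaitTermination(1, TimeUnit.MINUTES);
            producersDone.set(true); // set only after every producer has finished
            consumerPool.shutdown();
            consumerPool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + run(4, 4, 250) + " items");
    }
}
```

Note that the sketch submits one task per consumer thread; how the consumers are started in the question's code is not visible beyond the single supplyAsync call.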

Answer 2

Score: 2

4+4 threads is not that many, so you are better off not using asynchronous tools like CompletableFuture. A plain multithreaded program would be simpler and run faster.

Given

    BlockingQueue<byte[]> data;

don't use data.poll();

use data.take() instead.
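
A plain-thread version built on data.take() might look like the sketch below. Because take() blocks until an item arrives, the consumers need some way to learn that no more items are coming; the answer does not say how, so the sketch assumes a "poison pill" sentinel, with one pill enqueued per consumer after all producers have finished. The class name TakeWithPoisonPillSketch, the POISON_PILL constant and the run method are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the poison-pill shutdown is an assumption, not part of the answer.
final class TakeWithPoisonPillSketch {

    // Sentinel object; consumers compare by identity, never by content.
    private static final byte[] POISON_PILL = new byte[0];

    /** Runs plain producer and consumer threads and returns the number of items consumed. */
    static int run(int producers, int consumers, int itemsPerProducer) {
        BlockingQueue<byte[]> data = new LinkedBlockingQueue<>(16);
        AtomicInteger processed = new AtomicInteger();

        List<Thread> consumerThreads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        byte[] item = data.take(); // blocks until an item is available
                        if (item == POISON_PILL) {
                            break; // no more work for this consumer
                        }
                        processed.incrementAndGet(); // stand-in for real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + c);
            t.start();
            consumerThreads.add(t);
        }

        List<Thread> producerThreads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        data.put(new byte[] {1});
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "producer-" + p);
            t.start();
            producerThreads.add(t);
        }

        try {
            for (Thread t : producerThreads) {
                t.join();
            }
            // All real items are queued; add exactly one pill per consumer.
            for (int c = 0; c < consumers; c++) {
                data.put(POISON_PILL);
            }
            for (Thread t : consumerThreads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + run(4, 4, 250) + " items");
    }
}
```

Each consumer stops after taking exactly one pill, and the pills are queued behind all real items, so every item is processed before the last consumer exits.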

Answer 3

Score: 0

Say you have 1 item in the queue and 4 consumers: one of them polls the item, leaving the queue empty. Each of the other 3 consumers then checks queue.isEmpty() and, since the queue is empty, quits the loop.
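
The scenario described here can be replayed on a single thread by evaluating the loop condition from the question directly. The sketch below is only an illustration: LoopConditionSketch and keepLooping are hypothetical names, and the boolean parameter stands in for producersComplete.get(). It also shows that, with the condition as written in the question, an empty queue ends the loop only once the producers-complete flag has been set.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; replays the scenario without any real threads.
final class LoopConditionSketch {

    // Same condition as the while loop in the question's Response.get().
    static boolean keepLooping(BlockingQueue<byte[]> data, boolean producersComplete) {
        return !data.isEmpty() || !producersComplete;
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> data = new LinkedBlockingQueue<>();
        data.add(new byte[] {42}); // one item left in the queue

        System.out.println(keepLooping(data, true));  // true: an item is still queued
        data.poll();                                   // the first consumer takes it
        System.out.println(keepLooping(data, true));  // false: the other consumers quit
        System.out.println(keepLooping(data, false)); // true: producers are not done yet
    }
}
```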

huangapple
  • Posted on August 10, 2020, 00:41:56
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63328901.html