英文:
Single producer multiple consumers C++
问题
我正在尝试实现一个程序,其中包括一个生产者线程向std::vector
中添加对象,以及多个消费者线程从相同的向量中移除对象,直到它为空为止。我正在使用condition_variable
来让消费者知道已经产生了新对象。问题是,在最后一次迭代中(存储中剩余n个项目,其中n是消费者线程的数量),消费者线程被卡在等待条件变量上,尽管不应该满足该条件(storage
不为空,至少根据一些调试日志是这样的)。
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000
class Foo {
private:
int _id;
public:
Foo(int id) : _id(id) {}
int getId() const { return _id; }
};
std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;
void Producer(int limit) {
for (int i = 0; i < limit; ++i) {
std::lock_guard<std::mutex> lg{storageMutex};
storage.emplace_back(Foo(i));
storageCV.notify_one();
}
storageCV.notify_all();
}
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> ul{storageMutex};
storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
return;
storage.pop_back();
}
}
int main(int argc, char *argv[]) {
std::vector<std::thread> consumers;
consumers.reserve(CONSUMER_COUNT);
auto producer = std::thread(Producer, STORAGE_SIZE);
for (int i = 0; i < CONSUMER_COUNT; ++i) {
consumers.emplace_back(std::thread(Consumer, i));
}
producer.join();
for (auto &consumer : consumers)
consumer.join();
storageCV.notify_all();
std::cout << "[MAIN] 完成!" << std::endl;
std::cout << "存储中剩余 " << storage.size() << " 个项目!"
<< std::endl;
return 0;
}
我尝试添加一个简单的布尔标志,生产者完成添加所有项目后会切换它,但然后我不确定(从逻辑上讲)如何设置消费者线程中的条件。仅仅在当前条件的顶部添加该检查是不够的,因为然后一个线程可能会停止运行,即使存储中仍然有一些项目。
英文:
I am trying to implement a program that consists of a producer thread adding objects to a std::vector
and multiple consumer threads removing objects from the same vector until it's empty. I am using a condition_variable
to let the consumers know that new objects have been produced. Problem is that in last iteration (n items left in storage where n is number of consumer threads), consumer threads get stuck waiting on a conditional variable, even though that condition should not be met (storage
is not empty -> at least that's what I figured with some debug logs).
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000
class Foo {
private:
int _id;
public:
Foo(int id) : _id(id) {}
int getId() const { return _id; }
};
std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;
void Producer(int limit) {
for (int i = 0; i < limit; ++i) {
std::lock_guard<std::mutex> lg{storageMutex};
storage.emplace_back(Foo(i));
storageCV.notify_one();
}
storageCV.notify_all();
}
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> ul{storageMutex};
storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
return;
storage.pop_back();
}
}
int main(int argc, char *argv[]) {
std::vector<std::thread> consumers;
consumers.reserve(CONSUMER_COUNT);
auto producer = std::thread(Producer, STORAGE_SIZE);
for (int i = 0; i < CONSUMER_COUNT; ++i) {
consumers.emplace_back(std::thread(Consumer, i));
}
producer.join();
for (auto &consumer : consumers)
consumer.join();
storageCV.notify_all();
std::cout << "[MAIN] Done!" << std::endl;
std::cout << "Storage is left with " << storage.size() << " items!"
<< std::endl;
return 0;
}
I have tried adding a simple boolean flag that producer will toggle once it's done with adding all of the items, but then I am not sure (logically) how should I set the condition in consumer threads. Simply adding that check on top of current one is not enough because then a thread might stop running even though there are still some items in the storage.
答案1
得分: 1
根据您的要求,以下是代码部分的翻译:
正如您所发现的,问题在于消费者陷入了困境。问题在这里:
```cpp
storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
return;
请注意,std::condition::variable::wait(lock, condition)
只是一个便利函数,这段代码等效于:
while (storage.empty())
storageCV.wait();
if (storage.empty())
return;
很容易看出,条件性的 return
是毫无意义的,因为当我们达到它时,条件始终为假。
要使条件生效,条件必须位于循环内部。
需要进行以下调整:
-
我们必须修改此循环,以便消费者不会无限等待。
-
生产者线程在所有元素被处理后必须唤醒任何当前正在等待的消费者。
-
可能有一个消费者线程当前正在:
- 就在
while (storage.empty())
之前,或者 - 就在
storageCV.wait()
之前
...所以它不会被
notify_all()
唤醒,因为它还没有开始等待,但即将开始。我们必须重复使用.notify_all()
来唤醒每个线程,甚至是那些还没有开始等待的线程。 - 就在
生产者的更改
// 我们创建一个原子计数器来跟踪剩余多少个消费者。
// 当这个计数器达到零时,生产者可以停止通知剩余的消费者。
std::atomic_int consumer_stop_counter;
// 生产者需要从主线程接收消费者的数量。
// 通过原子计数器来传递这个信息是不安全的,因为消费者可能修改它,
// 因为生产者在将所有项目推送到存储之前可能已经完成了,
// 在这种情况下,生产者会认为不再需要通知任何消费者,所有工作都已完成。
void Producer(int limit, int consumers) {
for (int i = 0; i < limit; ++i) {
std::lock_guard<std::mutex> lg{storageMutex};
storage.emplace_back(Foo(i));
storageCV.notify_one();
}
consumer_stop_counter = 0;
while (consumer_stop_counter < consumers) {
storageCV.notify_all();
// 可选:减轻繁忙等待的影响
std::this_thread::yield();
}
}
消费者的更改
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> ul{storageMutex};
// 将 cv 的等待转换为常规的 while 循环。
while (storage.empty()) {
storageCV.wait(ul);
// 如果我们被唤醒并且存储为空,那意味着我们退出。
// 在调用 .wait() 后,我们保持锁定,因此可以安全地访问存储。
if (storage.empty()) {
consumer_stop_counter++;
return;
}
}
storage.pop_back();
}
}
请注意,这些代码中使用了 HTML 编码来表示小于号 <
和大于号 >
,以避免与代码中的比较运算符混淆。如果在实际代码中使用这些翻译,请将其替换为正常的小于号和大于号。
英文:
As you've discovered, the problem is that consumers get stuck. The problem is here:
storageCV.wait(ul, []() { return !storage.empty(); });
if (storage.empty())
return;
Note that std::condition::variable::wait(lock, condition)
is just a convenience function, and this code is equivalent to:
while (storage.empty())
storageCV.wait();
if (storage.empty())
return;
It's easy to see that the conditional return
is pointless, because when we reach it, the condition is always false.
The condition would need to be inside the loop to have any effect.
These adjustments are necessary:
-
We have to modify this loop so that it's possible for a consumer to not wait infinitely.
-
The producer thread has to wake up any currently waiting consumers once all the elements have been processed.
-
It's possible that a consumer thread is currently:
- right before
while (storage.empty())
, or - right before
storageCV.wait()
... so it is not going to be woken up by
notify_all()
, because it's not waiting yet, but is about to. We have to repeatedly.notify_all()
to wake up every thread, even those that are not waiting yet. - right before
Changes to Producer
// We create an atomic counter to keep track of how many
// consumers remain.
// When this hits zero, the producer can stop notifying the
// remaining consumers.
std::atomic_int consumer_stop_counter;
// The producer needs to receive the number of consumers from the main thread.
// It wouldn't be safe to communicate this via an atomic counter which
// is modified by the consumers, because it's possible that the producer
// is done pushing all of its items to storage before any consumers
// "register themselves".
// In such a scenario, the producer would think that there are no
// consumers that need to be notified anymore and all works has been completed.
void Producer(int limit, int consumers) {
for (int i = 0; i < limit; ++i) {
std::lock_guard<std::mutex> lg{storageMutex};
storage.emplace_back(Foo(i));
storageCV.notify_one();
}
consumer_stop_counter = 0;
while (consumer_stop_counter < consumers) {
storageCV.notify_all();
// optional: mitigate effects of busy wait
std::this_thread::yield();
}
}
Changes to Consumer
void Consumer(int id) {
while (true) {
std::unique_lock<std::mutex> ul{storageMutex};
// Transform cv waiting into a regular while-loop.
while (storage.empty()) {
storageCV.wait(ul);
// If we got woken up and the storage is empty, that means that we exit.
// We hold the lock after calling .wait(), so it is safe to access the storage.
if (storage.empty()) {
consumer_stop_counter++;
return;
}
}
storage.pop_back();
}
}
</details>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论