C++: condition variable ownership

Question

I am facing an issue while performing thread synchronisation.

I have a class very similar to the ThreadQueue implementation proposed in this answer, which I'll briefly report here for completeness:

#include <mutex>
#include <queue>
#include <condition_variable>

template <typename T>
class ThreadQueue {
  std::queue<T> q_;
  std::mutex mtx;
  std::condition_variable cv;

public:
  void enqueue (const T& t) {
    {
      std::lock_guard<std::mutex> lck(mtx);
      q_.push(t);
    }
    cv.notify_one();
  }

  T dequeue () {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return !q_.empty(); });
    T t = q_.front();
    q_.pop();
    return t;
  }
};

I have a consumer that continuously extracts the first available item of a shared instance of that class, say ThreadQueue<int> my_queue;, until it receives a signal to quit, for instance:

std::atomic_bool quit(false);

void worker(){
  std::cout << "[worker] starting..." << std::endl;
  while(!quit.load()) {
      std::cout << "[worker] extract element from the queue" << std::endl;
      auto el = my_queue.dequeue();

      std::cout << "[worker] consume extracted element" << std::endl;
      std::cout << el << std::endl;
  }

  std::cout << "[worker] exiting" << std::endl;
}

Suppose the program has to terminate (for any reason) before any producer can insert elements into the queue; in this case the worker would be stuck on the line auto el = my_queue.dequeue(); and could never terminate.
An example of this case is the following:

int main() {
  std::thread t(worker);
  std::this_thread::sleep_for(std::chrono::seconds(1));

  std::cout << "[main] terminating..." << std::endl;
  quit.store(true);
  t.join();
  std::cout << "[main] terminated!" << std::endl;
  return 0;
}

Clearly, the worker can be "unlocked" by pushing a dummy element into the queue, but that does not seem an elegant solution.

I am thus wondering whether the thread synchronisation on the empty queue should be taken out of the ThreadQueue class and done inside the worker instead, i.e. moving the "ownership" of the condition variable outside the ThreadQueue container.

In general, is a class such as ThreadQueue always a bad design?

If it is not, is there any solution that keeps the condition variable encapsulated in ThreadQueue, hence removing the responsibility for thread synchronisation from the users of that class (bearing in mind that I am limited to C++11)?

Full MWE here

Answer 1

Score: 1

The object that contains the mutex should also own the condition variable. So the ThreadQueue code looks good. But it is unclear what dequeue() should return when an asynchronous stop is requested.

A common way to solve this is to add either a quit flag or a sentinel value to the queue itself, together with a stop() method and a way for dequeue() to signal a closed queue, for example by using std::optional<T> (available since C++17) as the return value.

template <typename T>
class ThreadQueue {
  std::queue<T> q_;
  std::mutex mtx;
  std::condition_variable cv;
  bool quit = false;

public:
  void enqueue (const T& t) {
    {
      std::lock_guard<std::mutex> lck(mtx);
      q_.push(t);
    }
    cv.notify_one();
  }

  std::optional<T> dequeue () {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return quit || !q_.empty(); });
    if (quit) {
        return {};
    }
    T t = q_.front();
    q_.pop();
    return t;
  }

  void stop() {
    std::unique_lock<std::mutex> lck(mtx);
    quit = true;
    cv.notify_all();
  }

};

Then when dequeue() returns an empty optional, the worker can exit gracefully.

void worker() {
  std::cout << "[worker] starting..." << std::endl;
  while (true) {
      std::cout << "[worker] extract element from the queue" << std::endl;
      auto el = my_queue.dequeue();
      if (!el) {
        // Empty optional: the queue was stopped, so leave the loop.
        break;
      }
      std::cout << "[worker] consume extracted element" << std::endl;
      std::cout << *el << std::endl;
  }

  std::cout << "[worker] exiting" << std::endl;
}

int main() {
  std::thread t(worker);
  std::this_thread::sleep_for(std::chrono::seconds(1));

  std::cout << "[main] terminating..." << std::endl;
  my_queue.stop();
  t.join();
  std::cout << "[main] terminated!" << std::endl;
  return 0;
}
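
Since the question is limited to C++11 and std::optional only arrived in C++17, the same idea can be sketched without it. The version below is a minimal sketch, not the answer author's code: it assumes a dequeue(T&) overload that returns false once the queue has been stopped, and the helper worker_exits_on_stop() is a hypothetical name used only for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// C++11-only variant: dequeue() reports a closed queue through its
// return value instead of std::optional.
template <typename T>
class ThreadQueue {
  std::queue<T> q_;
  std::mutex mtx;
  std::condition_variable cv;
  bool quit = false;  // guarded by mtx

public:
  void enqueue(const T& t) {
    {
      std::lock_guard<std::mutex> lck(mtx);
      q_.push(t);
    }
    cv.notify_one();
  }

  // Returns true and fills `out` when an element was extracted;
  // returns false once stop() has been called.
  bool dequeue(T& out) {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return quit || !q_.empty(); });
    if (quit) {
      return false;
    }
    out = q_.front();
    q_.pop();
    return true;
  }

  void stop() {
    {
      std::lock_guard<std::mutex> lck(mtx);
      quit = true;
    }
    cv.notify_all();  // wake every waiting consumer
  }
};

// Hypothetical helper: starts a worker on an empty queue, stops the
// queue, and reports whether the worker left its loop having consumed
// nothing.
inline bool worker_exits_on_stop() {
  ThreadQueue<int> queue;
  int consumed = 0;
  std::thread t([&] {
    int el;
    while (queue.dequeue(el)) {
      ++consumed;
    }
  });
  queue.stop();
  t.join();  // returns only because stop() unblocked the worker
  return consumed == 0;
}
```

As in the std::optional version, elements still in the queue when stop() is called are discarded; whether that is acceptable depends on the application.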

Answer 2

Score: 0

This is a quick, hacky modification of your class that adds a stop function:

template <typename T>
class ThreadQueue {
  std::queue<T> q_;
  std::mutex mtx;
  std::condition_variable cv;
  // Brace initialisation: "= true" does not compile for std::atomic before C++17.
  std::atomic<bool> running{true};

public:
  void enqueue (const T& t) {
    {
      std::lock_guard<std::mutex> lck(mtx);
      q_.push(t);
    }
    cv.notify_one();
  }

  T dequeue () {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return !q_.empty() || !running; });
    if (!running){return {};} // tidy-up part 1
    T t = q_.front();
    q_.pop();
    return t;
  }

  bool is_running()
  {
    return running;
  }

  void stop()
  {
    {
      // Change the flag under the mutex so that a consumer between its
      // predicate check and blocking cannot miss the notification.
      std::lock_guard<std::mutex> lck(mtx);
      running = false;
    }
    cv.notify_all(); // tidy-up part 2
  }
};

See a live example: https://godbolt.org/z/bje6Gj7o4

It obviously needs tidying up to suit your requirements.
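
The answer does not show the consumer side, so here is one way a worker might use such a class. This is a sketch under assumptions: the class is restated so the block compiles on its own, stop() takes the mutex before changing the flag, and consume_until_stopped() is a hypothetical helper. Because dequeue() returns a value-initialised T once stopped, the worker has to check is_running() before trusting the returned element.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

template <typename T>
class ThreadQueue {
  std::queue<T> q_;
  std::mutex mtx;
  std::condition_variable cv;
  std::atomic<bool> running{true};

public:
  void enqueue(const T& t) {
    {
      std::lock_guard<std::mutex> lck(mtx);
      q_.push(t);
    }
    cv.notify_one();
  }

  // Returns a value-initialised T once stopped, so callers must
  // check is_running() before using the result.
  T dequeue() {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [this] { return !q_.empty() || !running; });
    if (!running) {
      return T();
    }
    T t = q_.front();
    q_.pop();
    return t;
  }

  bool is_running() const { return running; }

  void stop() {
    {
      std::lock_guard<std::mutex> lck(mtx);
      running = false;
    }
    cv.notify_all();
  }
};

// Hypothetical helper: a worker consumes elements until the queue is
// stopped. How many elements it sees depends on thread timing.
inline std::vector<int> consume_until_stopped() {
  ThreadQueue<int> queue;
  std::vector<int> seen;
  std::thread t([&] {
    while (true) {
      int el = queue.dequeue();
      if (!queue.is_running()) {
        break;  // el may be a placeholder, so it is not consumed
      }
      seen.push_back(el);
    }
  });
  queue.enqueue(1);
  queue.enqueue(2);
  queue.stop();
  t.join();
  return seen;
}
```

One caveat with this shape: an element that was genuinely extracted just before stop() is called gets dropped by the is_running() check. A return type that itself signals closure, as in Answer 1, avoids that ambiguity.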

huangapple
  • Posted on 18 February 2023 at 00:45:00
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75487046.html