多线程C++程序的意外输出

huangapple go评论56阅读模式
英文:

Unexpected output of multithreaded C++ program

问题

I'm studying concurrency in C++ and I'm trying to implement a multithreaded callback registration system. I came up with the following code, which is supposed to accept registration requests until an event occurs. After that, it should execute all the registered callbacks in order with which they were registered. The registration order doesn't have to be deterministic.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class CallbackRegistrar{
public:
    void registerCallbackAndExecute(std::function<void()> callback) {
        if (!eventTriggered) {
            std::unique_lock<std::mutex> lock(callbackMutex);
            auto saved_id = callback_id;
            std::cout << "Pushing callback with id " << saved_id << std::endl;
            registeredCallbacks.push(std::make_pair(callback_id, callback));
            ++callback_id;
            callbackCond.wait(lock, [this, saved_id]{return releasedCallback.first == saved_id;});
            releasedCallback.second();
            callbackExecuted = true;
            eventCond.notify_one();
        }
        else {
            callback();
        }
    }
    void registerEvent() {
        eventTriggered = true;
        while (!registeredCallbacks.empty()) {
            releasedCallback = registeredCallbacks.front();
            callbackCond.notify_all();
            std::unique_lock<std::mutex> lock(eventMutex);
            eventCond.wait(lock, [this]{return callbackExecuted;});
            callbackExecuted = false;
            registeredCallbacks.pop();
        }
    }
private:
    std::queue<std::pair<unsigned, std::function<void()>> registeredCallbacks;
    bool eventTriggered{false};
    bool callbackExecuted{false};
    std::mutex callbackMutex;
    std::mutex eventMutex;
    std::condition_variable callbackCond;
    std::condition_variable eventCond;
    unsigned callback_id{1};
    std::pair<unsigned, std::function<void()>> releasedCallback;
};

int main()
{
    CallbackRegistrar registrar;
    std::thread t1(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "First!\n";});
    std::thread t2(&CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout << "Second!\n";});
    
    registrar.registerEvent();
    
    t1.join();
    t2.join();

    return 0;
}

请注意:此代码可能存在一些问题,需要进行调试和改进。如果需要帮助,请提出具体问题。

英文:

I'm studying concurrency in C++ and I'm trying to implement a multithreaded callback registration system. I came up with the following code, which is supposed to accept registration requests until an event occurs. After that, it should execute all the registered callbacks in order with which they were registered. The registration order doesn't have to be deterministic.
The code doesn't work as expected. First of all, it rarely prints the "Pushing callback with id" message. Secondly, it sometimes hangs (a deadlock caused by a race condition, I assume). I'd appreciate help in figuring out what's going on here. If you see that I overcomplicate some parts of the code or misuse some pieces, please also point it out.

#include &lt;condition_variable&gt;
#include &lt;functional&gt;
#include &lt;iostream&gt;
#include &lt;mutex&gt;
#include &lt;queue&gt;
#include &lt;thread&gt;

class CallbackRegistrar{
public:
    void registerCallbackAndExecute(std::function&lt;void()&gt; callback) {
        if (!eventTriggered) {
            std::unique_lock&lt;std::mutex&gt; lock(callbackMutex);
            auto saved_id = callback_id;
            std::cout &lt;&lt; &quot;Pushing callback with id &quot; &lt;&lt; saved_id &lt;&lt; std::endl;
            registeredCallbacks.push(std::make_pair(callback_id, callback));
            ++callback_id;
            callbackCond.wait(lock, [this, saved_id]{return releasedCallback.first == saved_id;});
            releasedCallback.second();
            callbackExecuted = true;
            eventCond.notify_one();
        }
        else {
            callback();
        }
    }
    void registerEvent() {
        eventTriggered = true;
        while (!registeredCallbacks.empty()) {
            releasedCallback = registeredCallbacks.front();
            callbackCond.notify_all();
            std::unique_lock&lt;std::mutex&gt; lock(eventMutex);
            eventCond.wait(lock, [this]{return callbackExecuted;});
            callbackExecuted = false;
            registeredCallbacks.pop();
        }
    }
private:
    std::queue&lt;std::pair&lt;unsigned, std::function&lt;void()&gt;&gt;&gt; registeredCallbacks;
    bool eventTriggered{false};
    bool callbackExecuted{false};
    std::mutex callbackMutex;
    std::mutex eventMutex;
    std::condition_variable callbackCond;
    std::condition_variable eventCond;
    unsigned callback_id{1};
    std::pair&lt;unsigned, std::function&lt;void()&gt;&gt; releasedCallback;
};

int main()
{
    CallbackRegistrar registrar;
    std::thread t1(&amp;CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout &lt;&lt; &quot;First!\n&quot;;});
    std::thread t2(&amp;CallbackRegistrar::registerCallbackAndExecute, std::ref(registrar), []{std::cout &lt;&lt; &quot;Second!\n&quot;;});
    
    registrar.registerEvent();
    
    t1.join();
    t2.join();

    return 0;
}

答案1

得分: 1

以下是您要翻译的内容:

"这个答案已经根据OP在评论中提供的更多信息进行了编辑,编辑位于答案底部。

除了评论中提供的出色建议之外,我在您的代码中发现的主要问题是callbackCond条件变量等待条件的设置。如果releasedCallback.first不等于savedId会发生什么?

当我运行您的代码(使用线程安全队列和eventTriggered作为原子变量)时,发现问题在于这个等待函数,如果在该函数中放置一个打印语句,您会发现会得到类似以下的内容:

releasedCallback.first: 0, savedId: 1

然后它会永远等待。

事实上,我发现您的代码中使用的条件变量实际上是不需要的。您只需要一个条件变量,它可以存在于您稍后将构建的线程安全队列中;)

一旦您有了线程安全队列,上面的代码可以简化为:

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair<unsigned int, std::function<void()>>;

  void postCallback(std::function<void()> callback) {

    if (!eventTriggered)
    {
      std::unique_lock<std::mutex> lock(mutex);
      auto saved_id = callback_id;
      std::cout << "Pushing callback with id " << saved_id << std::endl;
      registeredCallbacks.push(std::make_pair(callback_id, callback));
      ++callback_id;
    }
    else
    {
      while (!registeredCallbacks.empty())
      {
        NumberedCallback releasedCallback;
        registeredCallbacks.waitAndPop(releasedCallback);
        releasedCallback.second();
      }
      callback();
    }
  }
  void registerEvent() {
    eventTriggered = true;
  }
private:
  ThreadSafeQueue<NumberedCallback> registeredCallbacks;
  std::atomic<bool> eventTriggered{false};
  std::mutex mutex;
  unsigned int callback_id{1};
};

int main()
{
  CallbackRegistrar registrar;
  std::vector<std::thread> threads;

  for (int i = 0; i < 10; i++)
  {
    threads.push_back(std::thread(&CallbackRegistrar::postCallback, 
                                  std::ref(registrar), 
                                  [i]{std::cout << std::to_string(i) << "\n";}
                                  ));
  }

  registrar.registerEvent();

  for (auto& thread : threads)
  {
    thread.join();
  }

  return 0;
}

我不确定这是否完全符合您的要求,但它不会导致死锁。无论如何,这是一个很好的起点,但您需要自己实现ThreadSafeQueue。

编辑

这个编辑是对OP的评论做出的回应,评论中提到“一旦事件发生,所有回调应按它们被推送到队列中的顺序,并且由注册它们的同一线程执行”。

这在原始问题帖中没有提到。然而,如果这是所需的行为,那么我们需要在postCallback方法中使用条件变量等待。我认为这也是为什么OP最初在postCallback方法中使用条件变量的原因。

在下面的代码中,我对回调进行了一些编辑,它们现在接受输入参数。我这样做是为了在代码运行时打印一些有用的信息,以便更容易看到它是如何工作的,尤其是条件变量等待是如何工作的。

基本思路与您之前所做的类似,我只是剔除了不需要的部分。

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair<unsigned int, std::function<void(int, int)>>;

  void postCallback(std::function<void(int, int)> callback, int threadId) {

    if (!m_eventTriggered)
    {
      // 锁住 m_mutex
      std::unique_lock<std::mutex> lock(m_mutex);

      // 保存当前的回调ID并将回调推送到队列
      auto savedId = m_currentCallbackId++;
      std::cout << "Pushing callback with ID " << savedId << "\n";
      m_registeredCallbacks.push(std::make_pair(savedId, callback));

      // 等待,直到我们线程的回调位于队列中的下一个位置,
      // 这将发生在最后调用的回调的ID比我们保存的回调小1时。
      m_conditionVariable.wait(lock, [this, savedId, threadId] () -> bool
      {
        std::cout << "Waiting on thread " << threadId << " last: " << m_lastCalledCallbackId << ", saved - 1: " << (savedId - 1) << "\n";
        return (m_lastCalledCallbackId == (savedId - 1));
      });

      // 一旦我们完成等待,从队列中获取回调
      NumberedCallback retrievedCallback;
      m_registeredCallbacks.waitAndPop(retrievedCallback);

      // 更新最后的回调ID并调用回调
      m_lastCalledCallbackId = retrievedCallback.first;
      retrievedCallback.second(m_lastCalledCallbackId, threadId);

      // 通知一个等待的线程
      m_conditionVariable.notify_one();
    }
    else
    {
      // 如果事件已经触发,立即调用回调
      callback(-1, threadId);
    }
  }

  void registerEvent() {
    // 这是我们需要做的一切。
    m_eventTriggered = true;
  }

private:
  ThreadSafeQueue<NumberedCallback> m_registeredCallbacks;
  std::atomic<bool> m_eventTriggered{ false};
  std::mutex m_mutex;
  std::condition_variable m_conditionVariable;
  unsigned int m_currentCallbackId{ 1};
  std::atomic<unsigned int> m_lastCalledCallbackId{ 0};
};

主函数与上述相同,只是我创建了100个线程而不是10个,并且我让回调打印有关其调用方式的信息。

for (int createdThreadId = 0; createdThreadId < 100; createdThreadId++)
{
  threads.push_back(std::thread(&CallbackRegistrar::postCallback,
                                std::ref(registrar),
                                [createdThreadId](int registeredCallbackId, int callingThreadId)
                                {
                                  if (registeredCallbackId < 0)


<details>
<summary>英文:</summary>

*This answer has been edited in response to more information being provided by the OP in a comment, the edit is at the bottom of the answer.*

Along with the excellent suggestions in the comments, the main problem that I have found in your code is with the `callbackCond` condition variable wait condition that you have set up. What happens if `releasedCallback.first` does not equal `savedId`? 

When I have run your code (with a thread-safe queue and `eventTriggered` as an atomic) I found that the problem was in this wait function, if you put a print statement in that function you will find that you get something like this:

```text
releasedCallback.first: 0, savedId: 1

This then waits forever.

In fact, I've found that the condition variables used in your code aren't actually needed. You only need one, and it can live inside the thread-safe queue that you are going to build after some searching 多线程C++程序的意外输出

After you have the thread-safe queue, the code from above can be reduced to:

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair&lt;unsigned int, std::function&lt;void()&gt;&gt;;

  void postCallback(std::function&lt;void()&gt; callback) {

    if (!eventTriggered)
    {
      std::unique_lock&lt;std::mutex&gt; lock(mutex);
      auto saved_id = callback_id;
      std::cout &lt;&lt; &quot;Pushing callback with id &quot; &lt;&lt; saved_id &lt;&lt; std::endl;
      registeredCallbacks.push(std::make_pair(callback_id, callback));
      ++callback_id;
    }
    else
    {
      while (!registeredCallbacks.empty())
      {
        NumberedCallback releasedCallback;
        registeredCallbacks.waitAndPop(releasedCallback);
        releasedCallback.second();
      }
      callback();
    }
  }
  void registerEvent() {
    eventTriggered = true;
  }
private:
  ThreadSafeQueue&lt;NumberedCallback&gt; registeredCallbacks;
  std::atomic&lt;bool&gt; eventTriggered{false};
  std::mutex mutex;
  unsigned int callback_id{1};
};

int main()
{
  CallbackRegistrar registrar;
  std::vector&lt;std::thread&gt; threads;

  for (int i = 0; i &lt; 10; i++)
  {
    threads.push_back(std::thread(&amp;CallbackRegistrar::postCallback, 
                                  std::ref(registrar), 
                                  [i]{std::cout &lt;&lt; std::to_string(i) &lt;&lt;&quot;\n&quot;;}
                                  ));
  }

  registrar.registerEvent();

  for (auto&amp; thread : threads)
  {
    thread.join();
  }

  return 0;
}

I'm not sure if this does exactly what you want, but it doesn't deadlock. It's a good starting point in any case, but you need to bring your own implementation of ThreadSafeQueue.

Edit

This edit is in response to the comment by the OP stating that "once the event occurs, all the callbacks should be executed in [the] order that they've been pushed to the queue and by the same thread that registered them".

This was not mentioned in the original question post. However, if that is the required behaviour then we need to have a condition variable wait in the postCallback method. I think this is also the reason why the OP had the condition variable in the postCallback method in the first place.

In the code below I have made a few edits to the callbacks, they now take input parameters. I did this to print some useful information while the code is running so that it is easier to see how it works, and, importantly how the condition variable wait is working.

The basic idea is similar to what you had done, I've just trimmed out the stuff you didn't need.

class CallbackRegistrar{
public:
  using NumberedCallback = std::pair&lt;unsigned int, std::function&lt;void(int, int)&gt;&gt;;

  void postCallback(std::function&lt;void(int, int)&gt; callback, int threadId) {

    if (!m_eventTriggered)
    {
      // Lock the m_mutex
      std::unique_lock&lt;std::mutex&gt; lock(m_mutex);

      // Save the current callback ID and push the callback to the queue
      auto savedId = m_currentCallbackId++;
      std::cout &lt;&lt; &quot;Pushing callback with ID &quot; &lt;&lt; savedId &lt;&lt; &quot;\n&quot;;
      m_registeredCallbacks.push(std::make_pair(savedId, callback));

      // Wait until our thread&#39;s callback is next in the queue,
      // this will occur when the ID of the last called callback is one less than our saved callback.
      m_conditionVariable.wait(lock, [this, savedId, threadId] () -&gt; bool
      {
        std::cout &lt;&lt; &quot;Waiting on thread &quot; &lt;&lt; threadId &lt;&lt; &quot; last: &quot; &lt;&lt; m_lastCalledCallbackId &lt;&lt; &quot;, saved - 1: &quot; &lt;&lt; (savedId - 1) &lt;&lt; &quot;\n&quot;;
        return (m_lastCalledCallbackId == (savedId - 1));
      });

      // Once we are finished waiting, get the callback out of the queue
      NumberedCallback retrievedCallback;
      m_registeredCallbacks.waitAndPop(retrievedCallback);

      // Update last callback ID and call the callback
      m_lastCalledCallbackId = retrievedCallback.first;
      retrievedCallback.second(m_lastCalledCallbackId, threadId);

      // Notify one waiting thread
      m_conditionVariable.notify_one();
    }
    else
    {
      // If the event is already triggered, call the callback straight away
      callback(-1, threadId);
    }
  }

  void registerEvent() {
    // This is all we have to do here.
    m_eventTriggered = true;
  }

private:
  ThreadSafeQueue&lt;NumberedCallback&gt; m_registeredCallbacks;
  std::atomic&lt;bool&gt; m_eventTriggered{ false};
  std::mutex m_mutex;
  std::condition_variable m_conditionVariable;
  unsigned int m_currentCallbackId{ 1};
  std::atomic&lt;unsigned int&gt; m_lastCalledCallbackId{ 0};
};

The main function is as above, except I am creating 100 threads instead of 10, and I have made the callback print out information about how it was called.

for (int createdThreadId = 0; createdThreadId &lt; 100; createdThreadId++)
{
  threads.push_back(std::thread(&amp;CallbackRegistrar::postCallback,
                                std::ref(registrar),
                                [createdThreadId](int registeredCallbackId, int callingThreadId)
                                {
                                  if (registeredCallbackId &lt; 0)
                                  {
                                    std::cout &lt;&lt; &quot;Callback &quot; &lt;&lt; createdThreadId;
                                    std::cout &lt;&lt; &quot; called immediately, from thread: &quot; &lt;&lt; callingThreadId &lt;&lt; &quot;\n&quot;;
                                  }
                                  else
                                  {
                                    std::cout &lt;&lt; &quot;Callback &quot; &lt;&lt; createdThreadId;
                                    std::cout &lt;&lt; &quot; called from thread &quot; &lt;&lt; callingThreadId;
                                    std::cout &lt;&lt; &quot; after being registered as &quot; &lt;&lt; registeredCallbackId &lt;&lt; &quot;\n&quot;;
                                  }
                                },
                                createdThreadId));
}

I am not entirely sure why you want to do this, as it seems to defeat the point of having multiple threads, although I may be missing something there. But, regardless, I hope this helps you to understand better the problem you are trying to solve.

答案2

得分: 0

我发现这段代码更多的实验后,发现了为什么“Pushing callback with id”的部分很少被打印。这是因为主线程中对registrar.registerEvent的调用通常比从不同线程调用registerCallbackAndExecute要快。因此,几乎从不满足条件if (!eventTriggered)eventTriggered已在registerEvent方法中设置为true),因此所有对registerCallbackAndExecute的调用都会进入else分支并立即执行。

然后,有时程序也无法完成,因为registerEventregisterCallbackAndExecute之间存在竞争条件。有时,在检查if (!eventTriggered)之后但在将回调推送到队列之前调用了registerEvent。然后,registerEvent立即完成(因为队列为空),而调用registerCallbackAndExecute的线程正在将回调推送到队列。然后,后者线程将永远等待事件(事件已经发生)。

英文:

Experimenting with this code some more, I found out why the "Pushing callback with id " part was rarely printed. It's because the call to registrar.registerEvent from the main thread was usually faster than the calls to registerCallbackAndExecute from separate threads. Because of that, the condition if (!eventTriggered) was almost never fulfilled (eventTriggered had been set to true in the registerEvent method) and hence all calls to registerCallbackAndExecute were falling into the else branch and executing straightaway.
Then, the program sometimes also didn't finish, because of a race condition between registerEvent and registerCallbackAndExecute. Sometimes, registerEvent was being called after the check if (!eventTriggered) but before pushing the callback to the queue. Then, registerEvent completed instantly (as the queue was empty) while the thread calling registerCallbackAndExecute was pushing the callback to the queue. The latter thread then kept waiting forever for the event (that had already happened) to happen.

huangapple
  • 本文由 发表于 2023年2月14日 01:41:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/75439426.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定