我应该在线程池调度器中使用信号量还是条件变量?

huangapple go评论82阅读模式
英文:

Should i use a semaphore or condition variable in threadpool scheduler?

问题

I'm unsure whether I should use a std::semaphore or std::condition variable to handle thread waiting in a threadpool system.

我不确定在线程池系统中是否应该使用std::semaphore还是std::condition变量来处理线程等待。

I think that there is a low risk of using the semaphore and incrementing it beyond its compiler specific ::max().

我认为使用信号量并将其增加到超出其编译器特定的::max()的风险较低。

I'm using c++20.

我正在使用C++20。

Some of the following code is somewhat pseudo-code, but illustrates the program structure.

以下部分代码有些伪代码,但说明了程序结构。

class ThreadPool
{
private: 
     struct ThreadData {
     
     threadsafe_queue < fn2::unique_function<void() > > ThreadTask; //library that makes moveable function
     **std::std::counting_semaphore<1> ThreadSignal {0}; //This is what i'm unsure of**
     
     }


    std::vector<std::jthread> mThreads;
    std::vector<ThreadData> mThreadData;

    template < typename Func >
    void CreateTask(Func&& func) {

      //iterate over threads and distribute the tasks
      ...
      mThreadData[thread_id].ThreadTask.emplace_back(std::move(func));
      mThreadData[thread_id].ThreadSignal.release(); //tell the queue there is work to do

     }
public:
    //Functions that parse a function into the above CreateTask()
    ...

    ThreadPool() {
     //for the required number of threads create them with an internal lambda that handles the work
    ...
         mThreads.emplace_back(std::move(std::jthread([id = thread_id]() {
              while (running) {
                    **mThreadData[id].ThreadSignal.aquire(); //gets the go-signal, which is still the focus of this post**
                    //handle work, either by getting from internal queue or stealing other work
                    //thread might "over"-consume the work in general by stealing work, but if a thread runs through this part without any work to get it will eventually run into a semaphore block
              }
         }
     }
}

If for example the program allocates 1000 tasks to 2 threads. The main thread will quickly increase the signal semaphore to a very high number. I'm unsure if this is risking undefined behavior? Should I implement a condition-variable instead? The semaphores should be faster than condition variables in general, which is a reason to want to keep it. It's also really simple, since I can easily activate all threads by iterating over the semaphores and calling release on them all.

如果例如程序将1000个任务分配给2个线程。主线程会很快将信号量增加到很高的数字。我不确定这是否会导致未定义的行为?我应该改用条件变量吗?信号量通常应该比条件变量更快,这是希望保留它的原因。而且这样做非常简单,因为我可以通过迭代信号量并在所有线程上调用release来轻松激活所有线程。

Previously I've had a simple this_thread::yield() call if no task gets pop'ed off the list, but it ends up just consuming a lot of CPU time, which is the reason why I'd like the threads to "pause" and use a gateway of sorts.

以前,如果没有任务从列表中弹出,我会使用简单的this_thread::yield()调用,但最终会消耗大量的CPU时间,这就是我希望线程“暂停”并使用某种网关的原因。

Please bear with me, if I have misunderstood some concept, since I'm still a newbie.

如果我误解了一些概念,请谅解,因为我还是个新手。

英文:

I'm unsure whether I should use a std::semaphore or std::condition variable to handle thread waiting in a threadpool system.

I think that there is a low risk of using the semaphore and incrementing it beyond its compiler specific ::max().

I'm using c++20

Some of the following code is somewhat pseudo-code, but illustrates the program structure.

class ThreadPool
{
private: 
     struct ThreadData {
     
     threadsafe_queue < fn2::unique_function<void() > > ThreadTask; //library that makes moveable function
     **std::std::counting_semaphore<1> ThreadSignal {0}; //This is what i'm unsure of**
     
     }


    std::vector<std::jthread> mThreads;
    std::vector<ThreadData> mThreadData;

    template < typename Func >
    void CreateTask(Func&& func) {

      //iterate over threads and distribute the tasks
      ...
      mThreadData[thread_id].ThreadTask.emplace_back(std::move(func));
      mThreadData[thread_id].ThreadSignal.release(); //tell the queue there is work to do

     }
public:
    //Functions that parse a function into the above CreateTask()
    ...

    ThreadPool() {
     //for the required number of threads create them with an internal lambda that handles the work
    ...
         mThreads.emplace_back(std::move(std::jthread([id = thread_id]() {
              while (running) {
                    **mThreadData[id].ThreadSignal.aquire(); //gets the go-signal, which is still the focus of this post**
                    //handle work, either by getting from internal queue or stealing other work
                    //thread might "over"-consume the work in general by stealing work, but if a thread runs through this part without any work to get it will eventually run into a semaphore block
              }
         }
     }
}

If for example the program allocates 1000 tasks to 2 threads. The main thread will quickly increase the signal semaphore to a very high number. I'm unsure if this is risking undefined behaviour? Should I implement a condition-variable instead? The semaphores should be faster than condition varibles in general, which is a reason to want to keep it. It's also really simple, since i can easily activate all theads by iterating over the semaphores and calling release on them all.

Previously I've had a simple this_thread::yield() call if no task gets pop'ed off the list, but it ends up just consuming a lot of cpu-time, which is the reason why I'd like the threads to "pause" and use a gateway of sorts.

Please bear with me, if I have misunderstood some concept, since I'm still a newbie.

答案1

得分: 2

使用线程队列的工作方式是让工作线程等待一个条件变量,该变量表示队列为空的条件,因此如果将新消息放入队列,等待的工作线程可以开始,而已完成的工作线程只要队列不为空就可以继续。这样,只有队列需要进行计数。

您还可以通过添加一个条件变量来保护内存,该变量表示队列已满,并让供给线程在队列满时等待。

英文:

With a thread-queue, the way to work is to let the worker threads wait on a condition variable that tells the condition that the queue is empty, so if a new message is put on the queue, a waiting worker can start and a done worker can continue as long as the queue is not empty. This way, only the queue must do the counting.

You can also protect the memory by adding a condition variable the tells that the queue is full and let the feeding thread wait as long as the queue is full.

答案2

得分: 1

I propose an entirely generic thread pool implementation (adopted from comments to question and answers – and going beyond the scope of the question; in the sense of the question: I implemented the pool using a semaphore simply because it is slightly less complicated to use, e.g. no need for keeping an additional mutex locked before waiting or caring for spurious wakeups; have an eye for in the code… – the other approach is totally valid, too, though). It implements the thread pool as a template, providing most efficiency possible in respect to adding and removing tasks from the queue – though unfortunately this requires the generic Task base class which all potentially to be added tasks would inherit from to be defined outside of the template (its an implementation detail and normally should go inside as a nested class; C++ doesn't allow to predeclare nested classes, though...). A minor disadvantage, too, is that one cannot store arbitrary implementations in the same pointer type, but that's likely not relevant here anyway.

class Task
{
public:
    Task() { }
    virtual ~Task() { }
private:
    template <typename Queue>
    friend class ThreadPool;
    virtual void exexute() = 0;
};

template <typename Queue = std::queue<Task*>>
class ThreadPool
{
public:
    ThreadPool() : ThreadPool(std::thread::hardware_concurrency())
    { }

    ThreadPool(size_t numberOfThreads) 
        : m_notifier(0)
    {
        for(; numberOfThreads; --numberOfThreads)
        {
            m_threads.emplace_back(&ThreadPool::run, this, m_threads.size());
        }
    }

    ThreadPool(ThreadPool const&) = delete;
    ThreadPool& operator=(ThreadPool const&) = delete;

    ~ThreadPool()
    {
        m_exit = true;
        for(auto n = m_threads.size(); n; --n)
        {
            m_notifier.release();
        }
        for(auto& t : m_threads)
        {
            t.join();
        }
    }

    void append(Task* task)
    {
        if(task)
        {
            {
                std::lock_guard g(m_queueGuard);
                m_queue.push(task);
                // releasing as soon as possible; no need to keep
                // the mutex locked when releasing the semaphore
            } 
            m_notifier.release();
        }
    }

private:
    std::vector<std::thread> m_threads;
    Queue m_queue;
    std::mutex m_queueGuard;
#ifdef _MSC_VER
    std::counting_semaphore<std::numeric_limits<ptrdiff_t>::max()> m_notifier;
#else
    // oh [@*%#$+]!, neither gcc nor clang allow above (MSVC does, though)
    std::counting_semaphore<2147483647> m_notifier;
    // corresponds to INT_MAX here, but maybe that differs on other platforms...
    // apparently there's no standard way to get that, so we'd relay on compiler
    // specifics :(
#endif
    bool m_exit = false;

    void run()
    {
        for(;;)
        {
            m_notifier.acquire();
            if(m_exit)
            {
                break;
            }
            Task* task;
            {
                std::lock_guard g(m_queueGuard);
                task = static_cast<Task*>(m_queue.front());
                m_queue.pop();
            }
            delete task;
        }
        file.close();
    }
};

Recommendation: Place your own namespace around.

Usage then is simple: Implement any arbitrary task to be run by inheriting from the base Task class and add it to the pool:

class TheTask : public Task
{
public:
    TheTask()
        : m_taskId(++g_n)
    { }
private:
    inline static size_t g_n = 0;
    size_t m_taskId;

    void execute(std::ofstream& s)
    {
        s << m_taskId << std::endl;
        // without some duration indeed can get pretty unbalanced...
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

    }
};

int main()
{
    ThreadPool p(4);

    for(size_t i = 0; i < 32; ++i)
    {
        p.append(new TheTask());
    }

    std::this_thread::sleep_for(std::chrono::seconds(4));
    return 0;
}

Demonstration on godbolt – though it's not possible to access the output files created there, so probably better to translate on one's own...

Further possible extension: Maybe you want to allow locally allocated tasks, these, of course, must not get deleted automatically! You might want to add a flag to the Task base class (private member) that is set appropriately, e.g.:


void append(Task& task, bool deleteOnCompletion = false)
{
    task->m_delete = deleteOnCompletion;

    // append as before
}


void append(Task* task, bool deleteOnCompletion = true)
{
    if(task)
    {
        append(*task, deleteOnCompletion);
    }
}

void run()
{
    //...

    task->execute();
    if(task->m_delete)
    {
        delete task;
    }
}
英文:

I propose an entirely generic thread pool implementation (adopted from comments to question and answers – and going beyond the scope of the question; in the sense of the question: I implemented the pool using a semaphore simply because it is slightly less complicated to use, e.g. no need for keeping an additional mutex locked before waiting or caring for spurious wakeups; have an eye for in the code… – the other approach is totally valid, too, though). It implements the thread pool as a template, providing most efficiency possible in respect to adding and removing tasks from the queue – though unfortunately this requires the generic Task base class which all potentially to be added tasks would inherit from to be defined outside of the template (its an implementation detail and normally should go inside as a nested class; C++ doesn't allow to predeclare nested classes, though...). A minor disadvantage, too, is that one cannot store arbitrary implementations in the same pointer type, but that's likely not relevant here anyway.

class Task
{
public:
Task() { }
virtual ~Task() { }
private:
template &lt;typename Queue&gt;
friend class ThreadPool;
virtual void exexute() = 0;
};
template &lt;typename Queue = std::queue&lt;Task*&gt;&gt;
class ThreadPool
{
public:
ThreadPool() : ThreadPool(std::thread::hardware_concurrency())
{ }
ThreadPool(size_t numberOfThreads) 
: m_notifier(0)
{
for(; numberOfThreads; --numberOfThreads)
{
m_threads.emplace_back(&amp;ThreadPool::run, this, m_threads.size());
}
}
ThreadPool(ThreadPool const&amp;) = delete;
ThreadPool&amp; operator=(ThreadPool const&amp;) = delete;
~ThreadPool()
{
m_exit = true;
for(auto n = m_threads.size(); n; --n)
{
m_notifier.release();
}
for(auto&amp; t : m_threads)
{
t.join();
}
}
void append(Task* task)
{
if(task)
{
{
std::lock_guard g(m_queueGuard);
m_queue.push(task);
// releasing as soon as possible; no need to keep
// the mutex locked when releasing the semaphore
} 
m_notifier.release();
}
}
private:
std::vector&lt;std::thread&gt; m_threads;
Queue m_queue;
std::mutex m_queueGuard;
#ifdef _MSC_VER
std::counting_semaphore&lt;std::numeric_limits&lt;ptrdiff_t&gt;::max()&gt; m_notifier;
#else
// oh [@*%#$+]!, neither gcc nor clang allow above (MSVC does, though)
std::counting_semaphore&lt;2147483647&gt; m_notifier;
// corresponds to INT_MAX here, but maybe that differs on other platforms...
// apparently there&#39;s no standard way to get that, so we&#39;d relay on compiler
// specifics :(
#endif
bool m_exit = false;
void run()
{
for(;;)
{
m_notifier.acquire();
if(m_exit)
{
break;
}
Task* task;
{
std::lock_guard g(m_queueGuard);
task = static_cast&lt;Task*&gt;(m_queue.front());
m_queue.pop();
}
delete task;
}
file.close();
}
};

Recommendation: Place your own namespace around.

Usage then is simple: Implement any arbitrary task to be run by inheriting from the base Task class and add it to the pool:

class TheTask : public Task
{
public:
TheTask()
: m_taskId(++g_n)
{ }
private:
inline static size_t g_n = 0;
size_t m_taskId;
void execute(std::ofstream&amp; s)
{
s &lt;&lt; m_taskId &lt;&lt; std::endl;
// without some duration indeed can get pretty unbalanced...
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
};
int main()
{
ThreadPool p(4);
for(size_t i = 0; i &lt; 32; ++i)
{
p.append(new TheTask());
}
std::this_thread::sleep_for(std::chrono::seconds(4));
return 0;
}

Demonstration on godbolt – though it's not possible to access the output files created there, so probably better to translate on one's own...

Further possible extension: Maybe you want to allow locally allocated tasks, these, of course, must not get deleted automatically! You might want to add a flag to the Task base class (private member) that is set appropriately, e.g.:


void append(Task&amp; task, bool deleteOnCompletion = false)
{
task-&gt;m_delete = deleteOnCompletion;
// append as before
}
void append(Task* task, bool deleteOnCompletion = true)
{
if(task)
{
append(*task, deleteOnCompletion);
}
}
void run()
{
//...
task-&gt;execute();
if(task-&gt;m_delete)
{
delete task;
}
}

答案3

得分: 0

根据Aconcagua的建议,通过从ThreadData结构中移除信号量,向线程池添加一个成员变量std::counting_semaphore&lt;&gt; mPoolSemaphore{0};(该信号量具有一个非常大的::max值),最后修改CreateTask()函数以增加上述信号量,并在每个线程内创建一个类似以下的循环:

while (!mDone) {
    mPoolSemaphore.acquire();		
    while (mPendingTasks.load(std::memory_order_acquire) > 0) {
        while (auto task = mThreadData[id].WorkQueue.Pop()) {
            mPendingTasks.fetch_sub(1, std::memory_order_release);
            task();
        }
        // 以下是从其他队列中窃取任务的实现
        // ...
    }
}

这样是否足够?我知道这样做每个线程可能会在一个没有任务可执行的情况下一直循环,但也许,仅仅可能,它会偶然地从另一个线程那里窃取到一个任务。

英文:

(as suggested by Aconcagua):

So by removing the semaphore from the ThreadData struct and adding a member variable std::counting_semaphore<> mPoolSemaphore{0}; to the threadpool (which has a really big ::max) and finally modifying the CreateTask() function to increase the above semaphore and making a loop (inside EACH thread) something like this:

while (!mDone) {
mPoolSemaphore.aquire();		
while (mPendingTasks.load(std::memory_order_acquire) &gt; 0) {
while (auto task = mThreadData[id].WorkQueue.Pop()) {
mPendingTasks.fetch_sub(1,std::memory_order_release);
task();
}
///hereafter: stealing from other queues implementation below
...
}
}

Would this suffice? I know that by doing like that each thread might take a spin through a while loop with nothing to do, but maybe, just maybe, it acidentally steals a job from another thread.

huangapple
  • 本文由 发表于 2023年5月17日 20:52:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/76272337.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定