Splitting Dataset Computations among worker threads in C++

Question

I have a 2D vector dataset with 1500 rows, and I want to perform a computationally expensive operation on each row. I want to use multiple threads so that this executes as fast as possible.

I can't find any definitive solution on the internet as to how this should be done. I thought of making a thread for each row, but that seems very inefficient, since 1500 threads would take up a lot of unnecessary resources. So it seems best to make 6 worker threads (my PC has 6 cores) and split the work among them.

```cpp
// Computationally expensive function
double Loss(vector<double> input, vector<double> expectedOutput);

// This calculates the Loss of all 1500 rows in the 2D vector and returns their average
double TotalLoss(vector<vector<double>> inputs, vector<vector<double>> expectedOutputs);

// TBD: Implement TotalLoss using multiple threads
double MultiThreadedTotalLoss(vector<vector<double>> inputs, vector<vector<double>> expectedOutputs);
```

I tried to use std::thread to implement the different threads but can't figure out how to split the work six ways. I thought of making 6 different vectors by dividing the original vector, but I don't know if that's the best approach.

Any help would be appreciated.


Answer 1

Score: 1




Given that you already have a function to compute the loss for one row, I'm going to guess that your `TotalLoss` function would look something like this:

```cpp
using Matrix = std::vector<std::vector<double>>;

double TotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    for (int i = 0; i < (int)inputs.size(); i++) {
        ret += Loss(inputs[i], expected[i]);
    }
    return ret;
}
```

Assuming that's reasonably accurate, a multithreaded version could look something like this:

```cpp
using Matrix = std::vector<std::vector<double>>;

double MultithreadedTotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;

    #pragma omp parallel for reduction(+:ret)
    for (int i = 0; i < (int)inputs.size(); i++) {
        ret += Loss(inputs[i], expected[i]);
    }
    return ret;
}
```

One extra line for the `#pragma`, and off you go. For that matter, even the `reduction(+:ret)` part is optional, though it can help efficiency a fair amount (it basically tells the compiler/library to keep a per-thread accumulator, then add those together at the end, rather than having the threads fight over access to a single variable as they run). If `Loss()` is really expensive, this probably won't make much difference, though.

This has a few advantages over doing the threading explicitly. The obvious one is that it's pretty simple compared to writing all the threading code yourself. Less obvious, but often nearly as important: it can and will automatically find (and use) the number of cores available, so it'll use the six you have, but if you run it on a machine with, say, 128 cores, it'll automatically use all of them. In the long term, it also keeps the basic algorithm easy to find and easy to read. Explicit multithreading can quickly end up dominated by the thread-management "stuff", to the point that it's hard to find the code doing the real work.

The big disadvantage is lack of flexibility. For code like this, OpenMP can work really well, but for some other situations it's much more difficult to apply.

Answer 2

Score: 1

You can use the ThreadPool example below. It's not perfect, but it should give you an idea.

```cpp
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>

using namespace std::chrono_literals;

class ThreadPool
{
public:
    ThreadPool(int no_of_threads) : m_pool(no_of_threads)
    {
        for (int i = 0; i < no_of_threads; i++)
        {
            m_pool[i] = std::thread(&ThreadPool::thread_func, this, i);
            m_pool[i].detach();
        }
    }

    void add_task(std::function<void(int)> task_fn)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_task_queue.push(task_fn);
        m_cv.notify_all();
    }

    void stop_processing()
    {
        m_stop_all_threads = true;
    }
private:
    void thread_func(int thread_id)
    {
        while (!m_stop_all_threads)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            // wait for some task to be added in queue
            if (!m_cv.wait_for(lck, 100us, [this]() { return !m_task_queue.empty(); }))
                continue;
            // pick up task, update queue
            auto fn = m_task_queue.front();
            m_task_queue.pop();
            lck.unlock();
            // execute task
            fn(thread_id);
        }
    }

    std::vector<std::thread> m_pool;
    std::atomic<bool> m_stop_all_threads{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue< std::function<void(int)> > m_task_queue;
};

int main()
{
    ThreadPool pool(5);

    int i = 0;
    while (i < 100)
    {
        pool.add_task([x = i](int id) { std::cout << "This is task " << x << " in thread " << id << '\n'; });
        i++;
    }
    std::this_thread::sleep_for(5ms);
    pool.stop_processing();
}
```

Currently, `m_task_queue` is expecting a `std::function<void(int)>`. You can change it to whatever signature you require.

Published by huangapple on 2023-03-08 14:49:12
Original link: https://go.coder-hub.com/75670081.html