

Splitting Dataset Computations among worker threads in C++


I have a 2D vector dataset with 1500 rows, and I want to perform a computationally expensive operation on each row. I want to use multiple threads so that this executes as fast as possible.

I can't find any definitive solution online for this. I thought of creating a thread for each row, but that seems very inefficient: 1500 threads would take up a lot of unnecessary resources. So I think it would be best to create 6 worker threads (my PC has 6 cores) and split the work among them.
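Rather than hard-coding 6, the worker count can be read from the hardware at runtime. A minimal sketch, assuming C++11; `WorkerCount` and its `fallback` parameter are hypothetical names. Note that `std::thread::hardware_concurrency()` reports hardware threads, so on a 6-core CPU with SMT/Hyper-Threading it may return 12, and it may return 0 when the value cannot be determined.

```cpp
#include <thread>

// Hypothetical helper: choose a worker count from the hardware instead of hard-coding 6.
// hardware_concurrency() counts hardware threads (possibly 12 on a 6-core SMT CPU)
// and may return 0 if the value is not known, so fall back to a fixed default.
unsigned WorkerCount(unsigned fallback = 4) {
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : fallback;
}
```

The result could then be passed as the thread count to whichever work-splitting scheme is used.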

#include <vector>
using std::vector;

// Computationally expensive function
double Loss(const vector<double>& input, const vector<double>& expectedOutput);

// Calculates the Loss of each of the 1500 rows in the 2D vector and returns their average
double TotalLoss(const vector<vector<double>>& inputs, const vector<vector<double>>& expectedOutputs);

// TBD: Implement TotalLoss using multiple threads
double MultiThreadedTotalLoss(const vector<vector<double>>& inputs, const vector<vector<double>>& expectedOutputs);

I tried to use std::thread to create the threads, but I can't figure out how to split the work into 6 parts. I thought of splitting the original vector into 6 smaller vectors, but I don't know if that's the best approach.
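Splitting the data into 6 separate vectors isn't strictly necessary: each thread can be handed a contiguous range of row indices into the original vectors, so nothing is copied. A minimal `std::thread` sketch of that idea follows; `ChunkedTotalLoss` is a hypothetical name, and the squared-error `Loss` is only a placeholder standing in for the real expensive function.

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

using Matrix = std::vector<std::vector<double>>;

// Placeholder for the question's expensive Loss() (hypothetical: squared error of one row).
double Loss(const std::vector<double> &input, const std::vector<double> &expectedOutput) {
    double sum = 0.0;
    for (std::size_t j = 0; j < input.size(); ++j) {
        double d = input[j] - expectedOutput[j];
        sum += d * d;
    }
    return sum;
}

// Hypothetical helper: average Loss over all rows using numThreads std::threads.
// Each thread gets a contiguous range of row indices; no rows are copied.
double ChunkedTotalLoss(const Matrix &inputs, const Matrix &expected, unsigned numThreads) {
    const std::size_t n = inputs.size();
    if (n == 0) return 0.0;
    // At least one thread, and never more threads than rows
    numThreads = std::max(1u, std::min(numThreads, static_cast<unsigned>(n)));

    std::vector<double> partial(numThreads, 0.0); // one result slot per thread
    std::vector<std::thread> workers;
    const std::size_t chunk = n / numThreads;
    const std::size_t extra = n % numThreads; // the first `extra` threads take one more row
    std::size_t begin = 0;

    for (unsigned t = 0; t < numThreads; ++t) {
        const std::size_t end = begin + chunk + (t < extra ? 1 : 0);
        workers.emplace_back([&inputs, &expected, &partial, t, begin, end] {
            double local = 0.0; // accumulate locally, write the shared slot once
            for (std::size_t i = begin; i < end; ++i)
                local += Loss(inputs[i], expected[i]);
            partial[t] = local;
        });
        begin = end;
    }
    for (auto &w : workers) w.join();

    double total = 0.0;
    for (double p : partial) total += p;
    return total / static_cast<double>(n); // average, as TotalLoss is described
}
```

Because each thread writes only its own element of `partial`, no mutex is needed; the calling thread joins the workers and then combines the partial sums.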

Any help would be appreciated.




Score: 1







Given that you already have a function to compute the loss for one row, I'm going to guess that your `TotalLoss` function would look something like this:

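#include <vector>

using Matrix = std::vector<std::vector<double>>;

// Per-row loss from the question (defined elsewhere)
double Loss(std::vector<double> const &input, std::vector<double> const &expectedOutput);

double TotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;
    int n = static_cast<int>(inputs.size());

    for (int i=0; i<n; i++) {
        ret += Loss(inputs[i], expected[i]);
    }
    return ret / n; // average over all rows
}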

Assuming that's reasonably accurate, a multithreaded version could look something like this:

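#include <vector>

using Matrix = std::vector<std::vector<double>>;

double Loss(std::vector<double> const &input, std::vector<double> const &expectedOutput);

double MultithreadedTotalLoss(Matrix const &inputs, Matrix const &expected) {
    double ret = 0.0;
    int n = static_cast<int>(inputs.size()); // signed loop index works with every OpenMP version

    // Requires OpenMP to be enabled, e.g. -fopenmp (GCC/Clang) or /openmp (MSVC)
    #pragma omp parallel for reduction(+:ret)
    for (int i=0; i<n; i++) {
        ret += Loss(inputs[i], expected[i]);
    }
    return ret / n; // average over all rows
}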

One extra line for the #pragma, and off you go. The reduction(+:ret) part matters: it tells the compiler/library to keep a per-thread accumulator and add those together at the end. Without it, all the threads would update the single shared ret at once, which is a data race; you would then have to protect each update (for example with #pragma omp atomic), so the threads would fight over access to that one variable as they run. If Loss() is really expensive, that contention probably won't make much difference to speed, though.
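The sketch below contrasts the two safe ways of updating a shared total in an OpenMP loop: a `reduction(+:ret)` clause, which gives each thread a private accumulator, and `#pragma omp atomic`, which makes every update to one shared variable safe but contended. A plain `ret += ...` with neither would be a data race. `SumWithReduction` and `SumWithAtomic` are hypothetical toy helpers that just sum indices, not part of the answer's code.

```cpp
// Hypothetical toy helpers: each sums the indices 0..n-1 in an OpenMP loop.
// Build with -fopenmp (GCC/Clang) or /openmp (MSVC); without OpenMP the
// pragmas are ignored and both loops run serially with the same result.

// reduction(+:ret): every thread gets a private copy of ret, and the copies
// are added together when the loop finishes.
double SumWithReduction(int n) {
    double ret = 0.0;
    #pragma omp parallel for reduction(+:ret)
    for (int i = 0; i < n; i++) {
        ret += static_cast<double>(i);
    }
    return ret;
}

// No reduction: all threads update the one shared ret, so each update must
// be atomic (a plain "ret += ..." here would be a data race).
double SumWithAtomic(int n) {
    double ret = 0.0;
    #pragma omp parallel for
    for (int i = 0; i < n; i++) {
        double v = static_cast<double>(i);
        #pragma omp atomic
        ret += v;
    }
    return ret;
}
```

With integer-valued terms like these, both return 1124250.0 for n = 1500. With general floating-point values, the parallel versions may differ from a serial sum in the last bits, because the additions happen in a different order.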

This has a few advantages over doing the threading explicitly. The obvious one is that it's much simpler than writing all the threading code by hand. Less obvious, but often nearly as important, is that OpenMP automatically detects (and uses) the number of cores available: it'll use the six you have, but if you run it on a machine with, say, 128 cores, it'll automatically use all of them. In the long term, it also keeps the basic algorithm easy to find and easy to read. Explicit multithreading can quickly end up dominated by the thread-management "stuff", to the point that it's hard to find the code that does the real work.

The big disadvantage is lack of flexibility. For code like this, OpenMP can work really well, but in some other situations it's much more difficult to apply.


Score: 1

You can use the ThreadPool example below. It's not perfect, but it should give you an idea.

#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>

using namespace std::chrono_literals;

class ThreadPool
{
public:
    ThreadPool(int no_of_threads) : m_pool(no_of_threads)
    {
        for (int i = 0; i < no_of_threads; i++)
            m_pool[i] = std::thread(&ThreadPool::thread_func, this, i);
    }

    ~ThreadPool()
    {
        stop_processing();
        for (auto &th : m_pool)
            th.join();
    }

    void add_task(std::function<void(int)> task_fn)
    {
        std::unique_lock<std::mutex> lck(m_mutex);
        m_task_queue.push(std::move(task_fn));
        m_cv.notify_one();
    }

    // Ask the workers to exit once the queue has been drained
    void stop_processing()
    {
        m_stop_all_threads = true;
    }

private:
    void thread_func(int thread_id)
    {
        while (true)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            // wait (up to 100us) for some task to be added in queue
            if (!m_cv.wait_for(lck, 100us, [this]() { return !m_task_queue.empty(); }))
            {
                if (m_stop_all_threads)
                    return; // queue is empty and stop was requested
                continue;
            }
            // pick up task, update queue
            auto fn = std::move(m_task_queue.front());
            m_task_queue.pop();
            lck.unlock(); // don't hold the lock while the task runs
            // execute task
            fn(thread_id);
        }
    }

    std::vector<std::thread> m_pool;
    std::atomic<bool> m_stop_all_threads{false};
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue< std::function<void(int)> > m_task_queue;
};

int main()
{
    ThreadPool pool(5);

    int i = 0;
    while (i < 100)
    {
        // Output lines from different threads may interleave
        pool.add_task([x = i](int id) { std::cout << "This is task " << x << " in thread " << id << '\n'; });
        i++;
    }
    // ~ThreadPool() lets the workers finish the queued tasks, then joins them
}


Currently, the m_task_queue expects a std::function<void(int)>. You can change it to whatever signature you require.
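As one example of a different signature, sketched under assumptions: the queue can hold `std::function<void()>`, with each job wrapped in a `std::packaged_task<double()>` so the caller gets a `std::future<double>` back, which suits collecting per-chunk loss sums. `FuturePool` and `submit` are hypothetical names, and this variant blocks on the condition variable until there is work or a stop request, instead of polling with `wait_for`.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool: jobs are std::function<void()>, results come back as futures.
class FuturePool {
public:
    explicit FuturePool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            m_workers.emplace_back([this] { run(); });
    }

    // Finish all queued jobs, then join the workers
    ~FuturePool() {
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        for (auto &t : m_workers) t.join();
    }

    // Wrap a double() callable in a packaged_task so the caller can wait for its result
    std::future<double> submit(std::function<double()> fn) {
        // packaged_task is move-only; shared_ptr lets it live inside a copyable std::function
        auto task = std::make_shared<std::packaged_task<double()>>(std::move(fn));
        std::future<double> fut = task->get_future();
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            m_queue.push([task] { (*task)(); });
        }
        m_cv.notify_one();
        return fut;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                // Sleep until there is work or a stop request
                m_cv.wait(lck, [this] { return m_stop || !m_queue.empty(); });
                if (m_stop && m_queue.empty()) return;
                job = std::move(m_queue.front());
                m_queue.pop();
            }
            job(); // run outside the lock
        }
    }

    std::vector<std::thread> m_workers;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_queue;
    bool m_stop = false;
};
```

For the loss problem, each submitted job could sum `Loss` over one range of rows, and the caller would add up `f.get()` over the returned futures and divide by the row count.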

  • This post was published on March 8, 2023 at 14:49:12
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75670081.html



