Threadpool in C

huangapple go评论55阅读模式
英文:

Threadpool in C

问题

I'm here to help with the Chinese translation. Here's the translated content:

我对线程不太熟悉,所以我使用了这篇文章来学习如何创建线程池。
[https://nachtimwald.com/2019/04/12/thread-pool-in-c/](https://stackoverflow.com)

问题是,当我使用大量任务时,一切都正常工作,但如果任务数量较少,那么任务处理函数不会被调用。`void worker(void *arg)`
[![奇怪的程序执行行为](https://i.stack.imgur.com/VmAxn.png)](https://i.stack.imgur.com/VmAxn.png)

例如,如果线程数为4,我创建了24个任务,它们几乎不会被执行(请参见图片)。
而且任务数量越少,函数工作的概率就越小。`void worker(void *arg)`

***我不太确定,但我觉得主要问题是线程没有足够的时间被创建,因为任务已经在队列中。
线程是使用pthread_create()函数创建的。***

**tpool.h**
```c
#ifndef __TPOOL_H__
#define __TPOOL_H__

#include <stdbool.h>
#include <stddef.h>

struct tpool;
typedef struct tpool tpool_t;

typedef void (*thread_func_t)(void *arg);

tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);

bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);

#endif /* __TPOOL_H__ */

tpool.c

#include <pthread.h>
#include "tpool.h"

// (以下省略,保持原文不变)

main.c

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

#include "tpool.h"

static const size_t num_threads = 4;
static const size_t num_items   = 24;

void worker(void *arg)
{
    int *val = arg;
    int  old = *val;

    *val += 1000;
    printf("tid=%p, old=%d, val=%d\n", pthread_self(), old, *val);

    if (*val % 2)
        usleep(100000);
}

int main(int argc, char **argv)
{
    tpool_t *tm;
    int     *vals;
    size_t   i;

    tm   = tpool_create(num_threads);
    vals = calloc(num_items, sizeof(*vals));

    for (i = 0; i < num_items; i++) {
        vals[i] = i;
        tpool_add_work(tm, worker, vals + i);
    }

    tpool_wait(tm);

    free(vals);
    tpool_destroy(tm);
    return 0;
}

希望这可以帮助您理解代码的内容。如果您有其他翻译需求,请告诉我。

英文:

I'm new to threads, so I used this article to learn how to create a threadpool.
https://nachtimwald.com/2019/04/12/thread-pool-in-c/

The problem is that when I use a large number of tasks, everything works well, but if the number of tasks is small, then the task processing function is not called. void worker(void *arg)
Threadpool in C

For example, if the number of threads is 4, and I create 24 tasks, then they will almost never be executed. (see the picture).
And the smaller the number of tasks, the less probability the function will work. void worker(void *arg)

I'm not sure, but it seems to me that the main problem is that the thread does not have time to be created, as the task is already in the Queue.
The thread is created using pthread_create() function

tpool.h

#ifndef __TPOOL_H__
#define __TPOOL_H__

#include &lt;stdbool.h&gt;
#include &lt;stddef.h&gt;

struct tpool;
typedef struct tpool tpool_t;

typedef void (*thread_func_t)(void *arg);

tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);

bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);

#endif /* __TPOOL_H__ */

tpool.c

#include &lt;pthread.h&gt;
#include &quot;tpool.h&quot;


struct tpool_work {
    thread_func_t      func;
    void              *arg;
    struct tpool_work *next;
};
typedef struct tpool_work tpool_work_t;

struct tpool {
    tpool_work_t    *work_first;
    tpool_work_t    *work_last;
    pthread_mutex_t  work_mutex;
    pthread_cond_t   work_cond;
    pthread_cond_t   working_cond;
    size_t           working_cnt;
    size_t           thread_cnt;
    bool             stop;
};

static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
    tpool_work_t *work;

    if (func == NULL)
        return NULL;

    work       = malloc(sizeof(*work));
    work-&gt;func = func;
    work-&gt;arg  = arg;
    work-&gt;next = NULL;
    return work;
}

static void tpool_work_destroy(tpool_work_t *work)
{
    if (work == NULL)
        return;
    free(work);
}


static tpool_work_t *tpool_work_get(tpool_t *tm)
{
    tpool_work_t *work;

    if (tm == NULL)
        return NULL;

    work = tm-&gt;work_first;
    if (work == NULL)
        return NULL;

    if (work-&gt;next == NULL) {
        tm-&gt;work_first = NULL;
        tm-&gt;work_last  = NULL;
    } else {
        tm-&gt;work_first = work-&gt;next;
    }

    return work;
}


static void *tpool_worker(void *arg)
{
    tpool_t      *tm = arg;
    tpool_work_t *work;

    while (1) {
        pthread_mutex_lock(&amp;(tm-&gt;work_mutex));

        while (tm-&gt;work_first == NULL &amp;&amp; !tm-&gt;stop)
            pthread_cond_wait(&amp;(tm-&gt;work_cond), &amp;(tm-&gt;work_mutex));

        if (tm-&gt;stop)
            break;

        work = tpool_work_get(tm);
        tm-&gt;working_cnt++;
        pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));

        if (work != NULL) {
            work-&gt;func(work-&gt;arg);
            tpool_work_destroy(work);
        }

        pthread_mutex_lock(&amp;(tm-&gt;work_mutex));
        tm-&gt;working_cnt--;
        if (!tm-&gt;stop &amp;&amp; tm-&gt;working_cnt == 0 &amp;&amp; tm-&gt;work_first == NULL)
            pthread_cond_signal(&amp;(tm-&gt;working_cond));
        pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));
    }

    tm-&gt;thread_cnt--;
    pthread_cond_signal(&amp;(tm-&gt;working_cond));
    pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));
    return NULL;
}


tpool_t *tpool_create(size_t num)
{
    tpool_t   *tm;
    pthread_t  thread;
    size_t     i;

    if (num == 0)
        num = 2;

    tm             = calloc(1, sizeof(*tm));
    tm-&gt;thread_cnt = num;

    pthread_mutex_init(&amp;(tm-&gt;work_mutex), NULL);
    pthread_cond_init(&amp;(tm-&gt;work_cond), NULL);
    pthread_cond_init(&amp;(tm-&gt;working_cond), NULL);

    tm-&gt;work_first = NULL;
    tm-&gt;work_last  = NULL;

    for (i=0; i&lt;num; i++) {
        pthread_create(&amp;thread, NULL, tpool_worker, tm);
        pthread_detach(thread);
    }

    return tm;
}


void tpool_destroy(tpool_t *tm)
{
    tpool_work_t *work;
    tpool_work_t *work2;

    if (tm == NULL)
        return;

    pthread_mutex_lock(&amp;(tm-&gt;work_mutex));
    work = tm-&gt;work_first;
    while (work != NULL) {
        work2 = work-&gt;next;
        tpool_work_destroy(work);
        work = work2;
    }
    tm-&gt;stop = true;
    pthread_cond_broadcast(&amp;(tm-&gt;work_cond));
    pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));

    tpool_wait(tm);

    pthread_mutex_destroy(&amp;(tm-&gt;work_mutex));
    pthread_cond_destroy(&amp;(tm-&gt;work_cond));
    pthread_cond_destroy(&amp;(tm-&gt;working_cond));

    free(tm);
}


bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg)
{
    tpool_work_t *work;

    if (tm == NULL)
        return false;

    work = tpool_work_create(func, arg);
    if (work == NULL)
        return false;

    pthread_mutex_lock(&amp;(tm-&gt;work_mutex));
    if (tm-&gt;work_first == NULL) {
        tm-&gt;work_first = work;
        tm-&gt;work_last  = tm-&gt;work_first;
    } else {
        tm-&gt;work_last-&gt;next = work;
        tm-&gt;work_last       = work;
    }

    pthread_cond_broadcast(&amp;(tm-&gt;work_cond));
    pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));

    return true;
}


void tpool_wait(tpool_t *tm)
{
    if (tm == NULL)
        return;

    pthread_mutex_lock(&amp;(tm-&gt;work_mutex));
    while (1) {
        if ((!tm-&gt;stop &amp;&amp; tm-&gt;working_cnt != 0) || (tm-&gt;stop &amp;&amp; tm-&gt;thread_cnt != 0)) {
            pthread_cond_wait(&amp;(tm-&gt;working_cond), &amp;(tm-&gt;work_mutex));
        } else {
            break;
        }
    }
    pthread_mutex_unlock(&amp;(tm-&gt;work_mutex));
}

main.c

#include &lt;stdlib.h&gt;
#include &lt;stdio.h&gt;
#include &lt;pthread.h&gt;
#include &lt;unistd.h&gt;

#include &quot;tpool.h&quot;

static const size_t num_threads = 4;
static const size_t num_items   = 24;

void worker(void *arg)
{
    int *val = arg;
    int  old = *val;

    *val += 1000;
    printf(&quot;tid=%p, old=%d, val=%d\n&quot;, pthread_self(), old, *val);

    if (*val%2)
        usleep(100000);
}

int main(int argc, char **argv)
{
    tpool_t *tm;
    int     *vals;
    size_t   i;

    tm   = tpool_create(num_threads);
    vals = calloc(num_items, sizeof(*vals));

    for (i=0; i&lt;num_items; i++) {
        vals[i] = i;
        tpool_add_work(tm, worker, vals+i);
    }

    tpool_wait(tm);

    /*for (i=0; i&lt;num_items; i++) {
        printf(&quot;%d\n&quot;, vals[i]);
    }*/

    free(vals);
    tpool_destroy(tm);
    return 0;
}

答案1

得分: 2

以下是翻译好的部分:

tpool_worker()中的 tm->working_cnt++; 和在 tpool_wait() 中测试 tm->working_cnt!=0 之间存在线程调度竞争。要修复它,应该在 tpool_add_work() 中将 working_cnt 在工作添加到池中时递增,而不是在工作线程在 tpool_worker() 中取消排队工作时递增。竞争可以通过将 tm->working_cnt++; 移动到 tpool_add_work() 中的 tm->work_mutex 锁定和解锁之间的某处来解决。

另外,tpool.c 需要添加 #include <stdlib.h> 这一行来声明 malloc()calloc()free()

英文:

There is a thread scheduling race between tm-&gt;working_cnt++; in tpool_worker() and testing tm-&gt;working_cnt!=0 in tpool_wait(). To fix it, working_cnt should be incremented at the time the work is added to the pool in tpool_add_work() instead of incrementing it at the time a worker thread has unqueued the work in tpool_worker(). The race can be fixed by moving the tm-&gt;working_cnt++; line to tpool_add_work() somewhere between the locking and unlocking of tm-&gt;work_mutex.


By the way, tpool.c needs the line #include &lt;stdlib.h&gt; to declare malloc(), calloc() and free().

huangapple
  • 本文由 发表于 2023年6月12日 23:01:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/76457930.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定