英文:
Threadpool in C
问题
I'm here to help with the Chinese translation. Here's the translated content:
我对线程不太熟悉,所以我使用了这篇文章来学习如何创建线程池。
[https://nachtimwald.com/2019/04/12/thread-pool-in-c/](https://stackoverflow.com)
问题是,当我使用大量任务时,一切都正常工作,但如果任务数量较少,那么任务处理函数不会被调用。`void worker(void *arg)`
[![奇怪的程序执行行为](https://i.stack.imgur.com/VmAxn.png)](https://i.stack.imgur.com/VmAxn.png)
例如,如果线程数为4,我创建了24个任务,它们几乎不会被执行(请参见图片)。
而且任务数量越少,函数工作的概率就越小。`void worker(void *arg)`
***我不太确定,但我觉得主要问题是线程没有足够的时间被创建,因为任务已经在队列中。
线程是使用pthread_create()函数创建的。***
**tpool.h**
```c
#ifndef __TPOOL_H__
#define __TPOOL_H__
#include <stdbool.h>
#include <stddef.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*thread_func_t)(void *arg);
tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);
#endif /* __TPOOL_H__ */
tpool.c
#include <pthread.h>
#include "tpool.h"
// (以下省略,保持原文不变)
main.c
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "tpool.h"
static const size_t num_threads = 4;
static const size_t num_items = 24;
void worker(void *arg)
{
int *val = arg;
int old = *val;
*val += 1000;
printf("tid=%p, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val % 2)
usleep(100000);
}
int main(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = tpool_create(num_threads);
vals = calloc(num_items, sizeof(*vals));
for (i = 0; i < num_items; i++) {
vals[i] = i;
tpool_add_work(tm, worker, vals + i);
}
tpool_wait(tm);
free(vals);
tpool_destroy(tm);
return 0;
}
希望这可以帮助您理解代码的内容。如果您有其他翻译需求,请告诉我。
英文:
I'm new to threads, so I used this article to learn how to create a threadpool.
https://nachtimwald.com/2019/04/12/thread-pool-in-c/
The problem is that when I use a large number of tasks, everything works well, but if the number of tasks is small, then the task processing function is not called. void worker(void *arg)
For example, if the number of threads is 4, and I create 24 tasks, then they will almost never be executed. (see the picture).
And the smaller the number of tasks, the less probability the function will work. void worker(void *arg)
I'm not sure, but it seems to me that the main problem is that the thread does not have time to be created, as the task is already in the Queue.
The thread is created using pthread_create() function
tpool.h
#ifndef __TPOOL_H__
#define __TPOOL_H__
#include <stdbool.h>
#include <stddef.h>
struct tpool;
typedef struct tpool tpool_t;
typedef void (*thread_func_t)(void *arg);
tpool_t *tpool_create(size_t num);
void tpool_destroy(tpool_t *tm);
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg);
void tpool_wait(tpool_t *tm);
#endif /* __TPOOL_H__ */
tpool.c
#include <pthread.h>
#include "tpool.h"
struct tpool_work {
thread_func_t func;
void *arg;
struct tpool_work *next;
};
typedef struct tpool_work tpool_work_t;
struct tpool {
tpool_work_t *work_first;
tpool_work_t *work_last;
pthread_mutex_t work_mutex;
pthread_cond_t work_cond;
pthread_cond_t working_cond;
size_t working_cnt;
size_t thread_cnt;
bool stop;
};
static tpool_work_t *tpool_work_create(thread_func_t func, void *arg)
{
tpool_work_t *work;
if (func == NULL)
return NULL;
work = malloc(sizeof(*work));
work->func = func;
work->arg = arg;
work->next = NULL;
return work;
}
static void tpool_work_destroy(tpool_work_t *work)
{
if (work == NULL)
return;
free(work);
}
static tpool_work_t *tpool_work_get(tpool_t *tm)
{
tpool_work_t *work;
if (tm == NULL)
return NULL;
work = tm->work_first;
if (work == NULL)
return NULL;
if (work->next == NULL) {
tm->work_first = NULL;
tm->work_last = NULL;
} else {
tm->work_first = work->next;
}
return work;
}
static void *tpool_worker(void *arg)
{
tpool_t *tm = arg;
tpool_work_t *work;
while (1) {
pthread_mutex_lock(&(tm->work_mutex));
while (tm->work_first == NULL && !tm->stop)
pthread_cond_wait(&(tm->work_cond), &(tm->work_mutex));
if (tm->stop)
break;
work = tpool_work_get(tm);
tm->working_cnt++;
pthread_mutex_unlock(&(tm->work_mutex));
if (work != NULL) {
work->func(work->arg);
tpool_work_destroy(work);
}
pthread_mutex_lock(&(tm->work_mutex));
tm->working_cnt--;
if (!tm->stop && tm->working_cnt == 0 && tm->work_first == NULL)
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
}
tm->thread_cnt--;
pthread_cond_signal(&(tm->working_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return NULL;
}
tpool_t *tpool_create(size_t num)
{
tpool_t *tm;
pthread_t thread;
size_t i;
if (num == 0)
num = 2;
tm = calloc(1, sizeof(*tm));
tm->thread_cnt = num;
pthread_mutex_init(&(tm->work_mutex), NULL);
pthread_cond_init(&(tm->work_cond), NULL);
pthread_cond_init(&(tm->working_cond), NULL);
tm->work_first = NULL;
tm->work_last = NULL;
for (i=0; i<num; i++) {
pthread_create(&thread, NULL, tpool_worker, tm);
pthread_detach(thread);
}
return tm;
}
void tpool_destroy(tpool_t *tm)
{
tpool_work_t *work;
tpool_work_t *work2;
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
work = tm->work_first;
while (work != NULL) {
work2 = work->next;
tpool_work_destroy(work);
work = work2;
}
tm->stop = true;
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
tpool_wait(tm);
pthread_mutex_destroy(&(tm->work_mutex));
pthread_cond_destroy(&(tm->work_cond));
pthread_cond_destroy(&(tm->working_cond));
free(tm);
}
bool tpool_add_work(tpool_t *tm, thread_func_t func, void *arg)
{
tpool_work_t *work;
if (tm == NULL)
return false;
work = tpool_work_create(func, arg);
if (work == NULL)
return false;
pthread_mutex_lock(&(tm->work_mutex));
if (tm->work_first == NULL) {
tm->work_first = work;
tm->work_last = tm->work_first;
} else {
tm->work_last->next = work;
tm->work_last = work;
}
pthread_cond_broadcast(&(tm->work_cond));
pthread_mutex_unlock(&(tm->work_mutex));
return true;
}
void tpool_wait(tpool_t *tm)
{
if (tm == NULL)
return;
pthread_mutex_lock(&(tm->work_mutex));
while (1) {
if ((!tm->stop && tm->working_cnt != 0) || (tm->stop && tm->thread_cnt != 0)) {
pthread_cond_wait(&(tm->working_cond), &(tm->work_mutex));
} else {
break;
}
}
pthread_mutex_unlock(&(tm->work_mutex));
}
main.c
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "tpool.h"
static const size_t num_threads = 4;
static const size_t num_items = 24;
void worker(void *arg)
{
int *val = arg;
int old = *val;
*val += 1000;
printf("tid=%p, old=%d, val=%d\n", pthread_self(), old, *val);
if (*val%2)
usleep(100000);
}
int main(int argc, char **argv)
{
tpool_t *tm;
int *vals;
size_t i;
tm = tpool_create(num_threads);
vals = calloc(num_items, sizeof(*vals));
for (i=0; i<num_items; i++) {
vals[i] = i;
tpool_add_work(tm, worker, vals+i);
}
tpool_wait(tm);
/*for (i=0; i<num_items; i++) {
printf("%d\n", vals[i]);
}*/
free(vals);
tpool_destroy(tm);
return 0;
}
答案1
得分: 2
以下是翻译好的部分:
在tpool_worker()
中的 tm->working_cnt++;
和在 tpool_wait()
中测试 tm->working_cnt!=0
之间存在线程调度竞争。要修复它,应该在 tpool_add_work()
中将 working_cnt
在工作添加到池中时递增,而不是在工作线程在 tpool_worker()
中取消排队工作时递增。竞争可以通过将 tm->working_cnt++;
移动到 tpool_add_work()
中的 tm->work_mutex
锁定和解锁之间的某处来解决。
另外,tpool.c 需要添加 #include <stdlib.h>
这一行来声明 malloc()
、calloc()
和 free()
。
英文:
There is a thread scheduling race between tm->working_cnt++;
in tpool_worker()
and testing tm->working_cnt!=0
in tpool_wait()
. To fix it, working_cnt
should be incremented at the time the work is added to the pool in tpool_add_work()
instead of incrementing it at the time a worker thread has unqueued the work in tpool_worker()
. The race can be fixed by moving the tm->working_cnt++;
line to tpool_add_work()
somewhere between the locking and unlocking of tm->work_mutex
.
By the way, tpool.c needs the line #include <stdlib.h>
to declare malloc()
, calloc()
and free()
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论