Multithreaded Pipelines in C with faulty pipe implementation
Question
I am trying to create a multithreaded pipeline that runs each stage function in its own thread and uses pipes to communicate between the threads. When I run my program, it runs the same function over and over again instead of each individual function. I think my pipes have issues, but I'm not sure. What exactly am I doing wrong?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>

int num_ints;

typedef void (*Function)(void* input, void* output);

typedef struct Pipeline {
    Function* functions;
    int numStages;
} Pipeline;

Pipeline* new_Pipeline() {
    Pipeline *this = malloc(sizeof(Pipeline));
    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;
}

bool Pipeline_add(Pipeline* this, Function f) {
    //reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    }
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }
}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void* thread_func(void* arg) {
    thread_args *data = (thread_args*) arg;
    //get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;
    data->function((void*)&inPipe,(void*)&outPipe);
    return NULL;
}

void Pipeline_execute(Pipeline* this) {
    //create threads
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    for (int i = 0; i < this->numStages; i++) {
        //creating the pipes
        int fd[2];
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        if (i == this->numStages -1) {
            close(fd[0]);
        }
    }
    //waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }
    //closing pipes
    for (int i = 0; i < this->numStages -1; i++) {
    }
}

void Pipeline_free(Pipeline* this) {
    free(this->functions);
    free(this);
}

bool Pipeline_send(void* channel, void* buffer, size_t size) {
    if ((write(*(int*)channel, buffer, size)) != -1) {
        return true;
    } else {
        return false;
    }
}

bool Pipeline_receive(void* channel, void* buffer, size_t size) {
    if ((read(*(int*)channel, buffer, size)) != -1) {
        return true;
    } else {
        return false;
    }
}

//an application created to help test the implementation of pipes.
static void generateInts(void* input, void* output) {
    printf("generateInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
    }
}

static void squareInts(void* input, void* output) {
    printf("squareInts: thread %p\n", (void*) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
        int result = number * number;
        if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
    }
}

static void sumIntsAndPrint(void* input, void* output) {
    printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());
    int number = 0;
    int result = 0;
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void cleanupExit(Pipeline *p) {
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}

int main() {
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);
    Pipeline *p = new_Pipeline();
    if (p == NULL) cleanupExit(p);
    if (!Pipeline_add(p, generateInts)) cleanupExit(p);
    if (!Pipeline_add(p, squareInts)) cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
    Pipeline_execute(p);
    Pipeline_free(p);
    return 0;
}
I was expecting it to run each function on a different thread. I used another piece of code to debug it, and it created three different threads, but it was the same function for every thread.
When I ran the application provided to check the pipeline implementation, it returned this:
Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384==
==162384== HEAP SUMMARY:
==162384== in use at exit: 584 bytes in 4 blocks
==162384== total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384== definitely lost: 0 bytes in 0 blocks
==162384== indirectly lost: 0 bytes in 0 blocks
==162384== possibly lost: 544 bytes in 2 blocks
==162384== still reachable: 40 bytes in 2 blocks
I updated the code.
Answer 1
Score: 1
In Pipeline_execute ...
- You are using the fd from the pipe call incorrectly.
- You are setting the input and output for each thread to the same pipe. So, it writes to itself (creating an infinite data loop?).
- You have to "stagger" the pipes so the output of stage 0 goes to the input of stage 1 (e.g.).
Also, your thread function must close the pipe units after returning from the target function.
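The "staggering" described above can be looked at in isolation. This is only a minimal sketch, not the exact code from this answer: wire_stages, in_fd and out_fd are hypothetical names made up for illustration. It assumes one pipe between each pair of neighbouring stages, no input for the first stage, and no output for the last stage.

```c
#include <assert.h>
#include <unistd.h>

/* Hypothetical helper: for n stages, create n - 1 pipes so that the
 * write end given to stage i feeds the read end given to stage i + 1.
 * Stage 0 gets no input (-1) and the last stage gets no output (-1).
 * Returns 0 on success, -1 if pipe() fails (already created pipes are closed). */
static int wire_stages(int n, int in_fd[], int out_fd[])
{
    int prev_read = -1;                 /* read end of the previous pipe */

    for (int i = 0; i < n; i++) {
        in_fd[i] = prev_read;           /* input comes from the prior stage */

        if (i == n - 1) {
            out_fd[i] = -1;             /* last stage writes to no pipe */
        } else {
            int fd[2];
            if (pipe(fd) != 0) {
                /* undo the pipes created for stages 0 .. i - 1 */
                for (int j = 0; j < i; j++) {
                    close(out_fd[j]);
                    close(in_fd[j + 1]);
                }
                return -1;
            }
            out_fd[i] = fd[1];          /* this stage writes here */
            prev_read = fd[0];          /* the next stage reads here */
        }
    }
    return 0;
}
```

With three stages this yields two pipes: whatever is written to out_fd[0] is read from in_fd[1], and whatever is written to out_fd[1] is read from in_fd[2].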
Here is the refactored [working?] code. In particular, note the changes in Pipeline_execute. Also, the changes in thread_func.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>

#define CLOSEME(_fd) \
    do { \
        if (_fd < 0) \
            break; \
        close(_fd); \
        _fd = -1; \
    } while (0)

int num_ints;

typedef void (*Function) (void *input, void *output);

typedef struct Pipeline {
    Function *functions;
    int numStages;
} Pipeline;

Pipeline *
new_Pipeline()
{
    Pipeline *this = malloc(sizeof(Pipeline));

    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;
}

bool
Pipeline_add(Pipeline *this, Function f)
{
    // reallocating memory to add a stage to the functions array
    this->functions = realloc(this->functions, (this->numStages + 1) * sizeof(Function));
    if (this->functions == NULL) {
        return false;
    }
    else {
        this->functions[this->numStages] = f;
        this->numStages++;
        printf("added a stage\n");
        return true;
    }
}

typedef struct {
    Function function;
    int inputPipe;
    int outputPipe;
} thread_args;

void *
thread_func(void *arg)
{
    thread_args *data = (thread_args *) arg;

    // get the input and output pipes from the args parameter
    int inPipe = data->inputPipe;
    int outPipe = data->outputPipe;

    data->function((void *) &inPipe, (void *) &outPipe);

    // close our pipe ends so the next stage can see end-of-file
    CLOSEME(data->inputPipe);
    CLOSEME(data->outputPipe);

    return NULL;
}

void
Pipeline_execute(Pipeline *this)
{
    // create threads
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    int fd[2] = { -1, -1 };

    for (int i = 0; i < this->numStages; i++) {
        // use input side from _prior_ stage for our input
        args[i].inputPipe = fd[0];

        // last stage has _no_ output
        if (i == this->numStages - 1) {
            fd[0] = -1;
            fd[1] = -1;
        }
        else if (pipe(fd) != 0) {
            perror("pipe");
            exit(1);
        }

        // set output for _this_ stage from the pipe output
        args[i].outputPipe = fd[1];

        args[i].function = this->functions[i];

        // a failed create leaves threads[i] unset, so do not go on to join it
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            perror("pthread_create");
            exit(1);
        }
    }

    // waiting for threads to finish
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join");
            exit(1);
        }
    }

    // pipes are closed in thread_func, so nothing is left to close here
}

void
Pipeline_free(Pipeline *this)
{
    free(this->functions);
    free(this);
}

bool
Pipeline_send(void *channel, void *buffer, size_t size)
{
    if ((write(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }
}

bool
Pipeline_receive(void *channel, void *buffer, size_t size)
{
    if ((read(*(int *) channel, buffer, size)) != -1) {
        return true;
    }
    else {
        return false;
    }
}

//an application created to help test the implementation of pipes.
static void
generateInts(void *input, void *output)
{
    printf("generateInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_send(output, (void *) &i, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
squareInts(void *input, void *output)
{
    printf("squareInts: thread %p\n", (void *) pthread_self());
    for (int i = 1; i <= num_ints; i++) {
        int number;

        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        int result = number * number;

        if (!Pipeline_send(output, (void *) &result, sizeof(int)))
            exit(EXIT_FAILURE);
    }
}

static void
sumIntsAndPrint(void *input, void *output)
{
    printf("sumIntsAndPrint: thread %p\n", (void *) pthread_self());
    int number = 0;
    int result = 0;

    for (int i = 1; i <= num_ints; i++) {
        if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
            exit(EXIT_FAILURE);
        result += number;
    }
    printf("sumIntsAndPrint: result = %i\n", result);
}

static void
cleanupExit(Pipeline * p)
{
    if (p) {
        Pipeline_free(p);
    }
    exit(EXIT_FAILURE);
}

int
main()
{
    printf("Enter number of ints:\n");
    scanf("%d", &num_ints);
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL)
        cleanupExit(p);
    if (!Pipeline_add(p, generateInts))
        cleanupExit(p);
    if (!Pipeline_add(p, squareInts))
        cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint))
        cleanupExit(p);

    Pipeline_execute(p);
    Pipeline_free(p);

    return 0;
}
Here is the program output:
Enter number of ints:
7
Setting up pipeline to calculate the sum of squares of integers 1 to 7.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x7fc5c10a4700
squareInts: thread 0x7fc5c08a3700
sumIntsAndPrint: thread 0x7fc5c00a2700
sumIntsAndPrint: result = 140
This is based upon my answer: fd leak, custom Shell. It is using fork instead of pthread_create, but the issues were similar.
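One thing worth knowing about Pipeline_send and Pipeline_receive as written: they treat any return value other than -1 as success. read returns 0 at end-of-file (all write ends closed), and both read and write may transfer fewer bytes than requested, so a stage could carry on with a stale or partly filled buffer. A possible way to harden this is sketched below. read_full and write_full are hypothetical helper names, not part of the original program, and retrying on EINTR is an assumption about the behaviour you want.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: write exactly `size` bytes, retrying after short
 * writes and EINTR. Returns false on any other error. */
static bool write_full(int fd, const void *buffer, size_t size)
{
    const char *p = buffer;

    while (size > 0) {
        ssize_t n = write(fd, p, size);
        if (n < 0) {
            if (errno == EINTR)
                continue;               /* interrupted, try again */
            return false;
        }
        p += n;
        size -= (size_t) n;
    }
    return true;
}

/* Hypothetical helper: read exactly `size` bytes. Returns false on error
 * or if end-of-file arrives before `size` bytes were read. */
static bool read_full(int fd, void *buffer, size_t size)
{
    char *p = buffer;

    while (size > 0) {
        ssize_t n = read(fd, p, size);
        if (n == 0)
            return false;               /* EOF: every write end is closed */
        if (n < 0) {
            if (errno == EINTR)
                continue;               /* interrupted, try again */
            return false;
        }
        p += n;
        size -= (size_t) n;
    }
    return true;
}
```

Pipeline_send could then return write_full(*(int *) channel, buffer, size), and Pipeline_receive could return read_full(*(int *) channel, buffer, size). A stage whose upstream neighbour exits early would then fail cleanly instead of reusing old data.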
Answer 2
Score: 0
You are passing this to each of your threads:
thread_args args = {
    .function = this->functions[i],
    .inputPipe = fd[0],
    .outputPipe = fd[1],
};
This lives on the stack, and does not persist outside of the loop it is defined in. It may have ceased to exist by the time your thread runs, or multiple threads may end up reading the same values. You're almost certainly passing the same address to all of your threads.
You're doing the equivalent of:
int a;
a = 1;
pthread_create(..., &a);
a = 2;
pthread_create(..., &a);
But there's no guarantee of when any of these threads will be run.
You need to allocate structures for each thread that last as long as the threads do. Simplest might be to define:
thread_args args[this->numStages];
at the top of Pipeline_execute() and populate that.
i.e. you want something like:
thread_args args[this->numStages];
for (int i = 0; i < this->numStages; i++) {
    //creating the pipes
    int fd[2];
    pipe(fd);
    //creating input and output pipes for each stage
    args[i].function = this->functions[i];
    args[i].inputPipe = fd[0];
    args[i].outputPipe = fd[1];
    if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
        printf("created a thread\n");
        perror("pthread_create");
    }
    ...
}
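To see the lifetime point on its own, without any pipes, here is a small sketch. worker_args, worker and run_workers are hypothetical names used only for this illustration. Each thread gets a pointer to its own array element, and the array belongs to the caller, so it outlives every thread.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical per-thread argument block, one per thread. */
typedef struct {
    int id;         /* input: set before the thread starts */
    int result;     /* output: written by the thread */
} worker_args;

static void *worker(void *arg)
{
    worker_args *a = arg;

    a->result = a->id * 10;     /* touches only this thread's own slot */
    return NULL;
}

/* Starts `count` threads, each with its own element of `args`.
 * The caller owns `args`, so it stays valid until every join is done.
 * Returns 0 on success, -1 if a create or join failed. */
static int run_workers(worker_args args[], int count)
{
    assert(count > 0);

    pthread_t threads[count];
    int started = 0;
    int rc = 0;

    for (int i = 0; i < count; i++) {
        args[i].id = i + 1;
        args[i].result = 0;
        if (pthread_create(&threads[i], NULL, worker, &args[i]) != 0) {
            rc = -1;
            break;              /* join only the threads that did start */
        }
        started++;
    }

    for (int i = 0; i < started; i++) {
        if (pthread_join(threads[i], NULL) != 0)
            rc = -1;
    }
    return rc;
}
```

Called with a three-element array, each slot ends up with its own result (10, 20 and 30). If a single worker_args variable were reused for every pthread_create call, all threads would read whatever values it happened to hold when they got scheduled.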