多线程管道在 C 中使用有缺陷的管道实现

huangapple go评论58阅读模式
英文:

Multithreaded Pipelines in C with faulty pipe implementation

问题

我正在尝试创建一个多线程管道:每个阶段的函数在各自的线程中运行,并通过管道(pipe)在各阶段之间传递数据。运行程序时,它似乎一遍又一遍地运行同一个函数,而不是分别运行各个函数。我认为是我的管道设置有问题,但不确定具体哪里做错了。

英文:

I am trying to create a multithreaded pipeline that runs each stage function in its own thread and uses pipes to pass data between the stages. When I run my program, it appears to run the same function over and over again instead of each individual function. I think my pipes are set up incorrectly, but I'm not sure what exactly I am doing wrong.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
// Number of integers pushed through the pipeline; read from stdin in main()
// and used as the loop bound by every stage function.
int num_ints;
// Signature of a pipeline stage. The stage functions in this file pass
// `input` and `output` straight to Pipeline_receive()/Pipeline_send(),
// which treat them as pointers to int file descriptors.
typedef void (*Function)(void* input, void* output);

// An ordered list of stage functions.
typedef struct Pipeline {
    // Heap array of stages, grown with realloc() by Pipeline_add(); NULL while empty.
    Function* functions;
    // Number of valid entries in `functions`.
    int numStages;
} Pipeline;
 
// Allocate an empty pipeline (no stages yet).
//
// Returns the new pipeline, or NULL if the allocation fails. The caller owns
// the result and releases it with Pipeline_free().
Pipeline* new_Pipeline(void) {
    Pipeline *this = malloc(sizeof *this);
    if (this == NULL) {
        // Report the failure to the caller instead of dereferencing NULL below.
        return NULL;
    }
    this->functions = NULL;
    this->numStages = 0;
    printf("created the pipeline\n");
    return this;
}

// Append stage function f to the end of the pipeline.
//
// Returns true on success. Returns false if the stage array cannot be grown;
// in that case the pipeline is unchanged and still owns its existing stages.
bool Pipeline_add(Pipeline* this, Function f) {
    // Grow into a temporary: assigning realloc()'s result straight back to
    // this->functions would lose (leak) the old array when realloc fails.
    Function *grown = realloc(this->functions,
                              ((size_t)this->numStages + 1) * sizeof *grown);
    if (grown == NULL) {
        return false;
    }

    grown[this->numStages] = f;
    this->functions = grown;
    this->numStages++;
    printf("added a stage\n");
    return true;
}

// Per-thread argument block built by Pipeline_execute() and read by thread_func().
typedef struct {
    // Stage function this thread runs.
    Function function;
    // File descriptor the stage is given as its input channel.
    int inputPipe;
    // File descriptor the stage is given as its output channel.
    int outputPipe;
} thread_args;

// Thread entry point: runs a single pipeline stage.
// `arg` points to the thread_args describing the stage. Always returns NULL.
void* thread_func(void* arg) {
    thread_args *stage = arg;

    // The stage function receives pointers to local copies of both descriptors.
    int readEnd = stage->inputPipe;
    int writeEnd = stage->outputPipe;

    stage->function(&readEnd, &writeEnd);
    return NULL;
}

// Start one thread per pipeline stage, then wait for all of them to finish.
//
// NOTE(review): every loop iteration creates a brand-new pipe and hands BOTH
// of its ends to the same stage, so a stage's output descriptor is connected
// to its own input descriptor; no stage is wired to the next one here.
void Pipeline_execute(Pipeline* this) {
    // one thread handle and one argument block per stage
    pthread_t threads[this->numStages];
    thread_args args[this->numStages];
    for (int i = 0; i < this->numStages; i++) {
         // create a pipe for this stage (return value of pipe() is not checked)
        int fd[2]; 
        pipe(fd);
        // this stage gets the read end as input and the write end of the SAME pipe as output
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];
        // NOTE(review): this branch runs when pthread_create FAILS, yet it prints
        // "created a thread"; execution then continues to the next stage.
        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        // for the last stage only, the read end is closed here right after the
        // thread was started with that same descriptor number as its input
        if (i == this->numStages -1) {
            close(fd[0]);
        }
    }
    // wait for every stage thread; a failed join terminates the process
    for (int i = 0; i < this->numStages; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join\n");
            exit(1);
        }
    }
    // NOTE(review): empty loop body - no pipe descriptors are closed here
    for (int i = 0; i < this->numStages -1; i++) {
        
    }
    
}


// Release a pipeline and its stage array.
// Passing NULL is a no-op, mirroring free().
void Pipeline_free(Pipeline* this) {
    if (this == NULL) {
        return;
    }
    free(this->functions);
    free(this);
}


// Write exactly `size` bytes from `buffer` to a channel.
//
// channel: pointer to an int holding the file descriptor to write to.
// Returns true once every byte has been written, false if write() fails.
// A size of 0 succeeds without touching the descriptor.
bool Pipeline_send(void* channel, void* buffer, size_t size) {
    int fd = *(int*)channel;
    const char *cursor = buffer;
    size_t remaining = size;

    // write() may accept fewer bytes than requested; keep going until the
    // whole message is out so the reader never sees a truncated value.
    while (remaining > 0) {
        ssize_t written = write(fd, cursor, remaining);
        if (written < 0) {
            return false;
        }
        cursor += written;
        remaining -= (size_t)written;
    }
    return true;
}


// Read exactly `size` bytes from a channel into `buffer`.
//
// channel: pointer to an int holding the file descriptor to read from.
// Returns true only when all `size` bytes were received. Returns false on a
// read() error or when the stream ends before the message is complete, so the
// caller never consumes an unfilled or partially filled buffer.
bool Pipeline_receive(void* channel, void* buffer, size_t size) {
    int fd = *(int*)channel;
    char *cursor = buffer;
    size_t remaining = size;

    while (remaining > 0) {
        ssize_t got = read(fd, cursor, remaining);
        if (got <= 0) {
            // -1 is an error; 0 means every writer closed before the message was complete
            return false;
        }
        cursor += got;
        remaining -= (size_t)got;
    }
    return true;
}
//an application created to help test the implementation of pipes.

// First stage: sends the integers 1..num_ints to the output channel.
// The input channel is unused. Exits the process if a send fails.
static void generateInts(void* input, void* output) {
    printf("generateInts: thread %p\n", (void*) pthread_self());

    int value = 1;
    while (value <= num_ints) {
        bool sent = Pipeline_send(output, &value, sizeof(int));
        if (!sent) {
            exit(EXIT_FAILURE);
        }
        value++;
    }
}


// Middle stage: receives num_ints integers from the input channel and sends
// the square of each one to the output channel.
// Exits the process if a receive or a send fails.
static void squareInts(void* input, void* output) {
    printf("squareInts: thread %p\n", (void*) pthread_self());

    for (int remaining = num_ints; remaining > 0; remaining--) {
        int number;
        if (!Pipeline_receive(input, &number, sizeof(int))) {
            exit(EXIT_FAILURE);
        }

        int squared = number * number;
        if (!Pipeline_send(output, &squared, sizeof(int))) {
            exit(EXIT_FAILURE);
        }
    }
}


// Final stage: receives num_ints integers from the input channel, adds them
// up and prints the total. The output channel is unused.
// Exits the process if a receive fails.
static void sumIntsAndPrint(void* input, void* output) {
    printf("sumIntsAndPrint: thread %p\n", (void*) pthread_self());

    int total = 0;
    int incoming = 0;
    int count = 0;
    while (count < num_ints) {
        if (!Pipeline_receive(input, &incoming, sizeof(int))) {
            exit(EXIT_FAILURE);
        }
        total += incoming;
        count++;
    }
    printf("sumIntsAndPrint: result = %i\n", total);
}

// Free the pipeline (when there is one) and terminate with EXIT_FAILURE.
// Does not return.
static void cleanupExit(Pipeline *p) {
    if (p != NULL)
        Pipeline_free(p);

    exit(EXIT_FAILURE);
}


// Reads the integer count from stdin, builds the three-stage pipeline
// (generate -> square -> sum) and runs it.
// Returns 0 on success; exits with EXIT_FAILURE on bad input or when the
// pipeline cannot be set up.
int main(void) {
    // Reject missing or non-numeric input instead of silently running with
    // whatever value num_ints happens to hold.
    if (scanf("%d", &num_ints) != 1) {
        fprintf(stderr, "expected an integer count on standard input\n");
        return EXIT_FAILURE;
    }
    printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

    Pipeline *p = new_Pipeline();

    if (p == NULL) cleanupExit(p);
    if (!Pipeline_add(p, generateInts)) cleanupExit(p);
    if (!Pipeline_add(p, squareInts)) cleanupExit(p);
    if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
    Pipeline_execute(p);

    Pipeline_free(p);
    return 0;
}

I was expecting it to run each function on a different thread. I used another piece of code to debug it, and it created three different threads, but every thread ran the same function.
When I ran the application provided to check the pipeline implementation, it printed this:

Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384== 
==162384== HEAP SUMMARY:
==162384==     in use at exit: 584 bytes in 4 blocks
==162384==   total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384==    definitely lost: 0 bytes in 0 blocks
==162384==    indirectly lost: 0 bytes in 0 blocks
==162384==      possibly lost: 544 bytes in 2 blocks
==162384==    still reachable: 40 bytes in 2 blocks

i updated the code

答案1

得分: 1

以下是您要求的代码部分的翻译:

Pipeline_execute 中...

  1. 您不正确地使用了来自 pipe 调用的 fd

  2. 您为每个线程设置了相同的输入和输出到相同的管道,所以它向自身写入(创建了一个无限数据循环?)。

  3. 您必须“交错”这些管道,以便阶段 0 的输出传递到阶段 1 的输入(例如)。

另外,您的线程函数在从目标函数返回后必须关闭管道单元。

这是重新设计的[工作?]代码的一部分,特别注意 Pipeline_execute 中的更改,还有 thread_func 中的更改。

以下是程序的输出:

输入整数的数量:
7
设置管道以计算整数1到7的平方和。
创建了管道
添加了一个阶段
添加了一个阶段
添加了一个阶段
generateInts: 线程 0x7fc5c10a4700
squareInts: 线程 0x7fc5c08a3700
sumIntsAndPrint: 线程 0x7fc5c00a2700
sumIntsAndPrint: 结果 = 140

请注意,这是基于我的回答:fd 泄漏,自定义 Shell 的代码,它使用了 fork 而不是 pthread_create,但问题类似。

英文:

In Pipeline_execute ...

  1. You are using the fd from the pipe call incorrectly.

  2. You are setting the input and output for each thread to the same pipe. So, it writes to itself (creating an infinite data loop?).

  3. You have to "stagger" the pipes so the output of stage 0 goes to the input of stage 1 (e.g.).

Also, your thread function must close the pipe units after returning from the target function.


Here is the refactored [working?] code. In particular, note the changes in Pipeline_execute. Also, the changes in thread_func.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>

// Close the descriptor stored in `fd` when it is open (>= 0), then store -1
// in it so that a second CLOSEME on the same variable does nothing.
// `fd` must be a modifiable lvalue; it is evaluated more than once.
#define CLOSEME(fd) \
	do { \
		if ((fd) >= 0) { \
			close(fd); \
			(fd) = -1; \
		} \
	} while (0)

// Number of integers pushed through the pipeline; read from stdin in main()
// and used as the loop bound by every stage function.
int num_ints;
// Signature of a pipeline stage. The stage functions in this file pass
// `input` and `output` straight to Pipeline_receive()/Pipeline_send(),
// which treat them as pointers to int file descriptors.
typedef void (*Function) (void *input, void *output);

// An ordered list of stage functions.
typedef struct Pipeline {
	// Heap array of stages, grown with realloc() by Pipeline_add(); NULL while empty.
	Function *functions;
	// Number of valid entries in `functions`.
	int numStages;
} Pipeline;

// Allocate an empty pipeline (no stages yet).
//
// Returns the new pipeline, or NULL if the allocation fails. The caller owns
// the result and releases it with Pipeline_free().
Pipeline *
new_Pipeline(void)
{
	Pipeline *this = malloc(sizeof *this);

	// Report the failure to the caller instead of dereferencing NULL below.
	if (this == NULL)
		return NULL;

	this->functions = NULL;
	this->numStages = 0;
	printf("created the pipeline\n");
	return this;
}

// Append stage function f to the end of the pipeline.
//
// Returns true on success. Returns false if the stage array cannot be grown;
// in that case the pipeline is unchanged and still owns its existing stages.
bool
Pipeline_add(Pipeline *this, Function f)
{
	// Grow into a temporary: assigning realloc()'s result straight back to
	// this->functions would lose (leak) the old array when realloc fails.
	Function *grown = realloc(this->functions,
		((size_t) this->numStages + 1) * sizeof *grown);

	if (grown == NULL)
		return false;

	grown[this->numStages] = f;
	this->functions = grown;
	this->numStages++;
	printf("added a stage\n");
	return true;
}

// Per-thread argument block built by Pipeline_execute() and read by thread_func().
typedef struct {
	// Stage function this thread runs.
	Function function;
	// Descriptor the stage reads from; Pipeline_execute() stores -1 for the first stage.
	int inputPipe;
	// Descriptor the stage writes to; Pipeline_execute() stores -1 for the last stage.
	int outputPipe;
} thread_args;

// Thread entry point: runs a single pipeline stage, then closes the stage's
// pipe descriptors. `arg` points to the thread_args describing the stage.
// Always returns NULL.
void *
thread_func(void *arg)
{
	thread_args *stage = arg;

	// The stage function receives pointers to local copies of both descriptors.
	int readEnd = stage->inputPipe;
	int writeEnd = stage->outputPipe;

	stage->function(&readEnd, &writeEnd);

	// Close via the thread_args fields, which CLOSEME also resets to -1.
	CLOSEME(stage->inputPipe);
	CLOSEME(stage->outputPipe);

	return NULL;
}

// Run every stage of the pipeline on its own thread and wait for all of them.
//
// Adjacent stages are linked by a pipe: stage i writes to the pipe that
// stage i+1 reads from. The first stage gets no input (-1) and the last stage
// gets no output (-1). Each thread closes its own descriptors in thread_func.
//
// Exits the process if a pipe or a thread cannot be created, or if a join
// fails; a half-built pipeline could otherwise block forever.
void
Pipeline_execute(Pipeline *this)
{
	// Nothing to run; this also avoids declaring zero-length arrays below.
	if (this->numStages <= 0)
		return;

	pthread_t threads[this->numStages];
	thread_args args[this->numStages];

	// read end of the pipe created for the previous stage (-1 for stage 0)
	int prevRead = -1;

	for (int i = 0; i < this->numStages; i++) {
		int fd[2] = { -1, -1 };

		// every stage except the last gets a pipe leading to the next stage
		if (i < this->numStages - 1) {
			if (pipe(fd) != 0) {
				perror("pipe");
				exit(EXIT_FAILURE);
			}
		}

		args[i].function = this->functions[i];
		args[i].inputPipe = prevRead;
		args[i].outputPipe = fd[1];
		prevRead = fd[0];

		// pthread functions report errors through their return value, not
		// errno, so perror() would print an unrelated message here.
		int rc = pthread_create(&threads[i], NULL, thread_func, &args[i]);

		if (rc != 0) {
			fprintf(stderr, "pthread_create failed (error %d)\n", rc);
			exit(EXIT_FAILURE);
		}
	}

	// wait for every stage to finish
	for (int i = 0; i < this->numStages; i++) {
		int rc = pthread_join(threads[i], NULL);

		if (rc != 0) {
			fprintf(stderr, "pthread_join failed (error %d)\n", rc);
			exit(EXIT_FAILURE);
		}
	}
}

// Release a pipeline and its stage array.
// Passing NULL is a no-op, mirroring free().
void
Pipeline_free(Pipeline *this)
{
	if (this == NULL)
		return;

	free(this->functions);
	free(this);
}

// Write exactly `size` bytes from `buffer` to a channel.
//
// channel: pointer to an int holding the file descriptor to write to.
// Returns true once every byte has been written, false if write() fails.
// A size of 0 succeeds without touching the descriptor.
bool
Pipeline_send(void *channel, void *buffer, size_t size)
{
	int fd = *(int *) channel;
	const char *cursor = buffer;
	size_t remaining = size;

	// write() may accept fewer bytes than requested; keep going until the
	// whole message is out so the reader never sees a truncated value.
	while (remaining > 0) {
		ssize_t written = write(fd, cursor, remaining);

		if (written < 0)
			return false;
		cursor += written;
		remaining -= (size_t) written;
	}

	return true;
}

// Read exactly `size` bytes from a channel into `buffer`.
//
// channel: pointer to an int holding the file descriptor to read from.
// Returns true only when all `size` bytes were received. Returns false on a
// read() error or when the stream ends before the message is complete, so the
// caller never consumes an unfilled or partially filled buffer.
bool
Pipeline_receive(void *channel, void *buffer, size_t size)
{
	int fd = *(int *) channel;
	char *cursor = buffer;
	size_t remaining = size;

	while (remaining > 0) {
		ssize_t got = read(fd, cursor, remaining);

		// -1 is an error; 0 means every writer closed before the message was complete
		if (got <= 0)
			return false;
		cursor += got;
		remaining -= (size_t) got;
	}

	return true;
}

//an application created to help test the implementation of pipes.

// First stage: sends the integers 1..num_ints to the output channel.
// The input channel is unused. Exits the process if a send fails.
static void
generateInts(void *input, void *output)
{
	printf("generateInts: thread %p\n", (void *) pthread_self());

	int value = 1;

	while (value <= num_ints) {
		bool sent = Pipeline_send(output, &value, sizeof(int));

		if (!sent) {
			exit(EXIT_FAILURE);
		}
		value++;
	}
}

// Middle stage: receives num_ints integers from the input channel and sends
// the square of each one to the output channel.
// Exits the process if a receive or a send fails.
static void
squareInts(void *input, void *output)
{
	printf("squareInts: thread %p\n", (void *) pthread_self());

	for (int remaining = num_ints; remaining > 0; remaining--) {
		int number;

		if (!Pipeline_receive(input, &number, sizeof(int))) {
			exit(EXIT_FAILURE);
		}

		int squared = number * number;

		if (!Pipeline_send(output, &squared, sizeof(int))) {
			exit(EXIT_FAILURE);
		}
	}
}

// Final stage: receives num_ints integers from the input channel, adds them
// up and prints the total. The output channel is unused.
// Exits the process if a receive fails.
static void
sumIntsAndPrint(void *input, void *output)
{
	printf("sumIntsAndPrint: thread %p\n", (void *) pthread_self());

	int total = 0;
	int incoming = 0;
	int count = 0;

	while (count < num_ints) {
		if (!Pipeline_receive(input, &incoming, sizeof(int))) {
			exit(EXIT_FAILURE);
		}
		total += incoming;
		count++;
	}
	printf("sumIntsAndPrint: result = %i\n", total);
}

// Free the pipeline (when there is one) and terminate with EXIT_FAILURE.
// Does not return.
static void
cleanupExit(Pipeline * p)
{
	if (p != NULL)
		Pipeline_free(p);

	exit(EXIT_FAILURE);
}

// Prompts for the integer count, builds the three-stage pipeline
// (generate -> square -> sum) and runs it.
// Returns 0 on success; exits with EXIT_FAILURE on bad input or when the
// pipeline cannot be set up.
int
main(void)
{
	printf("Enter number of ints:\n");
	// Reject missing or non-numeric input instead of silently running with
	// whatever value num_ints happens to hold.
	if (scanf("%d", &num_ints) != 1) {
		fprintf(stderr, "expected an integer count on standard input\n");
		return EXIT_FAILURE;
	}
	printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.\n", num_ints);

	Pipeline *p = new_Pipeline();

	if (p == NULL)
		cleanupExit(p);
	if (!Pipeline_add(p, generateInts))
		cleanupExit(p);
	if (!Pipeline_add(p, squareInts))
		cleanupExit(p);
	if (!Pipeline_add(p, sumIntsAndPrint))
		cleanupExit(p);
	Pipeline_execute(p);

	Pipeline_free(p);
	return 0;
}

Here is the program output:

Enter number of ints:
7
Setting up pipeline to calculate the sum of squares of integers 1 to 7.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x7fc5c10a4700
squareInts: thread 0x7fc5c08a3700
sumIntsAndPrint: thread 0x7fc5c00a2700
sumIntsAndPrint: result = 140

This is based upon my answer to the question "fd leak, custom Shell". That answer uses fork instead of pthread_create, but the issues were similar.

答案2

得分: 0

您将以下内容传递给了每个线程:

thread_args args = {
    .function = this->functions[i],
    .inputPipe = fd[0],
    .outputPipe = fd[1],
};
This lives on the stack, and does not persist outside of the loop it is defined in. It may have ceased to exist by the time your thread runs, or multiple threads may end up reading the same values. You're almost certainly passing the same address to all of your threads.

You're doing the equivalent of:

int a;

a = 1;
pthread_create(..., &a);

a = 2;
pthread_create(..., &a);
But there's no guarantee of when any of these threads will be run.

You need to allocate structures for each thread, that last as long as the threads do. Simplest might be to define:

thread_args args[this->numStages];
at the top of `Pipeline_execute()` and populate that.

i.e. you want something like:

thread_args args[this->numStages];

for (int i = 0; i < this->numStages; i++) {
    //creating the pipes
    int fd[2]; 
    pipe(fd);
    //creating input and output pipes for each stage
    args[i].function = this->functions[i];
    args[i].inputPipe = fd[0];
    args[i].outputPipe = fd[1];

    if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
        printf("created a thread\n");
        perror("pthread_create");
    }
...
}
英文:

You are passing this to each of your threads:

        thread_args args = {
            .function = this->functions[i],
            .inputPipe = fd[0],
            .outputPipe = fd[1],
        };

This lives on the stack, and does not persist outside of the loop it is defined in. It may have ceased to exist by the time your thread runs, or multiple threads may end up reading the same values. You're almost certainly passing the same address to all of your threads.

You're doing the equivalent of:

int a;

a = 1;
pthread_create(..., &a);

a = 2;
pthread_create(..., &a);

But there's no guarantee of when any of these threads will be run.

You need to allocate structures for each thread, that last as long as the threads do. Simplest might be to define:

    thread_args args[this->numStages];

at the top of Pipeline_execute() and populate that.

i.e. you want something like:

    thread_args args[this->numStages];

    for (int i = 0; i < this->numStages; i++) {
        //creating the pipes
        int fd[2]; 
        pipe(fd);
        //creating input and output pipes for each stage
        args[i].function = this->functions[i];
        args[i].inputPipe = fd[0];
        args[i].outputPipe = fd[1];

        if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
            printf("created a thread\n");
            perror("pthread_create\n");
        }
        }
    ...
    }

huangapple
  • 本文由 发表于 2023年4月11日 05:59:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/75981045.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定