英文:
How to implement functional parallel-map in c?
问题
函数映射是一种将回调函数应用于数组中的每个元素并返回回调返回值列表的函数。例如,在伪代码中,map(["hello", "world"], fn(x) => x + " meow")
将返回 ["hello meow", "world meow"]
。
由于在C中可以将函数指针作为参数传递,因此可以按如下方式实现函数映射:
void** fp_map(void** array, size_t len, void* (*execute)(void*))
{
// 为返回项分配内存
void** returns = malloc(sizeof(void*) * len);
if (returns == NULL) err(42, "Malloc failed, buy more ram");
// 映射值
for (int i = 0; i < len; ++i)
returns[i] = execute(array[i]);
return returns;
}
如果我在主方法中写以下匿名函数,它将把 [“hello”, “world”]
映射为 [“hello meow”, “world meow”]
:
int main() {
char* arr[] = {"hello", "world"};
char** arr2 = fp_map((void**) arr, 2, ({ void* _func_ (void* x) {
char* buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}; _func_; }));
for (int i = 0; i < 3; ++i)
printf("%s\n", arr2[i]);
}
现在,我想要实现一个并行映射以加速操作。由于这是纯函数的,对具有相同参数的回调函数的调用将返回相同的返回值。如何使用多线程使每次调用 execute()
都在不同的线程上运行,但仍然将结果按顺序返回到数组中?
英文:
A functional map is a function that applies a callback function to each element in an array and returns a list of callback return values. For example, in pseudocode, map(["hello", "world"], fn(x) => x + " meow")
would return ["hello meow", "world meow"]
Since function pointers can be passed as parameters in C, it is possible to implement a functional map like below:
void** fp_map(void** array, size_t len, void* (*execute)(void*))
{
// Allocate memory for return items
void** returns = malloc(sizeof(void*) * len);
if (returns == NULL) err(42, "Malloc failed, buy more ram");
// Map values
for (int i = 0; i < len; ++i)
returns[i] = execute(array[i]);
return returns;
}
If I write the following anonymous function in my main method, it would map ["hello", "world"]
to ["hello meow", "world meow"]
:
int main() {
char* arr[] = {"hello", "world"};
char** arr2 = fp_map((void**) arr, 2, ({ void* _func_ (void* x) {
char* buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}; _func_; }));
for (int i = 0; i < 3; ++i)
printf("%s\n", arr2[i]);
}
Now, I want to implement a parallel map to speed things up. Since this is purely functional, calls to the callback function with the same parameters would return the same return values. How can I use multithreading so that each call to execute()
runs on a different thread, but still have the results return in an ordered array?
答案1
得分: 1
我已经写好了以下的代码,其中我为线程创建了上下文,然后针对每次计算都生成了一个单独的线程。等待所有线程完成并返回值。
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>
#define ERRORON(expr) \
do { \
if (expr) { \
fprintf(stderr, "ERROR: %s\n", #expr); \
exit(1); \
} \
} while (0)
#define ARRLEN(x) (sizeof(x) / sizeof(*x))
struct mythread_context {
void **returns;
void *(*execute)(void *);
void **array;
size_t i;
};
int mythread(void *arg) {
const struct mythread_context *ctx = arg;
// 执行要执行的操作。
ctx->returns[ctx->i] = ctx->execute(ctx->array[ctx->i]);
return 0;
}
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
// 为返回项目分配内存
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
// 为线程和上下文分配内存。
thrd_t *threads = malloc(sizeof(*threads) * len);
ERRORON(!threads);
struct mythread_context *ctxs = malloc(sizeof(*ctxs) * len);
ERRORON(!ctxs);
for (size_t i = 0; i < len; ++i) {
const struct mythread_context thisctx = {
.returns = returns,
.execute = execute,
.array = array,
.i = i,
};
ctxs[i] = thisctx;
// 为每个返回值、执行和数组索引启动一个线程。
int ret = thrd_create(&threads[i], mythread, &ctxs[i]);
ERRORON(ret != thrd_success);
}
for (size_t i = 0; i < len; ++i) {
// 等待所有线程完成。它们将分别并行分配到返回值。
int ret = thrd_join(threads[i], NULL);
ERRORON(ret != thrd_success);
}
free(threads);
free(ctxs);
return returns;
}
void *appnend_to_char(void *x) {
char *buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}
int main() {
const char *arr[] = {"hello", "world"};
char **arr2 = (char **)fp_map((void **)arr, ARRLEN(arr), appnend_to_char);
for (size_t i = 0; i < ARRLEN(arr); ++i) {
printf("%s\n", arr2[i]);
}
// 释放内存
for (size_t i = 0; i < ARRLEN(arr); ++i) {
free(arr2[i]);
}
free(arr2);
}
或者,您可以无缝集成OpenMP,只需使用以下代码:
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
size_t i;
#pragma omp parallel for
for (size_t i = 0; i < len; ++i) {
returns[i] = execute(array[i]);
}
return returns;
}
备注:
({
是GCC扩展,_不_是C语言的一部分。在C编程语言中没有lambda表达式或匿名函数。- 我不确定是应该使用C11的
threads.h
还是更倾向于POSIX的pthread。这两者的接口非常相似。 - 上下文相对较大,可以进行优化。
malloc
的数量也可以进行优化。
英文:
I have written the following code, in which I create a context for the thread, then for every calculation I spawn a separate thread. Join all the threads and return the value.
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>
#define ERRORON(expr) \
do { \
if (expr) { \
fprintf(stderr, "ERROR: %s\n", #expr); \
exit(1); \
} \
} while (0)
#define ARRLEN(x) (sizeof(x) / sizeof(*x))
struct mythread_context {
void **returns;
void *(*execute)(void *);
void **array;
size_t i;
};
int mythread(void *arg) {
const struct mythread_context *ctx = arg;
// Execute the stuff to execute.
ctx->returns[ctx->i] = ctx->execute(ctx->array[ctx->i]);
return 0;
}
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
// Allocate memory for return items
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
// Allocate memory for threads and contextes.
thrd_t *threads = malloc(sizeof(*threads) * len);
ERRORON(!threads);
struct mythread_context *ctxs = malloc(sizeof(*ctxs) * len);
ERRORON(!ctxs);
for (size_t i = 0; i < len; ++i) {
const struct mythread_context thisctx = {
.returns = returns,
.execute = execute,
.array = array,
.i = i,
};
ctxs[i] = thisctx;
// Start a thread for every returns, execute and array index.
int ret = thrd_create(&threads[i], mythread, &ctxs[i]);
ERRORON(ret != thrd_success);
}
for (size_t i = 0; i < len; ++i) {
// Join all threads. They will assing to returns separately concurrently.
int ret = thrd_join(threads[i], NULL);
ERRORON(ret != thrd_success);
}
free(threads);
free(ctxs);
return returns;
}
void *appnend_to_char(void *x) {
char *buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}
int main() {
const char *arr[] = {"hello", "world"};
char **arr2 = (char **)fp_map((void **)arr, ARRLEN(arr), appnend_to_char);
for (size_t i = 0; i < ARRLEN(arr); ++i) {
printf("%s\n", arr2[i]);
}
// free memory
for (size_t i = 0; i < ARRLEN(arr); ++i) {
free(arr2[i]);
}
free(arr2);
}
Alternatively, you can just seamlessly integrate with OpenMP, with just:
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
size_t i;
#pragma omp parallel for
for (size_t i = 0; i < len; ++i) {
returns[i] = execute(array[i]);
}
return returns;
}
Notes:
({
is a GCC extension, not part of C language. There are no lambdas or anonymous functions in C programming langauge.- I am not sure if C11 threads.h should be used or rather POSIX pthreads should be preferred. The interface is very similar.
- The context is rather big, it could be optimized. The count of
malloc
coudl also be optimized.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论