C mutex getting blocked infinitely

Question


So I gotta implement a queue in C with a system to protect it and make it accessible through multiple threads.
To do so, I've used the producers-consumers logic:

#include <stdlib.h>
#include <threads.h>
#include <stdbool.h>
#include "queue.h"
#include <stdio.h>

int metidos=1;
int quitados=1;


// circular array
typedef struct _queue {
    int size;
    int used;
    int first;
    void **data;
    mtx_t * mutex;
    cnd_t * full;
    cnd_t * empty;
    bool terminado;
} _queue;

void q_terminar(queue q){
    printf("Entra en q_terminar\n");
    mtx_lock(q->mutex);
    q->terminado=true;
    mtx_unlock(q->mutex);
    cnd_broadcast(q->empty);
}

queue q_create(int size) {
    queue q = malloc(sizeof(_queue));

    q->size  = size;
    q->used  = 0;
    q->first = 0;
    q->data  = malloc(size * sizeof(void *));
    q->mutex = malloc(sizeof (mtx_t));
    q->full = malloc(sizeof(cnd_t));
    q->empty = malloc(sizeof(cnd_t));
    q->terminado=false;
    mtx_init(q->mutex, mtx_plain);
    cnd_init(q->full);
    cnd_init(q->empty);

    return q;
}

int q_elements(queue q) {
    mtx_lock(q->mutex);
    int res= q->used;
    mtx_unlock(q->mutex);
    return res;
}

int q_insert(queue q, void *elem) {
    if(q->terminado==true){
        return 1;
    }
    printf("Entra en insert\n");
    mtx_lock(q->mutex);
    while(q->used == q->size){
        printf("ESperando para insertar\n");
        cnd_wait(q->full, q->mutex);
        printf("Recibiendo señal para insertar\n");
    }
    //if(q->size == q->used) return -1;
    q->data[(q->first + q->used) % q->size] = elem;
    q->used++;

    printf("Insertado, este es el elemento %d en ser insertado\n",metidos);
    printf("En la cola hay %d elementos\n",q->used);
    metidos++;
    if(q->used == 1){
        cnd_broadcast(q->empty);
        printf("Enviando señal para despertar a los que borran\n");
    }

    mtx_unlock(q->mutex);

    return 0;
}

void *q_remove(queue q) {
    printf("Entra en remove\n");
    void *res;
    mtx_lock(q->mutex);
    if(q->terminado == true){
        mtx_unlock(q->mutex);
        return NULL;
    }
    while(q->used ==0 && q->terminado==false){
        printf("Esperando para quitar\n");
        cnd_wait(q->empty, q->mutex);
        printf("Recibiendo señal para quitar\n");
    }

    if(q->used == 0) {
        mtx_unlock(q->mutex);
        return NULL;
    }
    res = q->data[q->first];
    q->first = (q->first + 1) % q->size;
    q->used--;
    cnd_signal(q->full);
    printf("Quitado, este es el elemento %d en ser quitado\n", quitados);
    printf("En la cola hay %d elementos\n",q->used);
    quitados++;
    if(q->used == q->size-1){
        cnd_broadcast(q->full);
        printf("Enviando señal para despertar a los que insertan\n");
    }

    mtx_unlock(q->mutex);
    return res;
}

void q_destroy(queue q) {
    mtx_destroy(q->mutex);
    cnd_destroy(q->full);
    cnd_destroy(q->empty);
    free(q->full);
    free(q->empty);
    free(q->mutex);
    free(q->data);
    free(q);
}

Now, in the main file, I have to move the call that the function "sum" makes to get_entries into a separate thread, so I've written a function called start_get_entries_thread:

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <openssl/evp.h>
#include <threads.h>
#include "options.h"
#include "queue.h"

#define MAX_PATH 1024
#define BLOCK_SIZE (10*1024*1024)
#define MAX_LINE_LENGTH (MAX_PATH * 2)

struct file_md5 {
    char *file;
    unsigned char *hash;
    unsigned int hash_size;
};

struct thread_get_entries_args{
    int id;
    char *dir;
    queue q;
};

struct thread_get_entries_info {
    thrd_t id;
    struct thread_get_entries_args * entries_args;
    cnd_t condicion;
};

thrd_t global;

void get_entries(char *dir, queue q);

void print_hash(struct file_md5 *md5) {
    for(int i = 0; i < md5->hash_size; i++) {
        printf("%02hhx", md5->hash[i]);
    }
}

void read_hash_file(char *file, char *dir, queue q) {
    FILE *fp;
    char line[MAX_LINE_LENGTH];
    char *file_name, *hash;
    int hash_len;

    if((fp = fopen(file, "r")) == NULL) {
        printf("Could not open %s : %s\n", file, strerror(errno));
        exit(0);
    }

    while(fgets(line, MAX_LINE_LENGTH, fp) != NULL) {
        char *field_break;
        struct file_md5 *md5 = malloc(sizeof(struct file_md5));

        if((field_break = strstr(line, ": ")) == NULL) {
            printf("Malformed md5 file\n");
            exit(0);
        }
        *field_break = '\0';

        file_name = line;
        hash      = field_break + 2;
        hash_len  = strlen(hash);

        md5->file      = malloc(strlen(file_name) + strlen(dir) + 2);
        sprintf(md5->file, "%s/%s", dir, file_name);
        md5->hash      = malloc(hash_len / 2);
        md5->hash_size = hash_len / 2;

        for(int i = 0; i < hash_len; i+=2)
            sscanf(hash + i, "%02hhx", &md5->hash[i / 2]);

        printf("Se llama a q_insert\n");
        q_insert(q, md5);
    }

    fclose(fp);
}

void sum_file(struct file_md5 *md5) {
    EVP_MD_CTX *mdctx;
    int nbytes;
    FILE *fp;
    char *buf;

    if((fp = fopen(md5->file, "r")) == NULL) {
        printf("Could not open %s\n", md5->file);
        return;
    }

    buf = malloc(BLOCK_SIZE);
    const EVP_MD *md = EVP_get_digestbyname("md5");

    mdctx = EVP_MD_CTX_create();
    EVP_DigestInit_ex(mdctx, md, NULL);

    while((nbytes = fread(buf, 1, BLOCK_SIZE, fp)) >0)
        EVP_DigestUpdate(mdctx, buf, nbytes);

    md5->hash = malloc(EVP_MAX_MD_SIZE);
    EVP_DigestFinal_ex(mdctx, md5->hash, &md5->hash_size);

    EVP_MD_CTX_destroy(mdctx);
    free(buf);
    fclose(fp);
}

void recurse(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISDIR(st.st_mode))/////
        get_entries(entry, q);
}

void add_files(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISREG(st.st_mode)) {
        printf("Se llama a q_insert\n");
        q_insert(q, strdup(entry));
    }
}

void walk_dir(char *dir, void (*action)(char *entry, void *arg), void *arg) {
    DIR *d;
    struct dirent *ent;
    char full_path[MAX_PATH];

    if((d = opendir(dir)) == NULL) {
        printf("Could not open dir %s\n", dir);
        return;
    }

    while((ent = readdir(d)) != NULL) {
        if(strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") ==0)
            continue;

        snprintf(full_path, MAX_PATH, "%s/%s", dir, ent->d_name);

        action(full_path, arg);
    }

    closedir(d);
}

void get_entries(char *dir, queue q) {
    walk_dir(dir, add_files, &q);
    walk_dir(dir, recurse, &q);
}

void check(struct options opt) {
    queue in_q;
    struct file_md5 *md5_in, md5_file;

    in_q  = q_create(opt.queue_size);

    read_hash_file(opt.file, opt.dir, in_q);

    while((md5_in = q_remove(in_q))) {
        printf("Se llama a q_remove in en check\n");
        md5_file.file = md5_in->file;

        sum_file(&md5_file);

        if(memcmp(md5_file.hash, md5_in->hash, md5_file.hash_size)!=0) {
            printf("File %s doesn't match.\nFound:    ", md5_file.file);
            print_hash(&md5_file);
            printf("\nExpected: ");
            print_hash(md5_in);
            printf("\n");
        }

        free(md5_file.hash);

        free(md5_in->file);
        free(md5_in->hash);
        free(md5_in);
    }

    q_destroy(in_q);
}

int get_entries_cast(void*ptr){
    struct thread_get_entries_args * entries_args = ptr;

    get_entries(entries_args->dir, entries_args->q);
    printf("Llamada a q_terminar\n");
    q_terminar(entries_args->q);
    return 0;
}

void start_get_entries_thread(char *dir, queue in_q){
    struct thread_get_entries_info* thread;

    thread = malloc(sizeof(struct thread_get_entries_info));
    if(thread == NULL){
        printf("Not enough memory available.\n");
        exit(1);
    }

    thread->entries_args = malloc(sizeof(struct thread_get_entries_args));
    thread->entries_args->dir=dir;
    thread->entries_args->q=in_q;
    thread->entries_args->id=0;

    if(0!= thrd_create(&thread->id, get_entries_cast, thread->entries_args)){
        printf("FALLO AL CREAR\n");
    }
    global = thread->id;
}

void sum(struct options opt) {
    queue in_q, out_q;
    char *ent;
    FILE *out;
    struct file_md5 *md5;
    int dirname_len;

    in_q  = q_create(opt.queue_size);
    out_q = q_create(opt.queue_size);

    start_get_entries_thread(opt.dir, in_q); //Use thread here instead of calling get_entries

    printf("Va a entrar en remove\n");
    while((ent = q_remove(in_q)) != NULL) {
        md5 = malloc(sizeof(struct file_md5));

        md5->file = ent;
        sum_file(md5);

        printf("Se llama a q_insert\n");
        q_insert(out_q, md5);
    }
    printf("Llamada a q_terminar\n");
    q_terminar(out_q);

    if((out = fopen(opt.file, "w")) == NULL) {
        printf("Could not open output file\n");
        exit(0);
    }

    dirname_len = strlen(opt.dir) + 1; // length of dir + /

    while((md5 = q_remove(out_q)) != NULL) {
        printf("Se llama a q_remove out\n");
        fprintf(out, "%s: ", md5->file + dirname_len);

        for(int i = 0; i < md5->hash_size; i++)
            fprintf(out, "%02hhx", md5->hash[i]);
        fprintf(out, "\n");

        free(md5->file);
        free(md5->hash);
        free(md5);
    }

    //if(thrd_join(thread->id, NULL)){
    //    printf("FALLO AL UNIR\n");
    //}

    fclose(out);
    q_destroy(in_q);
    q_destroy(out_q);
}

int main(int argc, char *argv[]) {
    struct options opt;

    opt.num_threads = 5;
    opt.queue_size  = 1;
    opt.check       = true;
    opt.file        = NULL;
    opt.dir         = NULL;

    read_options (argc, argv, &opt);

    if(opt.check)
        check(opt);
    else
        sum(opt);
}

So the thing is, at first it works, but then it gets stuck forever trying to insert. I also don't know whether to use thrd_join or not. Thanks in advance.

Answer 1

Score: 1


There are several weaknesses in the queue code's use of synchronization objects. Among them:

  • q_terminar() wakes only the threads waiting on the q->empty CV, not those waiting on the q->full CV. Perhaps you anticipate that this function will be called only when the queue is empty, but it would be safer and very cheap for that function to additionally broadcast to q->full.

  • q_insert() reads q->terminado without first acquiring the queue's mutex. This will typically create a data race. No member of the queue structure other than the condition variables and the mutex itself should be accessed by any thread without holding the mutex locked, or relying on some other effective synchronization mechanism.

  • q_insert() checks q->terminado only once, at function entry. It should check again each time it wakes from a CV wait, and take appropriate action if it finds that member to be true.

  • q_insert() broadcasts to the q->empty CV only when, after an item is inserted, the queue size is exactly 1. This is probably sufficient, but I recommend performing that broadcast unconditionally, because that is easier to reason about and to be confident is correct in all cases.

  • In q_remove(), I suggest moving the if(q->terminado == true) block after the while and removing the if (q->used == 0) block (the only way control can reach that point with q->used == 0 is if q->terminado == true). The current code is not erroneous in this regard, but it is redundant. If you like, add an assert(q->used == 0) in the if(q->terminado == true) block (I would).

  • It is redundant for q_remove() to both signal the q->full CV and broadcast to it. The broadcast is probably the one to retain. However, it would be better to perform that broadcast unconditionally, instead of only when the queue size after item removal is exactly q->size - 1.
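
To make the points above concrete, here is a minimal sketch of how q_terminar(), q_insert() and q_remove() might look with those changes applied. It is not the original poster's code. For brevity it embeds the mutex and CVs in the struct instead of allocating them separately, and it omits the debug output and most error checking. It also assumes that removers should still be able to drain items left in the queue after q_terminar() has been called, whereas the question's q_remove() returns NULL as soon as terminado is set; because of that assumption, the assert sits in the empty-queue branch and checks terminado.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <threads.h>

/* Same fields as the question's struct; embedding the mutex and CVs is an
   assumption made only to keep this sketch short. */
typedef struct _queue {
    int size, used, first;
    void **data;
    mtx_t mutex;
    cnd_t full;     /* inserters wait here while the queue is full */
    cnd_t empty;    /* removers wait here while the queue is empty */
    bool terminado;
} _queue, *queue;

queue q_create(int size) {
    queue q = malloc(sizeof *q);
    if (q == NULL) return NULL;
    q->data = malloc(size * sizeof *q->data);
    if (q->data == NULL) { free(q); return NULL; }
    q->size = size;
    q->used = 0;
    q->first = 0;
    q->terminado = false;
    mtx_init(&q->mutex, mtx_plain);
    cnd_init(&q->full);
    cnd_init(&q->empty);
    return q;
}

void q_terminar(queue q) {
    mtx_lock(&q->mutex);
    q->terminado = true;
    mtx_unlock(&q->mutex);
    cnd_broadcast(&q->empty);   /* wake blocked removers */
    cnd_broadcast(&q->full);    /* wake blocked inserters as well */
}

/* Returns 0 on success, 1 if the queue has been terminated. */
int q_insert(queue q, void *elem) {
    mtx_lock(&q->mutex);
    /* terminado is read only while holding the mutex, and it is
       re-checked after every wake-up from the CV wait */
    while (q->used == q->size && !q->terminado)
        cnd_wait(&q->full, &q->mutex);
    if (q->terminado) {
        mtx_unlock(&q->mutex);
        return 1;
    }
    q->data[(q->first + q->used) % q->size] = elem;
    q->used++;
    cnd_broadcast(&q->empty);   /* unconditional broadcast */
    mtx_unlock(&q->mutex);
    return 0;
}

/* Returns the oldest element, or NULL once the queue is empty and terminated. */
void *q_remove(queue q) {
    void *res;
    mtx_lock(&q->mutex);
    while (q->used == 0 && !q->terminado)
        cnd_wait(&q->empty, &q->mutex);
    if (q->used == 0) {
        assert(q->terminado);   /* the loop cannot exit empty otherwise */
        mtx_unlock(&q->mutex);
        return NULL;
    }
    res = q->data[q->first];
    q->first = (q->first + 1) % q->size;
    q->used--;
    cnd_broadcast(&q->full);    /* unconditional broadcast, no extra signal */
    mtx_unlock(&q->mutex);
    return res;
}

/* Call only after every thread that uses the queue has been joined. */
void q_destroy(queue q) {
    mtx_destroy(&q->mutex);
    cnd_destroy(&q->full);
    cnd_destroy(&q->empty);
    free(q->data);
    free(q);
}
```

With this version, a consumer loop such as while((ent = q_remove(in_q)) != NULL) keeps receiving items until the producer has called q_terminar() and the queue has been emptied.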

Additionally,

  • sum() tears down the queues without first joining the second thread. It is safe to destroy a mutex only if no thread holds it locked or can attempt to lock it after its destruction. It is safe to destroy a CV only if no thread is waiting on it, or can attempt to wait on it, signal it, or broadcast to it after its destruction. I don't see how you can be confident that your queue destruction is safe in those regards until the other thread(s) is confirmed to have terminated, and joining that thread appears to be the best way available to get such confirmation.

  • As a more general rule of thumb, you should either join or detach every thread but the initial one.

  • And as @StephanSchlecht observed in comments, the initial thread enqueues items on out_q, but no other thread dequeues those items. The initial thread attempts to dequeue them only after dequeuing all the items from in_q. Thus, if there are more items than out_q can accommodate at once, then the initial thread will eventually fill that queue and block trying to add another item. After that point it will not dequeue any more items from in_q, likely causing the other thread to fill that queue to capacity and block, too. Possible solutions include:

    • another thread could be tasked with handling out_q concurrently with the existing two, or
    • the initial thread could process output items immediately instead of enqueuing them for later processing, or
    • the initial thread could use a different data structure for output items, such as a linked list, that does not have a fixed capacity.
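
Two of the points above, joining a thread before destroying the objects it uses and keeping output items in a structure without a fixed capacity, can be sketched in isolation as follows. Everything in this sketch is illustrative: struct shared, worker(), run_and_teardown(), struct out_list, out_push() and out_pop() are hypothetical names that do not appear in the question, and the worker only increments a counter as a stand-in for the thread that fills in_q.

```c
#include <assert.h>
#include <stdlib.h>
#include <threads.h>

/* --- Teardown order: join first, destroy afterwards --- */

struct shared {
    mtx_t mutex;
    int produced;
};

/* Stand-in for the second thread: it locks the shared mutex repeatedly. */
static int worker(void *arg) {
    struct shared *s = arg;
    for (int i = 0; i < 1000; i++) {
        mtx_lock(&s->mutex);
        s->produced++;
        mtx_unlock(&s->mutex);
    }
    return 0;
}

/* Returns the number of items the worker produced, or -1 on failure. */
int run_and_teardown(void) {
    struct shared s = { .produced = 0 };
    thrd_t t;

    if (mtx_init(&s.mutex, mtx_plain) != thrd_success)
        return -1;
    if (thrd_create(&t, worker, &s) != thrd_success) {
        mtx_destroy(&s.mutex);
        return -1;
    }
    /* Once thrd_join() returns, the worker has terminated and can no
       longer lock s.mutex, so destroying the mutex is safe. */
    if (thrd_join(t, NULL) != thrd_success)
        return -1;
    mtx_destroy(&s.mutex);
    return s.produced;
}

/* --- Unbounded FIFO for output items --- */
/* Used by a single thread only, so it needs no locking, and
   out_push() never blocks waiting for free capacity. */

struct node {
    void *item;
    struct node *next;
};

struct out_list {
    struct node *head, *tail;
};

/* Appends item at the tail; returns 0 on success, -1 if out of memory. */
int out_push(struct out_list *l, void *item) {
    struct node *n = malloc(sizeof *n);
    if (n == NULL) return -1;
    n->item = item;
    n->next = NULL;
    if (l->tail != NULL)
        l->tail->next = n;
    else
        l->head = n;
    l->tail = n;
    return 0;
}

/* Removes and returns the oldest item, or NULL if the list is empty. */
void *out_pop(struct out_list *l) {
    struct node *n = l->head;
    void *item;
    if (n == NULL) return NULL;
    item = n->item;
    l->head = n->next;
    if (l->head == NULL)
        l->tail = NULL;
    free(n);
    return item;
}
```

In sum(), the same ordering would mean calling thrd_join() on the get_entries thread before the q_destroy() calls, and the list could take the place of out_q so that the loop draining in_q never blocks on output.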

huangapple
  • Posted on March 4, 2023, 02:10:46
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75630522.html