这个多线程程序为什么会被阻塞?

huangapple go评论66阅读模式
英文:

Why does this multi-threaded program get blocked?

问题

以下是您要翻译的代码部分:

这是我尝试在Web服务器中实现多线程的代码

#include <stdio.h>
#include <pthread.h>
#include "request.h"
#include "io_helper.h"

char default_root[] = ".";
int open_connections = 0;
int n_buffers = 1;
pthread_mutex_t mutex;
pthread_cond_t cond_empty;
pthread_cond_t cond_full;
int *conn_fds;

// 缓冲区消费者
void *manage_conn() {
    while (1) {
        for (int i = 0; i < n_buffers; ++i) {
            pthread_mutex_lock(&mutex);
            while (!open_connections) {
                printf("IN.\n");
                pthread_cond_wait(&cond_full, &mutex);
                printf("OUT.\n");
            }
            if (conn_fds[i]) {
                request_handle(conn_fds[i]);
                close_or_die(conn_fds[i]);
                conn_fds[i] = 0;
                open_connections--;
                pthread_cond_signal(&cond_empty);
            }
            pthread_mutex_unlock(&mutex);
        }
    }
}

int main(int argc, char *argv[]) {
    enum {
        FIFO,
        SFF
    };
    int c;
    char *root_dir = default_root;
    int port = 10002;
    int n_threads = 1;

    int schedalg = FIFO;

    while ((c = getopt(argc, argv, "d:p:t:b:s:")) != -1) {
        switch (c) {
            case 'd':
                root_dir = optarg;
                break;
            case 'p':
                port = atoi(optarg);
                break;
            case 't':
                n_threads = atoi(optarg);
                break;
            case 'b':
                n_buffers = atoi(optarg);
                printf("This.\n");
                break;
            case 's':
                if (!strcmp("FIFO", optarg)) schedalg = FIFO;
                else if (!strcmp("SFF", optarg)) schedalg = SFF;
                else {
                    printf("Invalid scheduling algorithm.\n");
                    return 0;
                }
                break;
            default:
                fprintf(stderr, "usage: wserver [-d basedir] [-p port]\n");
                exit(1);
        }
    }

    pthread_t *cid = malloc(n_threads * sizeof(*cid));
    conn_fds = malloc(n_buffers * sizeof(*conn_fds));
    for (int i = 0; i < n_buffers; i++) {
        conn_fds[i] = 0;
    }
    for (int i = 0; i < n_threads; ++i) {
        pthread_create(&cid[i], NULL, manage_conn, NULL);
    }

    // 切换到指定目录
    chdir_or_die(root_dir);

    // 现在开始工作

    int listen_fd = open_listen_fd_or_die(port);
    int i = 0;
    while (1) {
        struct sockaddr_in client_addr;
        int client_len = sizeof(client_addr);
        pthread_mutex_lock(&mutex);
        while (open_connections >= n_buffers) {
            pthread_cond_wait(&cond_empty, &mutex);
        }
        int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &client_addr, (socklen_t *) &client_len);
        conn_fds[i] = conn_fd;
        i = (i + 1) % n_buffers;
        open_connections++;
        pthread_cond_signal(&cond_full);
        pthread_mutex_unlock(&mutex);
        printf("New connection opened.\n");
    }
    return 0;
}

希望这能帮助您。如果您有任何进一步的问题,请随时提出。

英文:

This is my attempt of implementing multi-threading in a web server.

#include &lt;stdio.h&gt;
#include &lt;pthread.h&gt;
#include &quot;request.h&quot;
#include &quot;io_helper.h&quot;
char default_root[] = &quot;.&quot;;
int open_connections = 0;
int n_buffers = 1;
pthread_mutex_t mutex;
pthread_cond_t cond_empty;
pthread_cond_t cond_full;
int *conn_fds;
//Buffer consumer
void *manage_conn() {
while(1) {
for (int i = 0; i &lt; n_buffers; ++i)
{
pthread_mutex_lock(&amp;mutex);
while(!open_connections) {
printf(&quot;IN.\n&quot;);
pthread_cond_wait(&amp;cond_full, &amp;mutex);
printf(&quot;OUT.\n&quot;);
}
if(conn_fds[i]) {
request_handle(conn_fds[i]);
close_or_die(conn_fds[i]);
conn_fds[i] = 0;
open_connections--;
pthread_cond_signal(&amp;cond_empty);		
}
pthread_mutex_unlock(&amp;mutex);
//printf(&quot;New connection closed.\n&quot;);	
}	
}	
}
int main(int argc, char *argv[]) {	
enum {
FIFO,
SFF
};
int c;
char *root_dir = default_root;
int port = 10002;
int n_threads = 1;
int schedalg = FIFO;
while ((c = getopt(argc, argv, &quot;d:p:t:b:s:&quot;)) != -1) {
switch (c) {
case &#39;d&#39;:
root_dir = optarg;
break;
case &#39;p&#39;:
port = atoi(optarg);
break;
case &#39;t&#39;:
n_threads = atoi(optarg);
break;
case &#39;b&#39;:
n_buffers = atoi(optarg);
printf(&quot;This.\n&quot;);
break;
case &#39;s&#39;:
if(!strcmp(&quot;FIFO&quot;, optarg)) schedalg = FIFO;
else if(!strcmp(&quot;SFF&quot;, optarg)) schedalg = SFF;
else {
printf(&quot;Invalid scheduling algorithm.\n&quot;);
return 0;
}
break;
default:
fprintf(stderr, &quot;usage: wserver [-d basedir] [-p port]\n&quot;);
exit(1);
}
}
pthread_t *cid = malloc(n_threads * sizeof(*cid));
conn_fds = malloc(n_buffers * sizeof(*conn_fds));
for(int i = 0; i &lt; n_buffers; i++) {
conn_fds[i] = 0;
}
for (int i = 0; i &lt; n_threads; ++i)
{
pthread_create(&amp;cid[i], NULL, manage_conn, NULL);
}
// run out of this directory
chdir_or_die(root_dir);
// now, get to work
int listen_fd = open_listen_fd_or_die(port);
int i = 0;
while (1) {
struct sockaddr_in client_addr;
int client_len = sizeof(client_addr);
pthread_mutex_lock(&amp;mutex);
while(open_connections &gt;= n_buffers) {
pthread_cond_wait(&amp;cond_empty, &amp;mutex);
}
int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &amp;client_addr, (socklen_t *) &amp;client_len);
conn_fds[i] = conn_fd;
i = (i + 1) % n_buffers;
open_connections++;
pthread_cond_signal(&amp;cond_full);
pthread_mutex_unlock(&amp;mutex);
printf(&quot;New connection opened.\n&quot;);
}
return 0;
}

The main thread takes care of accepting new connections and places their corresponding file descriptors into a buffer.
Through the command line arguments, the user specifies how many additional threads will be created and how big the buffer for the file descriptors will be. This new threads will respond to and close the new connections via manage_conn.

For now I'm testing the server with these arguments:

> -b 20 -t 5

5 additional threads will manage the connections and the buffer size is 20.

I execute the server program and a client program that tells the server to run spin.cgi (a program that just spins for the specified amount of time).

Normally, less than 5 threads run and the main one accepts the new connection request but everything gets blocked after that.

这个多线程程序为什么会被阻塞?

First of all, I don't understand what happens to the other threads that are never executed.

However, my main question is why the threads that were waiting for cond_full never wake up when it is supposed to be signaled by the main thread after opening a new connection.

I already tried to debug this, but the error never shows up (I suppose it is because of the different timing).

Edit:

I forgot to initialize the mutex and the condition variables. I already changed that but the problem is is still there.

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_full = PTHREAD_COND_INITIALIZER;

答案1

得分: 1

This line was causing the problem.

这行导致了问题。

int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &client_addr, (socklen_t *) &client_len);

它一直在等待新的连接请求,但同时它一直保持着互斥锁的锁定,所以它没有让其他线程运行。

它一直在等待新的连接请求,但同时它一直保持着互斥锁的锁定,所以它没有让其他线程运行。

What I did to prevent this is unlock the mutex before executing it and lock again after it accepted a new request.

为了防止这个问题,我在执行之前解锁了互斥锁,然后在接受新请求后再次锁定了它。

pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&mutex);
int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &client_addr, (socklen_t *) &client_len);
int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &client_addr, (socklen_t *) &client_len);
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&mutex);

英文:

This line was causing the problem.

int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &amp;client_addr, (socklen_t *) &amp;client_len);

It kept waiting for a new connection request, but at the same time it was keeping the mutex locked, so it didn't let the other threads run.

What I did to prevent this is unlock the mutex before executing it and lock again after it accepted a new request.

	pthread_mutex_unlock(&amp;mutex);
int conn_fd = accept_or_die(listen_fd, (sockaddr_t *) &amp;client_addr, (socklen_t *) &amp;client_len);
pthread_mutex_lock(&amp;mutex);

huangapple
  • 本文由 发表于 2023年5月14日 01:59:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/76244196.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定