在使用EPOLLET的epoll服务器中,何时应该将套接字标记为EPOLLOUT?

huangapple go评论78阅读模式
英文:

epoll server with EPOLLET. When should I mark a socket with EPOLLOUT?

问题

以下是您要翻译的代码部分:

So been trying to make an asynchronous server with epoll (edge-triggered). Everything seems to work fine unless I send large files (30-33 MB) over the socket, which ofcourse gives EAGAIN with send since it fills up the kernel buffer. For now, I just keep sending it in the loop, not doing anything if I get EAGAIN, wasting CPU time. 

So, to bring EPOLLOUT into the picture, and from reading a lot of articles, what I should do is set the EPOLLOUT flag on the socket when send returns -1 with EAGAIN, break out of the while loop, go to the main event loop, wait until "events[i]->events & EPOLLOUT" becomes true, send the left over message again, and if all the bytes are done, remove the EPOLLOUT flag from the socket.

Is my approach correct? Or is there a better way to do this? I ask because I currently keep a global "send_buff" which is being re-used for all the clients to whom I send a response. If I implement the above approach, I'd need to have one buffer per client, because I need to keep the "unsend" message which I couldn't due to send() returning -1. Isn't that memory inefficient? If I have 1000 clients, each of which are asking for 1 MB worth of data, then it means I would have to allocate buffers which amount to 1 GB in total! Moreover, I would need to implement a queue structure, which tracks the sockets which have pending sends and then either using FIFO or some priority, send them out when send becomes ready to send again.

Is this how production servers are designed? What strategies do they use in their code?


Next, the second thing I wanted to ask is that when I see the packets that are being sent using wireshark, I always get "HTTP/1.1 200 OK  (text/css)Continuation". This continuation is something new which didn't happen when I plain non-blocking sockets. What could have caused this?


Here's the code for reference. Any help and tips to better the code is highly appreciated. Thanks a lot! 


Here's the server_t struct, which has one buffer (the global buffer) -> 

#define RECV_SIZE 2048
typedef struct server_ds {
  int sockfd;
  struct sockaddr_in info;
  char *reply;
} server_t;

server_t server;

Here's the event loop -> 

 for (;;) {
    // printf("before wait\n");
    nfds = epoll_wait(event->epollfd, event->events, MAX_EVENTS, -1);
    // printf("after wait\n");

    if (nfds == -1) {
      perror("epoll_wait()");
      return -1;
    }

    for (int i = 0; i < nfds; i++) {
      if (event->events[i].events == EPOLLHUP) {
        fprintf(stderr, "got EPOLLUP\n");
      }
      if (event->events[i].events == EPOLLERR) {
        fprintf(stderr, "got EPOLLERR\n");
      }
      if (event->events[i].data.fd == server.sockfd &&
          event->events[i].events & EPOLLIN) {
        // we recieved a new connection on the socket that the server is
        // listening on accept all the incoming new connections
        while (1) {
          memset(&in_addr, 0, sizeof(in_addr));
          in_len = sizeof(in_addr);

          int connfd =
              accept(server.sockfd, (struct sockaddr *)&in_addr, &in_len);

          if (connfd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
              // we have accepted all the connections that we could
              // no more connection, break out of this loop
              break;
            }
            perror("accept()");
            break;
          }

          printf("New client %d connection!\n", connfd);
          if (make_socket_nonblocking(connfd) == -1) {
            continue;
          }

          event->ev.events = EPOLLIN | EPOLLET | EPOLLOUT ;
          event->ev.data.fd = connfd;

          if (epoll_ctl(event->epollfd, EPOLL_CTL_ADD, connfd, &event->ev) <
              0) {
            fprintf(stderr, "epoll set insertion error: fd=%d", connfd);
            close(connfd);
            continue;
          }
        }

      // }else if (event->events[i].events & EPOLLOUT ){
      //     printf("recieved : EPOLLOUT \n");
      //     event->events[i].events = EPOLLIN | EPOLLET;
      //     if( epoll_ctl(event->epollfd,EPOLL_CTL_MOD,event->events[i].data.fd,&event->events[i]) < 0 ){
      //       perror("epoll_ctl()");
            
      //     }
      }else if (event->events[i].events & EPOLLIN) {
        // client has send some data to the socket
        // recieve all the data at once
        int client_fd = event->events[i].data.fd;
        char buffer[RECV_SIZE];
        int client_closed = 0;

        while (1) {
          char *ptr = buffer;
          ssize_t bytes = recv(client_fd, ptr, RECV_SIZE - 1, 0);
          if (bytes == 0) {
            // client has closed the connection
            // close the client socket and remove from the epoll_loop_t
            printf("Client %d closed connection\n", client_fd);
            client_closed = 1;
            close(client_fd);
            break;
          } else if (bytes == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
              // we have read all we could
              // break from the while loop
              break;
            }
            perror("recv()");
            break;
          } else {
            ptr[bytes] = '\0';
            ptr += bytes;
          }
        }
        // if client has not closed the socket
        // only then send the reply
        //printf("client closed var : %d\n", client_closed);
        if (!client_closed) {
          http_t request;
          // parse_request returns -1 when the request is something other than
          // GET request. In that case we send the default NOT implemented
          // message to the client
          if (parse_request(buffer, strlen(buffer), &request) == -1) {
            // send_all returns -1 only when send() call returns with EAGAIN or
            // EWOULDBLOCK this indicates that the send buffers are full retry
            // it again later
            if (send_all(client_fd, strlen(not_implemented_reply),
                         not_implemented_reply) == -1) {
              // for now continue
              // to be implemented later
              continue;
            }
          }

          printf("sending response...\n");
          send_response(&request, client_fd);
          printf("sent OK reply\n");

          // free the server reply which was created using malloc
          if (server.reply != NULL) {
            free(server

<details>
<summary>英文:</summary>

So been trying to make an asynchronous server with epoll (edge-triggered). Everything seems to work fine unless I send large files (30-33 MB) over the socket, which ofcourse gives EAGAIN with send since it fills up the kernel buffer. For now, I just keep sending it in the loop, not doing anything if I get EAGAIN, wasting CPU time. 

So, to bring EPOLLOUT into the picture, and from reading a lot of articles, what I should do is set the EPOLLOUT flag on the socket when send returns -1 with EAGAIN, break out of the while loop, go to the main event loop, wait until ``events[i]-&gt;events &amp; EPOLLOUT``becomes true, send the left over message again, and if all the bytes are done, remove the EPOLLOUT flag from the socket.

Is my approach correct? Or is there a better way to do this? I ask because I currently keep a global ``send_buff`` which is being re-used for all the clients to whom I send a response. If I implement the above approach, I&#39;d need to have one buffer per client, because I need to keep the &quot;unsend&quot; message which I couldn&#39;t due to send() returning -1. Isn&#39;t that memory inefficient? If I have 1000 clients, each of which are asking for 1 MB worth of data, then it means I would have to allocate buffers which amount to 1 GB in total! Moreover, I would need to implement a queue structure, which tracks the sockets which have pending sends and then either using FIFO or some priority, send them out when send becomes ready to send again.

Is this how production servers are designed? What strategies do they use in their code?


Next, the second thing I wanted to ask is that when I see the packets that are being sent using wireshark, I always get ``HTTP/1.1 200 OK  (text/css)Continuation``. This continuation is something new which didn&#39;t happen when I plain non-blocking sockets. What could have caused this?


Here&#39;s the code for reference. Any help and tips to better the code is highly appreciated. Thanks a lot! 


Here&#39;s the server_t struct, which has one buffer (the global buffer) -&gt; 

    #define RECV_SIZE 2048
    typedef struct server_ds {
      int sockfd;
      struct sockaddr_in info;
      char *reply;
    } server_t;
    
    server_t server;

Here&#39;s the event loop -&gt; 

     for (;;) {
        // printf(&quot;before wait\n&quot;);
        nfds = epoll_wait(event-&gt;epollfd, event-&gt;events, MAX_EVENTS, -1);
        // printf(&quot;after wait\n&quot;);
    
        if (nfds == -1) {
          perror(&quot;epoll_wait()&quot;);
          return -1;
        }
    
        for (int i = 0; i &lt; nfds; i++) {
          if (event-&gt;events[i].events == EPOLLHUP) {
            fprintf(stderr, &quot;got EPOLLUP\n&quot;);
          }
          if (event-&gt;events[i].events == EPOLLERR) {
            fprintf(stderr, &quot;got EPOLLERR\n&quot;);
          }
          if (event-&gt;events[i].data.fd == server.sockfd &amp;&amp;
              event-&gt;events[i].events &amp; EPOLLIN) {
            // we recieved a new connection on the socket that the server is
            // listening on accept all the incoming new connections
            while (1) {
              memset(&amp;in_addr, 0, sizeof(in_addr));
              in_len = sizeof(in_addr);
    
              int connfd =
                  accept(server.sockfd, (struct sockaddr *)&amp;in_addr, &amp;in_len);
    
              if (connfd == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                  // we have accepted all the connections that we could
                  // no more connection, break out of this loop
                  break;
                }
                perror(&quot;accept()&quot;);
                break;
              }
    
              printf(&quot;New client %d connection!\n&quot;, connfd);
              if (make_socket_nonblocking(connfd) == -1) {
                continue;
              }
    
              event-&gt;ev.events = EPOLLIN | EPOLLET | EPOLLOUT ;
              event-&gt;ev.data.fd = connfd;
    
              if (epoll_ctl(event-&gt;epollfd, EPOLL_CTL_ADD, connfd, &amp;event-&gt;ev) &lt;
                  0) {
                fprintf(stderr, &quot;epoll set insertion error: fd=%d&quot;, connfd);
                close(connfd);
                continue;
              }
            }
    
          // }else if (event-&gt;events[i].events &amp; EPOLLOUT ){
          //     printf(&quot;recieved : EPOLLOUT \n&quot;);
          //     event-&gt;events[i].events = EPOLLIN | EPOLLET;
          //     if( epoll_ctl(event-&gt;epollfd,EPOLL_CTL_MOD,event-&gt;events[i].data.fd,&amp;event-&gt;events[i]) &lt; 0 ){
          //       perror(&quot;epoll_ctl()&quot;);
                
          //     }
          }else if (event-&gt;events[i].events &amp; EPOLLIN) {
            // client has send some data to the socket
            // recieve all the data at once
            int client_fd = event-&gt;events[i].data.fd;
            char buffer[RECV_SIZE];
            int client_closed = 0;
    
            while (1) {
              char *ptr = buffer;
              ssize_t bytes = recv(client_fd, ptr, RECV_SIZE - 1, 0);
              if (bytes == 0) {
                // client has closed the connection
                // close the client socket and remove from the epoll_loop_t
                printf(&quot;Client %d closed connection\n&quot;, client_fd);
                client_closed = 1;
                close(client_fd);
                break;
              } else if (bytes == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                  // we have read all we could
                  // break from the while loop
                  break;
                }
                perror(&quot;recv()&quot;);
                break;
              } else {
                ptr[bytes] = &#39;\0&#39;;
                ptr += bytes;
              }
            }
            // if client has not closed the socket
            // only then send the reply
            //printf(&quot;client closed var : %d\n&quot;, client_closed);
            if (!client_closed) {
              http_t request;
              // parse_request returns -1 when the request is something other than
              // GET request. In that case we send the default NOT implemented
              // message to the client
              if (parse_request(buffer, strlen(buffer), &amp;request) == -1) {
                // send_all returns -1 only when send() call returns with EAGAIN or
                // EWOULDBLOCK this indicates that the send buffers are full retry
                // it again later
                if (send_all(client_fd, strlen(not_implemented_reply),
                             not_implemented_reply) == -1) {
                  // for now continue
                  // to be implemented later
                  continue;
                }
              }
    
              printf(&quot;sending response...\n&quot;);
              send_response(&amp;request, client_fd);
              printf(&quot;sent OK reply\n&quot;);
    
              // free the server reply which was created using malloc
              if (server.reply != NULL) {
                free(server.reply);
                server.reply = NULL;
              }
            }
          }
        }
      }


Here&#39;s the send_all() function ( send_response() calls send_all() internally after some preprocessing of the data )

    int send_all(int sockfd, size_t len, const char *reply) {
      ssize_t bytes = 0;
      int send_size = 4028;
      size_t total_sent = 0;
      // printf(&quot;file size: %ld\n&quot;, len);
      while (len) {
        bytes = send(sockfd, reply, len, 0);
        printf(&quot;b sent : %ld\n&quot;,bytes);
        if (bytes == -1) {
          if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // the send() buffer is full, retry it later
            // printf(&quot;got EAGAIN\n&quot;);
            continue;
          }
        }
        total_sent += bytes;
        reply += bytes;
        len -= bytes;
      }
      printf(&quot;total file size sent : %ld\n&quot;, total_sent);
      return 0;
    }



</details>


# 答案1
**得分**: 1

> 我的方法正确吗?
> 
> 正如您发现的那样,您最初的方法(循环检查`EAGAIN`)存在一个缺点,即一个表现不佳的客户端可能会导致线程繁忙循环,停止处理其他客户端,因此对于支持多个客户端的服务器来说,这不是一个很好的方法。
> 
> 您提出的新方法更好,因为即使一些客户端遇到网络问题,其他客户端的处理也可以像往常一样继续进行。
> 
> 如果我实施上述方法,我需要为每个客户端保留一个缓冲区,因为我需要保留未发送的消息,这是因为`send()`返回-1而未能发送。这不是内存效率低下吗?
> 
> 当然,这可能会导致内存效率低下,但有一些方法可以减少内存使用。
> 
> 你可以尝试减小潜在未发送消息的大小。在你的示例中,你提到为一个客户端生成了1MB的数据,然后不得不缓冲它。但是是否真的有必要提前生成整个兆字节的数据呢?为什么不只生成1KB的数据,一旦将这1KB的数据传递给`send()`,就生成下一个1KB,依此类推,直到整个1MB以这种方式发送?如果你从文件中读取数据,这应该很容易实现,这将将你的示例的潜在最坏情况内存使用减少了1000倍。(如果你以其他方式生成数据,实现这一点可能会复杂一些,但仍然可行)
> 
> 另一种可能适用的减少内存使用的方法是使用引用计数来共享数据。例如,如果你知道许多客户端可能在大约相同的时间请求相同的数据,那么不必为每个客户端加载单独的数据副本到内存中,你可以加载单个数据副本,让所有客户端直接从中读取,并保持对它的引用计数,当引用计数降为零时释放数据。(如果数据位于磁盘上,你甚至可以进一步`mmap()`数据,使其看起来好像在内存中,但实际上MMU会秘密地从磁盘/磁盘缓存中直接按需读取它,当程序访问其虚拟地址时)
> 
> 至于“队列结构”,这可能会起作用,但你想要跟踪每个客户端的主要事项是:
> 
>    - 对于边缘触发,`EPOLLOUTET`位是否当前设置为该客户端。
>    - 是否有数据需要尽快发送给该客户端(无论是在RAM上,还是在磁盘上,甚至只是潜在数据,尚未生成但你知道客户端需要的数据)。这将决定是否应该为该客户端设置`EPOLLOUT`位。
>    - 整个出站数据流中的位置(以便你知道哪些字节已经传递给`send()`,因此在套接字指示准备好写入时,下一次应该传递给`send()`的字节是哪些)
>    - 无论你需要为客户端的出站数据流“生成更多字节”的信息。这可以是字节本身,但不必如此;只需在需要时生成它们所需的信息。例如,对于从文件中读取的情况,这可能只是指向打开文件句柄的`FILE *`指针。
> 
> 请注意,使用上述信息,你仍然可以拥有一个(希望很小的)每个客户端的重发缓冲区,其中保存了以前尝试传递给`send()`但`send()`拒绝接受的“剩余”字节...或者如果你愿意,你可以完全跳过重发缓冲区,将上述所述的“生成字节”功能扩展到能够重新生成以前生成的字节,从流的顶部指定的字节偏移开始。这可以节省一些每个客户端的RAM,但增加了一些额外的代码复杂性。

<details>
<summary>英文:</summary>

&gt; Is my approach correct?

As you discovered, your original approach (of looping on the `EAGAIN`) has the drawback that a single badly-behaving client can cause the thread to busy-loop and stop handling other clients, so it&#39;s not a great approach for servers that support multiple clients.

Your proposed new approach is better, because even if some clients are having network trouble, handling of the other clients can continue as usual.

&gt; If I implement the above approach, I&#39;d need to have one buffer per
&gt; client, because I need to keep the &quot;unsend&quot; message which I couldn&#39;t
&gt; due to send() returning -1. Isn&#39;t that memory inefficient?

It certainly *could* be memory-inefficient, but there are things you can do to reduce memory usage.

One thing you could do is reduce the size of the potentially-unsent message.  In your example, you mention generating 1MB worth of data for a client, and then having to buffer that up.  But is it really necessary to generate the entire megabyte of data up-front?  Why not just generate 1KB of data instead, and once that 1KB of data has been passed to `send()`, generate the next 1KB, and so on, until the entire 1MB has been sent that way?  If you are reading the data from a file, that should be pretty straightforward to implement, and that would reduce the potential worst-case memory usage of your example by a factor of 1000.  (If you&#39;re generating the data some other way, it might be a bit more complicated to implement this, but still doable)

Another potentially-applicable way to reduce memory usage would be to use reference-counting to share data.  e.g. if you know many clients are likely to request the same data at around the same time, then instead of loading a separate copy of the data into RAM for each client, you could load a single copy of the data, let all the clients read from it directly, and keep a reference-count to it and free the data when the reference-count drops to zero.  (If the data is on disk, you could go even further and `mmap()` the data so that it *looks* like it&#39;s in RAM, but actually the MMU is secretly demand-reading it directly out of the disk/disk-cache when its virtual addresses are being accessed by the program)

As for a &quot;queue structure&quot;, that might work, but the main things you&#39;d want to keep track of for each client are:

   - For edge-triggering, whether the `EPOLLOUTET` bit is currently set for that client.
   - Whether you have data you want to send to that client ASAP (either in RAM, or on disk, or even just potential data that you haven&#39;t generated yet but you know that the client wants).  This will determine whether or not the `EPOLLOUT` bit should be set for that client.
   - Where in the overall outgoing data-stream you are (so that you know what bytes you already passed to `send()`, and therefore which bytes should be passed to `send()` then next time the socket indicates it is ready-for-write)
   - Whatever state info you need to &quot;generate some more bytes&quot; on demand for the client&#39;s outgoing-data-stream.  (This could be the bytes themselves, but it doesn&#39;t have to be; just the info necessary to generate them when you do need them.  e.g. for the reading-from-file case, this might be just the `FILE *` pointer to the open file-handle)

Note that with the above info, you could still have a (hopefully small) per-client resend-buffer that holds the &quot;leftover&quot; bytes that you previously tried to pass in to `send()` but that `send()` refused to accept... or if you prefer to, you could skip the resend-buffer entirely and extend the &quot;generate-bytes&quot; functionality described above to be able to re-generate previously-generated bytes starting at a specified byte-offset from the top of the stream.  That could save a bit of per-client RAM, at the cost of adding some additional code-complexity.


</details>



huangapple
  • 本文由 发表于 2023年6月12日 21:55:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76457379.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定