在Go语言中,是否可以同时等待通道和文件描述符?

huangapple go评论81阅读模式
英文:

Is it possible to wait on both channels and file descriptors at the same time in Go?

问题

我知道在Go语言中可以使用select {}语法在多个通道上等待,并且可以使用syscall.Select()或类似的函数在多个文件描述符上等待。但是,是否可以同时在两个通道上等待呢?

背景是,我想要一个goroutine,在一个通道上接收消息并将其转发到一个套接字连接上(由gozmq提供),同时在套接字连接上等待回复。

由于底层库的线程安全要求,套接字只能在一个线程中访问,这就是为什么我想知道是否有办法从单个goroutine中处理这个问题的原因。

英文:

I know I can wait on multiple channels using the select {} syntax in Go, and wait on multiple file descriptors using syscall.Select() or similar functions. But is it possible to wait on both channels at once?

For background, I want to have a goroutine that accepts for messages over a channel and forwards them over a socket connection (provided by gozmq), while simultaneously waiting for replies on the socket connection.

Due to the thread safety requirements of the underlying library, the socket can only be accessed in one thread at a time, which is why I was wondering if there is a way to handle this from a single goroutine.

答案1

得分: 13

选择在通道和文件描述符上进行选择是不可能的,因为这些抽象位于不同的层次上。通道由Go运行时处理,而文件描述符由操作系统处理。你需要的是在它们之间建立一个桥梁,可以使用net.Pipe()来实现。

你需要做的就是将一个goroutine专门用于epoll()/select()来监视你的zmq-sockets和一个单独的“唤醒”net.Pipe()。这就是你的轮询服务器。另一个goroutine监听你的读取和写入通道。当有人在读取或写入通道上发送消息时,第二个goroutine会发送消息到管道上以唤醒轮询服务器。

这就是标准库中net包的工作原理。我强烈建议阅读它以获取灵感(或者直接借鉴... BSD许可证非常自由)。

以下是net包中的pollServer的描述。你可能需要阅读代码才能理解这段描述,但是从fd.go中的这一部分开始查找应该是一个不错的起点。

// 一个pollServer帮助FD在收到EAGAIN后确定何时重试非阻塞读取或写入。
// 当一个FD需要等待时,将fd发送到s.cr(用于读取)或s.cw(用于写入)以将请求传递给轮询服务器。
// 然后在fd.cr/fd.cw上接收。
// 当pollServer发现FD上的I/O可能再次可用时,它将在fd.cr/fd.cw上发送fd以唤醒任何等待的进程。
// 这个协议被实现为s.WaitRead()和s.WaitWrite()。
//
// 这里有一个微妙之处:当在s.cr/s.cw上发送时,轮询服务器可能正在系统调用中,等待一个fd变为就绪。
// 它不会查看请求通道。为了解决这个问题,轮询服务器不仅等待它所拥有的FD,还等待它自己的管道。
// 在发送到缓冲通道s.cr/s.cw之后,WaitRead/WaitWrite向管道写入一个字节,导致轮询服务器的轮询系统调用返回。
// 作为对管道可读的响应,轮询服务器重新轮询其请求通道。
//
// 注意,顺序是“发送请求”然后“唤醒服务器”。
// 如果操作顺序颠倒,就会出现竞争:轮询服务器可能会被唤醒并查看请求通道,发现它是空的,然后再次进入睡眠状态,
// 这都发生在请求者成功发送请求之前。因为发送必须在唤醒之前完成,所以请求通道必须是有缓冲的。
// 大小为1的缓冲区足以处理任何请求负载。如果有很多进程尝试提交请求,其中一个将成功,轮询服务器将读取请求,
// 然后通道将在下一个进程的请求之前为空。更大的缓冲区可能有助于批量请求。
//
// 为了避免关闭时的竞争,所有的fd操作都是加锁和引用计数的。
// 当调用netFD.Close()时,它调用syscall.Shutdown并设置一个关闭标志。
// 只有当最后一个引用被移除时,fd才会被关闭。

祝你在重新实现net时好运。在所有这些之后,好消息是你的zmq-socket将在Go中是线程安全的。

英文:

Selecting on both a channel and an file descriptor is not possible because the abstractions are at different levels. Channels are handled by the go runtime and file descriptors by the operating system. What you need is to make a bridge between them and this can be done with a net.Pipe().

Pretty much what you need to do is dedicate one goroutine to epoll()/select() to watch your zmq-sockets and a single "wake up" net.Pipe(). This is your poll server. Another goroutine listens on your read and write channels. When someone sends on the read or write channels, the second goroutine would send on the pipe to wake up the poll server.

This is how the net package in the standard library works. I highly recommend reading it for inspiration (or stealing... the BSD license is very liberal).

Here is a description of pollServer from net itself. You may need to read the code to understand what this is saying, but this section from fd.go should be a good place to start looking.

// A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN.  When an FD needs to wait,
// send the fd on s.cr (for a read) or s.cw (for a write) to pass the
// request to the poll server.  Then receive on fd.cr/fd.cw.
// When the pollServer finds that i/o on FD should be possible
// again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
// This protocol is implemented as s.WaitRead() and s.WaitWrite().
//
// There is one subtlety: when sending on s.cr/s.cw, the
// poll server is probably in a system call, waiting for an fd
// to become ready.  It's not looking at the request channels.
// To resolve this, the poll server waits not just on the FDs it has
// been given but also its own pipe.  After sending on the
// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
// byte to the pipe, causing the pollServer's poll system call to
// return.  In response to the pipe being readable, the pollServer
// re-polls its request channels.
//
// Note that the ordering is "send request" and then "wake up server".
// If the operations were reversed, there would be a race: the poll
// server might wake up and look at the request channel, see that it
// was empty, and go back to sleep, all before the requester managed
// to send the request.  Because the send must complete before the wakeup,
// the request channel must be buffered.  A buffer of size 1 is sufficient
// for any request load.  If many processes are trying to submit requests,
// one will succeed, the pollServer will read the request, and then the
// channel will be empty for the next process's request.  A larger buffer
// might help batch requests.
//
// To avoid races in closing, all fd operations are locked and
// refcounted. when netFD.Close() is called, it calls syscall.Shutdown
// and sets a closing flag. Only when the last reference is removed
// will the fd be closed.

Good luck re-implementing net. The good news at the end of all this your zmq-socket will be thread safe in go.

答案2

得分: 2

为每个您希望等待的fd生成一个新的goroutine,当它们读取到内容时,让它们将fd发送到一个通道,然后在这些通道上进行选择。

英文:

Spawn a new goroutine for each fd you wish to wait on, have them send the fd to a channel when they read something, select on the channels.

答案3

得分: 1

所有的net.Conn在Go中都可以并发访问。如果你的库是基于这一点的,就不应该有任何问题。

否则,在Go中,每个连接通常会生成一个goroutine。这个goroutine通常只负责从套接字中读取数据(可能会阻塞),并通过通道将所有数据转发给另一个协调goroutine。然后,这个协调goroutine可以使用select语句与所有这些套接字连接(包装在通道中)和其他事件进行交互。还要注意,你可以很容易地向这些通道添加缓冲区,这样慢速客户端就不会阻塞你的协调goroutine。

英文:

All net.Conn's in Go can be accessed concurrently. If your library is based on that, there shouldn't be any problems.

Otherwise, it's quite common in Go to spawn one goroutine per connection. This goroutine often is just responsible for reading from the socket (which might block) and forward all the data to another coordinator goroutine using a channel. This coordinator goroutine is then able to interact with all those socket connections (which are wrapped in a channel) and other events using a select statement. Also note, that you can easily add a buffer to those channels, so that a slow client can't block your coordinator goroutine.

huangapple
  • 本文由 发表于 2012年7月5日 18:51:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/11342833.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定