英文:
TCP connection pool
问题
我是socket的新手,正在尝试在TCP socket上创建连接池。我的实现方式是每次调用发送32位长度,然后发送二进制消息。但是我遇到了一个问题,有时候读取器会接收到服务器之前的响应(当客户端在发送错误时关闭并重新建立socket时可能会发生)。我应该如何在发送新请求之前刷新socket(清除上一次调用的剩余字节)?有什么建议吗?
编辑:我了解到TCP始终流式传输0s,如果我在消息之前发送byte(1),那么我可以有一个刷新函数,在每次新调用之前检查socket是否为空。
英文:
i'm new to socket and trying to create a connection pooling over tcp socket. my implementation send 32bit length then binary message for each call. But i'm having problem with sometimes the reader receiving previous response from server (could happened when client close and re-establish socket on send error). how do i flush socket (remaining bytes from previous call) before a new request. any suggestion?
Edit: i learned that tcp always stream 0s, what if i send byte(1) before message so i can have a flush function to check if socket not empty before a new call.
答案1
得分: 12
你的帖子实际上提出了几个问题:
- 如何管理连接池?
- 如何处理套接字通信?
这实际上是两个不同的问题。连接池只是一种管理一组连接的方式。一个简单的实现方式是使用如下的类:
package netpool
import (
"net"
)
const MaxConnections = 3
type Error string
func (e Error) Error() string {
return string(e)
}
var ErrMaxConn = Error("Maximum connections reached")
type Netpool struct {
name string
conns int
free []net.Conn
}
func NewNetpool(name string) *Netpool {
return &Netpool{
name: name,
}
}
func (n *Netpool) Open() (conn net.Conn, err error) {
if n.conns >= MaxConnections && len(n.free) == 0 {
return nil, ErrMaxConn
}
if len(n.free) > 0 {
// 返回连接池中的第一个空闲连接
conn = n.free[0]
n.free = n.free[1:]
} else {
addr, err := net.ResolveTCPAddr("tcp", n.name)
if err != nil {
return nil, err
}
conn, err = net.DialTCP("tcp", nil, addr)
if err != nil {
return nil, err
}
n.conns += 1
}
return conn, err
}
func (n *Netpool) Close(conn net.Conn) error {
n.free = append(n.free, conn)
return nil
}
我在这里创建了一个独立的类。通常情况下,它会作为更高级的类(如MyHTTPHost或MyDatabase)的一部分来实现。
在这个简单的实现中,通过netpool.Open()返回的连接没有被跟踪。如果在netpool.Close()之外关闭连接,可能会导致连接泄漏。如果你想要跟踪它们,可以添加一个活动和非活动连接池,这样就可以解决这个问题。
还有一些其他的事情你可能想要添加到连接池的实现中:
- 线程保护(使用sync.Mutex,例如)
- 在一段时间不活动后关闭连接池中的连接
- 错误检查,确保关闭的连接仍然有效
一旦你有了一个连接,你可以像正常情况下一样调用它的Read和Write方法。要刷新套接字上的所有未完成数据,你可以简单地使用ioutil.ReadAll()辅助函数。默认情况下,如果没有数据可用,它将无限期地阻塞。为了避免这种情况,可以使用以下代码添加读取超时:
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
_, err = ioutil.ReadAll(conn)
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
err = nil // 在这种情况下,超时不是一个错误
}
if err != nil {
// 处理错误情况。
}
如果有待处理的数据,这将从给定的连接中读取所有数据;如果没有待处理的数据,则在500毫秒后返回一个I/O超时错误。
需要进行类型断言,因为ioutil.ReadAll()返回一个Error接口,而不是net.Error接口,我们需要后者来轻松地判断调用是否由于超时而返回。
英文:
Your post actually asks several questions:
- How to manage a connection pool?
- How to handle communication over the sockets?
These are really two different things. A connection pool is just a way to manage a set of connections. A simple way to implement this is with a class such as:
package netpool
import (
"net"
)
const MaxConnections = 3
type Error string
func (e Error) Error() string {
return string(e)
}
var ErrMaxConn = Error("Maximum connections reached")
type Netpool struct {
name string
conns int
free []net.Conn
}
func NewNetpool(name string) *Netpool {
return &Netpool{
name: name,
}
}
func (n *Netpool) Open() (conn net.Conn, err error) {
if n.conns >= MaxConnections && len(n.free) == 0 {
return nil, ErrMaxConn
}
if len(n.free) > 0 {
// return the first free connection in the pool
conn = n.free[0]
n.free = n.free[1:]
} else {
addr, err := net.ResolveTCPAddr("tcp", n.name)
if err != nil {
return nil, err
}
conn, err = net.DialTCP("tcp", nil, addr)
if err != nil {
return nil, err
}
n.conns += 1
}
return conn, err
}
func (n *Netpool) Close(conn net.Conn) error {
n.free = append(n.free, conn)
return nil
}
I have created a stand-alone class here. It would typically be implemented as part of a higher-level class such as MyHTTPHost, or MyDatabase.
In this simple implementation, connections that are returned via netpool.Open() are not tracked. It's possible to leak connections by calling Open(), then closing the connections outside of netpool.Close(). It's possible to track them if you want to hold an active and inactive pool, for example, which would solve this problem.
A couple of other things you might want to add to a pooling implementation:
- Threading protection (using sync.Mutex, for example)
- Closing of connections in the freepool after some length of inactivity
- Error checking to be sure that closed connections are still valid
Once you have a connection, you can call Read and Write on it normally. To flush all oustanding data on the socket, you can simply use the ioutil.ReadAll() helper function. By default, this will block indefinitely if there is no data available. To avoid that, add a read timeout using:
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
_, err = ioutil.ReadAll(conn)
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
err = nil // timeout isn't an error in this case
}
if err != nil {
// handle the error case.
}
This will read all the data from the given connection if any is pending, or will return after 500ms with an I/O Timeout error if no data was pending.
The type assertion is required because ioutil.ReadAll() returns an Error interface, rather than a net.Error interface, and we need the latter to be able to easily find out if the call returned due to a timeout.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论