net/http: 协程之间的并发和消息传递

huangapple go评论80阅读模式
英文:

net/http: concurrency and message passing between coroutines

问题

我正在开发一个REST API服务器,服务器的一个功能是能够在创建新资源或修改现有资源时通过WebSocket通知任意数量的客户端。

我有一个自定义的动作路由器,将URL绑定到一个函数和gorillas的WebSocket库的实现。为了进程间通信,我决定依赖于通道,因为它似乎是协程之间通信的惯用方式。它还表现得像一个管道,这是我熟悉的概念。

一个名为Create的函数的原型如下:

func Create(res http.ResponseWriter, req *http.Request, userdata interface{}) (int, string, interface{})

作为userdata,传递了一个PipeSet结构的实例。它是一个在所有协程之间共享的映射,其中键是Pipe的地址(指针),值是相同的东西。这里的理念是为了加快删除时的查找过程。

type Pipe chan string

type PipeSet struct {
    sync.Mutex
    Pipes map[*Pipe]*Pipe
}

func NewPipe() Pipe {
    return make(Pipe)
}

func NewPipeSet() PipeSet {
    var newSet PipeSet
    newSet.Pipes = make(map[*Pipe]*Pipe)
    return newSet
}

func (o *PipeSet) AddPipe(pipe *Pipe) {
    o.Lock()
    o.Pipes[pipe] = pipe
    o.Unlock()
}

func (o *PipeSet) ForeachPipe(f func(pipe Pipe)) {
    o.Lock()
    for k := range o.Pipes {
        f(*o.Pipes[k])
    }
    o.Unlock()
}

func (o *PipeSet) DeletePipe(pipe *Pipe) {
    o.Lock()
    delete(o.Pipes, pipe)
    o.Unlock()
}

当客户端通过WebSocket连接时,会创建一个新的通道(Pipe)并将其添加到共享的PipeSet中。然后,如果创建了一个新资源,协程会遍历整个PipeSet,向每个Pipe发送一条消息。然后将消息转发给连接的客户端。

一个问题区域

我无法检测到客户端的WebSocket连接是否仍然存在。我需要知道这一点来确定是否应该从PipeSet中删除一个Pipe。在这种情况下,我依赖于CloseNotifier,但它从未触发。

代码如下(摘录):

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade(res, req, nil)

if err != nil {
    marker.MarkError(err)
    return http.StatusBadRequest, "", nil
}

defer conn.Close()

exitStatus = http.StatusOK
pipe := genstore.NewPipe()
quit := res.(http.CloseNotifier).CloseNotify()

genStore.WSChannels.AddPipe(&pipe)

for {
    log.Printf("waiting for a message")

    select {
    case wsMsg := <-pipe:
        log.Printf("got a message: %s (num pipes %d)", wsMsg, len(genStore.WSChannels.Pipes))

        if err = conn.WriteMessage(websocket.TextMessage, []byte(wsMsg)); err != nil {
            marker.MarkError(err)
            goto egress
        }

    case <-quit:
        log.Printf("quit...")
        goto egress
    }
}

egress:
genStore.WSChannels.DeletePipe(&pipe)
英文:

I am working on a REST API server and one of the features of the server is being able to notify arbitrary number of client via a websocket when a new resource is created or an existing one is modified.

I have a custom action router to bind an URL to a function and gorillas's implementation of websocket library. For IPC I have decided to rely on channels as it appears to be the idiomatic way to communicate between coroutines. Also it behaves like a pipe which is a concept I am familiar with.

A prototype for a function Create looks like this:

func Create (res http.ResponseWriter, req *http.Request, userdata interface {}) (int, string, interface {})

As a userdata an instance of a structure PipeSet is passed. It is a map that is shared between all coroutines where a key is an address (a pointer to) of a Pipe and value the same thing. The rationale here is to speed up a lookup process when deleting.

type Pipe chan string                                                           
                                                                            
type PipeSet struct {                                                           
    sync.Mutex                                                                  
    Pipes map [*Pipe] *Pipe                                                     
}                                                                               
                                                                            
func NewPipe () Pipe {                                                          
    return make (Pipe)                                                          
}                                                                               
                                                                            
func NewPipeSet () PipeSet {                                                    
    var newSet PipeSet                                                      
    newSet.Pipes = make (map[*Pipe] *Pipe)                                  
    return newSet                                                           
}                                                                               
                                                                            
func (o *PipeSet) AddPipe (pipe *Pipe) {                                        
    o.Lock ()                                                                   
    o.Pipes[pipe] = pipe                                                        
    o.Unlock ()                                                                 
}                                                                               
                                                                            
func (o *PipeSet) ForeachPipe (f func (pipe Pipe)) {                            
    o.Lock ()                                                                   
    for k := range (o.Pipes) {                                                  
        f (*o.Pipes[k])                                                         
    }                                                                           
    o.Unlock ()                                                                 
}                                                                               
                                                                            
func (o *PipeSet) DeletePipe (pipe *Pipe) {                                     
    o.Lock ()                                                                   
    delete (o.Pipes, pipe)                                                      
    o.Unlock ()                                                                 
}

When a client connects via websocket a new channel (a Pipe) is created and added to a shared PipeSet. Then if a new resource is created a coroutine goes through an entire PipeSet sending a message to each Pipe. The message is then forwarded to a connected clients on the other side.

A problem area

I am unable to detect whether client's websocket connection is still there. I need to know that to determine whether I should remove a Pipe from the PipeSet. I am relying on CloseNotifier in this case. It never fires.

The code looks like this (excerpt):

var upgrader = websocket.Upgrader {
	CheckOrigin: func (r *http.Request) bool { return true },
}

conn, err := upgrader.Upgrade (res, req, nil)

if err != nil {
	marker.MarkError (err)
	return http.StatusBadRequest, &quot;&quot;, nil
}

defer conn.Close ()

exitStatus = http.StatusOK
pipe := genstore.NewPipe ()
quit := res.(http.CloseNotifier).CloseNotify ()

genStore.WSChannels.AddPipe (&amp;pipe)

for {
	log.Printf (&quot;waiting for a message&quot;)

	select {
		case wsMsg = &lt;-pipe:
			log.Printf (&quot;got a message: %s (num pipes %d)&quot;, wsMsg, len (genStore.WSChannels.Pipes))

			if err = conn.WriteMessage (websocket.TextMessage, []byte (wsMsg)); err != nil {
				marker.MarkError (err)
				goto egress
			}

		case &lt;-quit:
			log.Printf (&quot;quit...&quot;)
			goto egress
	}
}

egress:
genStore.WSChannels.DeletePipe (&amp;pipe)

答案1

得分: 3

当你使用Gorilla将HTTP连接升级为WebSocket连接时,它会劫持该连接,net/http服务器停止为其提供服务。这意味着,从那一刻起,你不能依赖net/http的事件。

可以参考这个链接:https://github.com/gorilla/websocket/issues/123

因此,在这里你可以为每个新的WebSocket连接启动一个新的goroutine,该goroutine将从该连接读取数据,并在失败时向quit通道写入一条消息。

英文:

When you upgrade HTTP connection to a WebSocket connection using Gorilla, it hijacks that connection and net/http server stops serving it. This means, that you can't rely on a net/http events from that moment.

Check this: https://github.com/gorilla/websocket/issues/123

So, what you can do here is to start new goroutine for every new WebSocket connection, which will read data from this connection and write a message to a quit channel on a failure.

huangapple
  • 本文由 发表于 2017年6月3日 12:15:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/44340192.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定