使用通道进行请求-响应通信的惯用方式

huangapple go评论67阅读模式
英文:

Idiomatic way to make a request-response communication using channels

问题

也许我只是没有正确阅读规范,或者我的思维方式仍然停留在旧的同步方法上,但在Go中,如何以一种类型发送消息,然后接收另一种类型作为响应,这是正确的方式?

我想到的一种方法是:

package main

import "fmt"

type request struct {
    out      chan string
    argument int
}

var input = make(chan *request)
var cache = map[int]string{}

func processor() {
    for {
        select {
        case in := <-input:
            if result, exists := cache[in.argument]; exists {
                in.out <- result
            }
            result := fmt.Sprintf("%d", in.argument)
            cache[in.argument] = result
            in.out <- result
        }
    }
}

func main() {
    go processor()
    responseCh := make(chan string)
    input <- &request{
        out:      responseCh,
        argument: 1,
    }
    result := <-responseCh
    fmt.Println(result)
}

对于这个示例来说,cache并不是必需的,但否则它会导致数据竞争。

这是我应该做的吗?

英文:

Maybe I'm just not reading the spec right or my mindset is still stuck with older synchronization methods, but what is the right way in Go to send one type as receive something else as a response?

One way I had come up with was

package main
import &quot;fmt&quot;

type request struct {
    out chan string
    argument int
}
var input = make(chan *request)
var cache = map[int]string{}
func processor() {
    for {
        select {
            case in := &lt;- input:
                if result, exists := cache[in.argument]; exists {
                    in.out &lt;- result
                }
                result := fmt.Sprintf(&quot;%d&quot;, in.argument)
                cache[in.argument] = result
                in.out &lt;- result
        }
    }
}

func main() {
    go processor()
    responseCh := make(chan string)
    input &lt;- &amp;request{
        responseCh,
        1,
    }
    result := &lt;- responseCh
    fmt.Println(result)
}

That cache is not really necessary for this example but otherwise it would cause a datarace.

Is this what I'm supposed to do?

答案1

得分: 26

有很多可能性,取决于你的问题的最佳解决方法是什么。当你从一个通道接收到东西时,没有默认的响应方式 - 你需要自己构建流程(就像你在问题的示例中所做的那样)。在每个请求中发送一个响应通道可以给你很大的灵活性,因为你可以选择将响应路由到哪里,但很多时候并不是必需的。

以下是一些其他的例子:

1. 发送和接收来自同一个通道

你可以使用非缓冲通道同时发送和接收响应。这很好地说明了非缓冲通道实际上是程序中的同步点。当然,限制是我们需要发送与请求和响应完全相同的类型:

package main

import (
	"fmt"
)

func pow2() (c chan int) {
	c = make(chan int)
	go func() {
		for x := range c {
			c <- x*x
		}
	}()
	return c
}

func main() {
	c := pow2()
	c <- 2
	fmt.Println(<-c) // = 4
	c <- 4
	fmt.Println(<-c) // = 8
}

2. 发送到一个通道,从另一个通道接收

你可以分离输入和输出通道。如果你愿意,你可以使用缓冲版本。这可以用作请求/响应场景,并允许你有一个负责发送请求的路由,另一个负责处理请求,还有一个负责接收响应的路由。示例:

package main

import (
	"fmt"
)

func pow2() (in chan int, out chan int) {
	in = make(chan int)
	out = make(chan int)
	go func() {
		for x := range in {
			out <- x*x
		}		
	}()
	return
}

func main() {
	in, out := pow2()
	go func() {
		in <- 2
		in <- 4
	}()
	fmt.Println(<-out) // = 4
	fmt.Println(<-out) // = 8
}

3. 每个请求都发送响应通道

这就是你在问题中提到的方式。它给了你灵活性,可以指定响应的路由。如果你希望响应命中特定的处理程序,这将非常有用,例如你有许多客户端有一些任务要做,你希望响应由同一个客户端接收。

package main

import (
	"fmt"
	"sync"
)

type Task struct {
	x int
	c chan int
}

func pow2(in chan Task) {
	for t := range in {
		t.c <- t.x*t.x
	}		
}

func main() {
	var wg sync.WaitGroup	
	in := make(chan Task)

	// 两个处理器
	go pow2(in)
	go pow2(in)

	// 五个客户端有一些任务
	for n := 1; n < 5; n++ {
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			c := make(chan int)
			in <- Task{x, c}
			fmt.Printf("%d**2 = %d\n", x, <-c)
		}(n)
	}

	wg.Wait()
}

值得一提的是,这种情况不一定需要使用每个任务返回通道来实现。如果结果具有某种客户端上下文(例如客户端ID),一个单一的多路复用器可以接收所有的响应,然后根据上下文进行处理。

有时候,为了实现简单的请求-响应模式,没有必要使用通道。当设计Go程序时,我发现自己试图将太多的通道注入到系统中(只是因为我认为它们非常好用)。有时候,旧好的函数调用就是我们所需要的:

package main

import (
	"fmt"
)

func pow2(x int) int {
	return x*x
}

func main() {
	fmt.Println(pow2(2))
	fmt.Println(pow2(4))
}

(如果有人遇到与你的示例类似的问题,这可能是一个很好的解决方案。回应你在问题下收到的评论,如果需要保护单个结构(例如缓存),创建一个结构并公开一些方法,使用互斥锁保护并发使用可能会更好。)

英文:

There're plenty of possibilities, depends what is best approach for your problem. When you receive something from a channel, there is nothing like a default way for responding – you need to build the flow by yourself (and you definitely did in the example in your question). Sending a response channel with every request gives you a great flexibility as with every request you can choose where to route the response, but quite often is not necessary.

Here are some other examples:

1. Sending and receiving from the same channel

You can use unbuffered channel for both sending and receiving the responses. This nicely illustrates that unbuffered channels are in fact a synchronisation points in your program. The limitation is of course that we need to send exactly the same type as request and response:

package main

import (
	&quot;fmt&quot;
)

func pow2() (c chan int) {
	c = make(chan int)
	go func() {
		for x := range c {
			c &lt;- x*x
		}
	}()
	return c
}

func main() {
	c := pow2()
	c &lt;- 2
	fmt.Println(&lt;-c) // = 4
	c &lt;- 4
	fmt.Println(&lt;-c) // = 8
}

2. Sending to one channel, receiving from another

You can separate input and output channels. You would be able to use buffered version if you wish. This can be used as request/response scenario and would allow you to have a route responsible for sending the requests, another one for processing them and yet another for receiving responses. Example:

package main

import (
	&quot;fmt&quot;
)

func pow2() (in chan int, out chan int) {
	in = make(chan int)
	out = make(chan int)
	go func() {
		for x := range in {
			out &lt;- x*x
		}		
	}()
	return
}

func main() {
	in, out := pow2()
	go func() {
		in &lt;- 2
		in &lt;- 4
	}()
	fmt.Println(&lt;-out) // = 4
	fmt.Println(&lt;-out) // = 8
}

3. Sending response channel with every request

This is what you've presented in the question. Gives you a flexibility of specifying the response route. This is useful if you want the response to hit the specific processing routine, for example you have many clients with some tasks to do and you want the response to be received by the same client.

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

type Task struct {
	x int
	c chan int
}

func pow2(in chan Task) {
	for t := range in {
		t.c &lt;- t.x*t.x
	}		
}

func main() {
	var wg sync.WaitGroup	
	in := make(chan Task)

	// Two processors
	go pow2(in)
	go pow2(in)

	// Five clients with some tasks
	for n := 1; n &lt; 5; n++ {
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			c := make(chan int)
			in &lt;- Task{x, c}
			fmt.Printf(&quot;%d**2 = %d\n&quot;, x, &lt;-c)
		}(n)
	}

	wg.Wait()
}

Worth saying this scenario doesn't necessary need to be implemented with per-task return channel. If the result has some sort of the client context (for example client id), a single multiplexer could be receiving all the responses and then processing them according to the context.

Sometimes it doesn't make sense to involve channels to achieve simple request-response pattern. When designing go programs, I caught myself trying to inject too many channels into the system (just because I think they're really great). Old good function calls is sometimes all we need:

package main

import (
	&quot;fmt&quot;
)

func pow2(x int) int {
	return x*x
}

func main() {
	fmt.Println(pow2(2))
	fmt.Println(pow2(4))
}

(And this might be a good solution if anyone encounters similar problem as in your example. Echoing the comments you've received under your question, having to protect a single structure, like cache, it might be better to create a structure and expose some methods, which would protect concurrent use with mutex.)

huangapple
  • 本文由 发表于 2014年12月2日 04:36:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/27236827.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定