如何在第一个 goroutine 完成后安全地绕过其他 goroutine 的结果?

huangapple go评论68阅读模式
英文:

How to safely bypass results from other goroutines when first is completed

问题

我想要向多个服务器请求数据(例如多个读取副本)。在这个任务中,速度最重要,所以应该首先返回第一个结果,其他的结果可以忽略。

我在绕过这个数据的习惯方式上遇到了问题。当主进程退出时(所有较慢的goroutine都没有完成它们的工作),这个问题就没有了。但是当我们取消注释最后一行(使用Sleep函数),我们可以看到其他的goroutine也在工作。

现在我正在通过通道传递数据,有没有不传递它们的方法?

有什么好的和安全的方法来处理这种问题?

package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"
)

type Result int

type Conn struct {
	Id int
}

func (c *Conn) DoQuery(params string) Result {
	log.Println("Querying start", params, c.Id)
	time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	log.Println("Querying end", params, c.Id)

	return Result(1000 + c.Id*c.Id)
}

func Query(conns []Conn, query string) Result {
	ch := make(chan Result)
	for _, conn := range conns {
		go func(c Conn) {
			ch <- c.DoQuery(query)
		}(conn)
	}

	return <-ch
}

func main() {
	conns := []Conn{Conn{1}, Conn{2}, Conn{3}, Conn{4}, Conn{5}}
	result := Query(conns, "query!")
	fmt.Println(result)
    // time.Sleep(time.Minute)
}
英文:

I want to ask several servers for data (e.g. multiple read replicas).
In this task most important is speed, so first result should be served
and all other can be ignored.

I have problem with idiomatic way of bypassing this data. Everything
with this problem is ok when it quits (all slower goroutines are not
finishing their work, because main process exists). But when we uncomment
last line (with Sleep) We can see that other goroutines are doing their work too.

Now I'm pushing data through channel is there any way to not push them?

What is good and safe way of dealing with this kind of problems?

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;math/rand&quot;
	&quot;time&quot;
)

type Result int

type Conn struct {
	Id int
}

func (c *Conn) DoQuery(params string) Result {
	log.Println(&quot;Querying start&quot;, params, c.Id)
	time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	log.Println(&quot;Querying end&quot;, params, c.Id)

	return Result(1000 + c.Id*c.Id)
}

func Query(conns []Conn, query string) Result {
	ch := make(chan Result)
	for _, conn := range conns {
		go func(c Conn) {
			ch &lt;- c.DoQuery(query)
		}(conn)
	}

	return &lt;-ch
}

func main() {
	conns := []Conn{Conn{1}, Conn{2}, Conn{3}, Conn{4}, Conn{5}}
	result := Query(conns, &quot;query!&quot;)
	fmt.Println(result)
    // time.Sleep(time.Minute)
}

答案1

得分: 5

我的建议是将ch设置为带有每个查询一个空间的缓冲通道:ch := make(chan Result, len(conns))。这样每个查询都可以完成,并且不会在通道写入时阻塞。

Query可以读取一次并返回第一个结果。当所有其他goroutine完成时,通道最终会被垃圾回收,所有内容都会消失。使用无缓冲通道,你创建了许多永远无法终止的goroutine。

编辑:
如果你想取消正在进行的请求,可能会变得更加困难。某些操作和API提供了取消功能,而其他一些则没有。对于HTTP请求,你可以在请求结构体上使用Cancel字段。只需提供一个可以关闭的通道来取消:

func (c *Conn) DoQuery(params string, cancel chan struct{}) Result {
    //省略错误处理。正确处理错误很重要。
    req, _ := http.NewRequest(...)
    req.Cancel = cancel
    resp, _ := http.DefaultClient.Do(req)
    //在取消时,请求将返回某种错误。
    return readData(resp)
}
func Query(conns []Conn, query string) Result {
    ch := make(chan Result)
    cancel := make(chan struct{})
    for _, conn := range conns {
        go func(c Conn) {
            ch <- c.DoQuery(query, cancel)
        }(conn)
    }

    first := <-ch
    close(cancel)
    return first
}

这可能有助于取消你不关心的大型请求的读取,但它可能实际上并不会取消远程服务器上的请求。如果你的查询不是HTTP请求,而是数据库调用或其他内容,你需要查看是否有类似的取消机制可供使用。

英文:

My recommendation would be to make ch a buffered channel with one space per query: ch := make(chan Result, len(conns)). This way each query can run to completion, and will not block on the channel write.

Query can read once and return the first result. When all other goroutines complete, the channel will eventually be garbage collected and everything will go away. With your unbuffered channel, you create a lot of goroutines that can never terminate.

EDIT:
If you want to cancel in-flight requests, it can become significantly harder. Some operations and apis provide cancellation, and others don't. With an http request you can use Cancel field on the request struct. Simply provide a channel that you can close to cancel:

func (c *Conn) DoQuery(params string, cancel chan struct{}) Result {
    //error handling omitted. It is important to handle errors properly. 
    req, _ := http.NewRequest(...)
    req.Cancel = cancel
    resp, _ := http.DefaultClient.Do(req)
    //On Cancellation, the request will return an error of some kind.
    return readData(resp)
}
func Query(conns []Conn, query string) Result {
    ch := make(chan Result)
    cancel := make(chan struct{})
    for _, conn := range conns {
        go func(c Conn) {
            ch &lt;- c.DoQuery(query,cancel)
        }(conn)
    }

    first := &lt;-ch
    close(cancel)
    return first
}

This may help if there is a large request to read that you won't care about, but it may or may not actually cancel the request on the remote server. If your query is not http, but a database call or something else, you will need to look into if there is a similar cancellation mechanism you can use.

huangapple
  • 本文由 发表于 2015年11月5日 00:15:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/33526746.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定