Golang检测正在进行的请求。

huangapple go评论92阅读模式
英文:

Golang detect in-flight requests

问题

我想知道是否已经有一个库可以解决这个问题,或者对于以下问题有什么建议:

客户端A请求资源A,这是一个长时间运行的请求,因为资源A很昂贵,并且导致缓存未命中。与此同时,客户端B也请求资源A,现在仍然是缓存未命中,因为客户端A的请求尚未返回并填充缓存。所以,客户端B不应该发出新的请求来生成资源A,而是应该阻塞并在客户端A的请求完成并填充缓存后得到通知。

我认为Group Cache库可能有类似的功能,但我还没有浏览代码来弄清楚它们是如何实现的,我也不想将实现与它绑定并将其用作依赖项。

到目前为止,我唯一的解决方案是一种发布-订阅的方式,我们有一个全局的当前正在处理的请求映射,以reqID作为键。当req1到达时,它将其ID设置在映射中,req2到达并检查其ID是否在映射中,因为它请求的是相同的资源,所以它会在一个通知通道上阻塞。当req1完成时,它会执行以下三个操作:

  1. 从映射中删除其ID
  2. 将条目保存在缓存中
  3. 向通知通道发送带有其ID的广播
    req2接收到通知后,解除阻塞并从缓存中获取。

由于Go语言没有内置的广播支持,可能有一个监听广播通道并为每个请求保持广播订阅者列表的goroutine,或者我们可以将映射更改为reqId => list(broadcastChannelSubscribers)。大致就是这样。

如果你认为使用Go的原语有更好的方法,欢迎提供任何意见。这个解决方案中唯一让我担心的部分是这个全局映射,被锁包围,我认为它很快就会成为一个瓶颈。如果你有一些非锁定的想法,即使它们是概率性的,我也很乐意听到。

英文:

I was wondering if there is already a library to do that or maybe a suggestion which way to go for the following problem:

Client A makes request for resource A, this is a long running request since resource A is expensive and it results in a cache miss. In the meantime client B makes request for resource A, now it's still a cache miss since client A's request hasn't returned and populated the cache yet. so instead of making a new request to generate resource A, client B should block and be notified when client A's request is complete and has populated the cache.

I think the group cache library has something along those lines, but I haven't been able to browse through the code to figure out how they do it, I also don't wanna tie the implementation to it and use it as a dependency.

The only solution I had so far is a pub-sub type of thing, where we have a global map of the current in-flight requests with the reqID as a key. When req1 comes it sets its ID in the map, req2 comes and checks if its id is in the map, since its requesting the same resource it is, so we block on a notifier channel. When req1 finishes it does 3 things:

  1. evicts its ID from the map
  2. saves the entry in the cache
  3. sends a broadcast with its ID to the notifier channel
    req2 receives the notification, unblocks and fetches from the cache.

Since go doesn't have built in support for broadcasts, theres probably 1 grouting listening on the broadcast channel and then keeping a list of subscribers to broadcast to for each request, or maybe we change the map to reqId => list(broadcastChannelSubscribers). Something along those lines.

If you think there is a better way to do it with Go's primitives, any input would be appreciated. The only piece of this solution that bothers me is this global map, surrounded by locks, I assume it quickly is going to become a bottleneck. IF you have some non-locking ideas, even if they are probabilistic Im happy to hear them.

答案1

得分: 3

这让我想起一个问题,有人正在实现类似的东西:

https://stackoverflow.com/questions/30329178/coalescing-items-in-channel

我给出了一个实现这种中间层的例子。我认为这符合你的想法:有一个例程来跟踪对同一资源的请求,并防止它们在并行计算中被重新计算。

如果你有一个单独的例程负责接收请求和管理对缓存的访问,你就不需要显式的锁(尽管在通道中有一个隐藏的锁)。无论如何,我不知道你的应用程序的具体情况,但考虑到你需要检查缓存(可能是锁定的)和(偶尔)执行昂贵的缺失条目计算 - 对映射查找进行锁定对我来说似乎不是一个很大的问题。如果你认为这有帮助,你也可以始终扩展更多这样的中间层例程,但你需要一种确定性的方式来路由请求(这样每个缓存条目都由单个例程管理)。

对不起,我没有给你带来一个万能解决方案,但听起来你已经在解决你的问题的正确道路上了。

英文:

It reminds me of one question where someone was implementing a similar thing:

https://stackoverflow.com/questions/30329178/coalescing-items-in-channel

I gave an answer with an example of implementing such a middle layer. I think this is in line with your ideas: have a routine keeping track of requests for the same resource and prevent them from being recalculating in parallel.

If you have a separate routine responsible for taking requests and managing access to cache, you don't need an explicit lock (there is one buried in a channel though). Anyhow, I don't know specifics of your application, but considering you need to check cache (probably locked) and (occasionally) perform an expensive calculation of missing entry – lock on map lookups doesn't seem like a massive problem to me. You can also always span more such middle layer routines if you think this would help, but you would need a deterministic way of routing the requests (so each cache entry is managed by a single routine).

Sorry for not bringing you a silver bullet solution, but it sounds like you're on a good way of solving your problem anyway.

答案2

得分: 2

缓存和性能问题总是棘手的,你应该始终创建一个基本解决方案进行基准测试,以确保你的假设是正确的。但是,如果我们知道瓶颈是获取资源,并且缓存将带来显著的回报,你可以使用Go的通道来实现排队。假设response是你的资源类型。

type request struct {
    back chan *response
}

func main() {
    c := make(chan request, 10) // 非阻塞
    go func(input chan request) {
        var cached *response
        for _, i := range input {
            if cached == nil { // 只请求一次
                cached = makeLongRunningRequest()
            }
            i.back <- cached
        }
    }(c)

    resp := make(chan *response)

    c <- request{resp} // 缓存未命中
    c <- request{resp} // 将被排队
    c <- request{resp} // 将被排队

    for _, r := range resp {
        // 处理响应
    }
}

在这里,我们只获取一个资源,但你可以为每个要获取的资源启动一个goroutine。Goroutine是廉价的,所以除非你需要同时缓存数百万个资源,否则应该没问题。当然,你也可以在一段时间后终止goroutine。

为了跟踪哪个资源ID属于哪个通道,我会使用一个带有互斥锁的映射。

map[resourceId]chan request

如果获取资源是瓶颈,那么锁定映射的成本应该是可以忽略的。如果锁定映射成为问题,请考虑使用分片映射

总的来说,你的设计看起来已经很不错了。我建议尽量保持设计尽可能简单,并在可能的情况下使用通道而不是锁。它们可以防止可怕的并发错误。

英文:

Caching and perfomance problems are always tricky and you should always make a basic solution to benchmark against to ensure that your assumptions are correct. But if we know that the bottleneck is fetching the resource and that caching will give significant returns you could use Go's channels to implement queuing. Assuming that response is the type of your resource.

type request struct {
     back chan *response
}

func main() {
    c := make(chan request,10) // non-blocking
    go func(input chan request){
        var cached *response
        for _,i := range input {
            if cached == nil { // only make request once
                cached = makeLongRunningRequest()
            }
            i.back &lt;- cached
        }
    }(c)

    resp := make(chan *response)

    c &lt;- request{resp} // cache miss
    c &lt;- request{resp} // will get queued
    c &lt;- request{resp} // will get queued

    for _,r := range resp {
        // do something with response
    }
}

Here we're only fetching one resource but you could start one goroutine for each resource you want to fetch. Goroutines are cheap so unless you need millions of resources cached at the same time you should be ok. You could of course also kill your goroutines after a while.

To keep track of which resource id belongs to which channel, I'd use a map

map[resourceId]chan request

with a mutex. Again, if fetching the resource is the bottle neck then the cost of locking the map should be negligible. If locking the map turns out to be a problem, consider using a sharded map.

In general you seem to be well on your way. I'd advise to try to keep your design as simple as possible and use channels instead of locks when possible. They do protect from terrible concurrency bugs.

答案3

得分: 2

一种解决方案是并发非阻塞缓存,详细讨论可参考《Go语言圣经》第9章。代码示例非常值得一看,因为作者通过多个版本(memo1、memo2等)演示了竞态条件的问题,使用互斥锁保护映射的方法,以及使用通道的版本。

还可以参考https://blog.golang.org/context,该文章介绍了类似的概念,并处理了正在进行的请求的取消。

将内容复制到此答案中是不切实际的,希望链接对您有用。

英文:

One solution is a concurrent non-blocking cache as discussed in detail in The Go Programming Language, chapter 9.

The code samples are well worth a look because the authors take you through several versions (memo1, memo2, etc), illustrating problems of race conditions, using mutexes to protect maps, and a version using just channels.

Also consider https://blog.golang.org/context as it has similar concepts and deals with cancellation of in flight requests.

It's impractical to copy the content into this answer, so hopefully the links are of use.

答案4

得分: 2

这已经是Golang提供的一个功能,称为single flight

对于你的用例,只需在single flight之上添加一些额外的逻辑。考虑下面的代码片段:

func main() {
	http.HandleFunc("/github", func(w http.ResponseWriter, r *http.Request) {
        var key = "facebook"
        var requestGroup singleflight.Group
        // 先在缓存中搜索,如果在缓存中找到则直接返回,否则进行 single flight 请求
        if res, err := searchCache(); err != nil{
            return res
        } 
        // 缓存未命中 -> 进行 single flight 请求,并将结果缓存起来
		v, err, shared := requestGroup.Do(key, func() (interface{}, error) {
			// companyStatus() 返回 string 和 error,满足 interface{}, error 的要求,因此可以直接返回结果。
            if err != nil {
                return interface{}, err
            }
			return companyStatus(), nil
		})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
        // 在这里设置缓存
        setCache(key, v)

		status := v.(string)

		log.Printf("/Company handler requst: status %q, shared result %t", status, shared)

		fmt.Fprintf(w, "Company Status: %q", status)
	})

	http.ListenAndServe("127.0.0.1:8080", nil)
}

// companyStatus 获取公司的 API 状态
func getCompanyStatus() (string, error) {
	log.Println("Making request to Some API")
	defer log.Println("Request to Some API Complete")

	time.Sleep(1 * time.Second)

	resp, err := http.Get("Get URL")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		return "", fmt.Errorf("Upstream response: %s", resp.Status)
	}

	r := struct{ Status string }{}

	err = json.NewDecoder(resp.Body).Decode(&r)

	return r.Status, err
}

希望这段代码能够自解释,并且你可以参考Single Flight 官方文档深入了解 single flight。

英文:

This is already provided by Golang as a feature single flight.

For your use case just use some extra logic on top of single flight. Consider the code snippet below:

func main() {
http.HandleFunc(&quot;/github&quot;, func(w http.ResponseWriter, r *http.Request) {
var key = &quot;facebook&quot;
var requestGroup singleflight.Group
// Search The Cache, if found in cache return from cache, else make single flight request
if res, err := searchCache(); err != nil{
return res
} 
// Cache Miss-&gt; Make Single Flight Request, and Cache it
v, err, shared := requestGroup.Do(key, func() (interface{}, error) {
// companyStatus() returns string, error, which statifies interface{}, error, so we can return the result directly.
if err != nil {
return interface{}, err
}
return companyStatus(), nil
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
//Set the Cache Here
setCache(key, v)
status := v.(string)
log.Printf(&quot;/Company handler requst: status %q, shared result %t&quot;, status, shared)
fmt.Fprintf(w, &quot;Company Status: %q&quot;, status)
})
http.ListenAndServe(&quot;127.0.0.1:8080&quot;, nil)
}
// companyStatus retrieves Comapny&#39;s API status
func getCompanyStatus() (string, error) {
log.Println(&quot;Making request to Some API&quot;)
defer log.Println(&quot;Request to Some API Complete&quot;)
time.Sleep(1 * time.Second)
resp, err := http.Get(&quot;Get URL&quot;)
if err != nil {
return &quot;&quot;, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return &quot;&quot;, fmt.Errorf(&quot;Upstream response: %s&quot;, resp.Status)
}
r := struct{ Status string }{}
err = json.NewDecoder(resp.Body).Decode(&amp;r)
return r.Status, err
}

I hope the code snippet is self explanatory and you can refer to Single Flight Official Docs to delve deep into single flight.

huangapple
  • 本文由 发表于 2015年6月28日 00:51:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/31091139.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定