Golang清理不会被读取的通道中的项目。

huangapple go评论84阅读模式
英文:

Golang clean items out of channel that will not be read

问题

我正在开发一个HTTP端点,它将接收来自客户端的请求,并在接收到来自另一个服务器的"ack"确认请求之前阻塞,或者在超时后解除阻塞。我的代码与服务器之间的通信不包含在此示例中,但你可以假设对于每个请求,最终可能会收到一个ack确认。

由于在短时间内会有许多请求通过我的模块,我不能假设给定的ack与我正在阻塞的请求相关。编辑:这里需要澄清一下,因为它引起了一些混淆。请求和ack都是由控制器从外部来源接收的。这就是为什么我要异步处理它们的原因。/编辑因此,如果ack与请求无关,我的代码会将其放回通道中。还需要注意的是,http.ListenAndServe会异步调用我的函数。

如果请求在超时之前得到确认,就没有问题。然而,如果ack在超时之后才到达,它将被添加到通道中并且永远不会被移除。这将导致通道填满。我不敢使用"cancel"通道,因为也有可能对于某个请求不会收到任何ack,这样取消通道也会填满。

**问题:**我该如何防止迟到的ack填满我的通道?/我该如何识别和移除迟到的ack?

以下是代码。没有play.golang.org的链接,因为http.ListenAndServe :/

package main

import (
	"fmt"
	"net/http"
	"time"
)

const timeout = 10

func startEndpoint(w http.ResponseWriter, r *http.Request) {
	var ack string
	timer := time.NewTimer(time.Second * timeout)
	defer timer.Stop()

	m := r.RequestURI[len("/start/"):]
	fmt.Print(m)
AckRecycle:
	for {
		select {
		case ack = <-acks:
			if ack == m {
				//我们找到了我们自己的ack
				fmt.Print("+")
				w.Write([]byte("Ack received for " + ack))
				break AckRecycle
			} else {
				//我们在通道上找到的不是我们的ack
				fmt.Print(".")
				time.Sleep(time.Millisecond * 100)
				acks <- ack
			}
		case <-timer.C:
			//我们等待ack的时间已经用完
			w.Write([]byte("Timeout waiting for " + m))
			break AckRecycle
		default:
			//通道是空的
			fmt.Print("-")
			time.Sleep(time.Millisecond * 100)
		}
	}
	return
}

func ackEndpoint(w http.ResponseWriter, r *http.Request) {
	ack := r.RequestURI[len("/ack/"):]
	acks <- ack
	fmt.Print("Ack for " + ack)
	w.Write([]byte("Thanks!"))
	return
}

var acks = make(chan string, 10)

func main() {
	http.HandleFunc("/ack/", ackEndpoint)
	http.HandleFunc("/start/", startEndpoint)

	http.ListenAndServe("127.0.0.1:8888", nil)
}

注意:要测试这个代码,在本地机器上运行它。使用Curl/Wget命令127.0.0.1:8888/start/bob,然后使用Curl/Wget命令127.0.0.1:8888/ack/bob。你可以将bob替换为任何字符串以查看行为。

我对Go还不熟悉。欢迎在评论中提供其他反馈。

英文:

I am working on an http endpoint that will receive a request from a client and block until it receives an "ack" for that request from another server or until it passes a timeout. The communication between my code and the server is not included in this sample, but you can assume that for each request, an ack may be received eventually.

Since many requests will pass through my module in short periods of time, I cannot assume that a given ack is related the the request I am blocking on. EDIT: Clarification here since it has caused some confusion. Both requests and acks are received by the controller from external sources. This is why I am handling them asychronously. /EDIT For this reason, my code places acks back on the channel if they are not relevant. It is also important to note that http.ListenAndServe calls my functions asynchronously.

If the request is acked within the timeout, there are no problems. However, it the ack comes after the timeout has passed, it will be added to a channel and never removed. This will cause the channel to fill up. I am afraid to use a "cancel" channel because it is also possible that no ack will be received for a given request, causing the cancel channel to fill as well.

The Question: How can I keep late acks from filling my channel?/How can I identify and remove late acks?

Code below. No play.golang.org link because http.ListenAndServe :/

package main
import (
&quot;fmt&quot;
&quot;net/http&quot;
&quot;time&quot;
)
const timeout = 10
func startEndpoint(w http.ResponseWriter, r *http.Request) {
var ack string
timer := time.NewTimer(time.Second * timeout)
defer timer.Stop()
m := r.RequestURI[len(&quot;/start/&quot;):]
fmt.Print(m)
AckRecycle:
for {
select {
case ack = &lt;-acks:
if ack == m {
//What we found was our own ack
fmt.Print(&quot;+&quot;)
w.Write([]byte(&quot;Ack received for &quot; + ack))
break AckRecycle
} else {
//What we found on the channel wasn&#39;t for us
fmt.Print(&quot;.&quot;)
time.Sleep(time.Millisecond * 100)
acks &lt;- ack
}
case &lt;-timer.C:
//We ran out of time waiting for our ack
w.Write([]byte(&quot;Timeout waiting for &quot; + m))
break AckRecycle
default:
//Channel was empty
fmt.Print(&quot;-&quot;)
time.Sleep(time.Millisecond * 100)
}
}
return
}
func ackEndpoint(w http.ResponseWriter, r *http.Request) {
ack := r.RequestURI[len(&quot;/ack/&quot;):]
acks &lt;- ack
fmt.Print(&quot;Ack for &quot; + ack)
w.Write([]byte(&quot;Thanks!&quot;))
return
}
var acks = make(chan string, 10)
func main() {
http.HandleFunc(&quot;/ack/&quot;, ackEndpoint)
http.HandleFunc(&quot;/start/&quot;, startEndpoint)
http.ListenAndServe(&quot;127.0.0.1:8888&quot;, nil)
}

NOTE: To test this, run it on your local machine. Curl/Wget 127.0.0.1:8888/start/bob and then Curl/Wget 127.0.0.1:8888/ack/bob. You can replace bob with any string to see the behavior.

I'm new to Go. Feel free to provide other feedback in the comments.

答案1

得分: 1

保持一个“正在处理中的uuid映射”的map;当你收到一个/start/请求时,将其添加到映射中,当你收到一个ack(或者当请求超时)时,将其从映射中移除。如果你收到一个不在映射中的ack,立即丢弃它。

请注意,默认情况下,map不是线程安全的。

英文:

Keep a map of "uuids in process"; when you receive a /start/ add it to the map, and when you receive an ack (or when the request times out) remove it. If you receive an ack that isn't in the map, discard it immediately.

Be careful, as maps are not thread-safe by default.

答案2

得分: 0

似乎你正在尝试以Akka的异步风格编写Go代码。这是一个困难的选择;按照惯用的Go方式应该更容易。问题出在这里:"我不能假设给定的确认与我正在阻塞的请求相关联。"

相反,你需要使用简单的顺序步骤处理每个请求,并直接将确认发送回客户端。为此,每个请求都需要有自己的服务goroutine。

英文:

It seems like you might be trying to write Go in the asynchronous style of Akka. This is a hard choice; idiomatic Go should be easier. The flaw is here: "I cannot assume that a given ack is related the the request I am blocking on".

Instead, you need to handle each request with simple sequential steps and send the ack directly back to its client. For this, each request would need its own service goroutine.

huangapple
  • 本文由 发表于 2015年1月20日 05:13:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/28033565.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定