如何设置goroutine的优先级

huangapple go评论89阅读模式
英文:

How to prioritize goroutines

问题

我想同时调用两个端点(A和B)。但是如果我从两个端点都得到200的响应,我需要使用来自A的响应,否则使用B的响应。
如果B先返回,我需要等待A,换句话说,只要A返回200,我必须使用A。

你们能帮我设计这个模式吗?

谢谢。

英文:

I want to call two endpoints at the same time (A and B). But if I got a response 200 from both I need to use the response from A otherwise use B response.
If B returns first I need to wait for A, in other words, I must use A whenever A returns 200.

Can you guys help me with the pattern?

Thank you

答案1

得分: 4

等待来自A的结果。如果结果不好,则等待来自B的结果。使用带缓冲的通道来存储B的结果,以便当A的结果好时发送方不会被阻塞。

在下面的代码片段中,fnA()fnB()函数用于向端点发出请求,消耗响应并进行清理。我假设结果是[]byte类型,但也可能是解码JSON或其他类型的结果。这是fnA的示例:

func fnA() ([]byte, error) {
    r, err := http.Get("http://example.com/a")
    if err != nil {
        return nil, err
    }
    defer r.Body.Close() // <-- 重要:关闭响应体!
    if r.StatusCode != 200 {
        return nil, errors.New("bad response")
    }
    return ioutil.ReadAll(r.Body)
}

定义一个类型来保存结果和错误:

type response struct {
    result []byte
    err    error
}

完成这些准备工作后,下面是如何优先处理A而不是B的方法。

a := make(chan response)
go func() {
    result, err := fnA()
    a <- response{result, err}
}()

b := make(chan response, 1) // 大小 > 0 很重要!
go func() {
    result, err := fnB()
    b <- response{result, err}
}()

resp := <-a
if resp.err != nil {
    resp = <-b
    if resp.err != nil {
        // 处理错误。A和B都失败了。
    }
}
result := resp.result

如果应用程序不与A和B并发执行代码,则无需为A使用goroutine:

b := make(chan response, 1) // 大小 > 0 很重要!
go func() {
    result, err := fnB()
    b <- response{result, err}
}()

result, err := fnA()
if err != nil {
    resp = <-b
    if resp.err != nil {
        // 处理错误。A和B都失败了。
    }
    result = resp.result
}
英文:

Wait for a result from A. If the result is not good, then wait from a result from B. Use a buffered channel for the B result so that the sender does not block when A is good.

In the following snippet, fnA() and fnB() functions that issue requests to the endpoints, consume the response and cleanup. I assume that the result is a []byte, but it could be the result of decoding JSON or something else. Here's an example for fnA:

func fnA() ([]byte, error) {
	r, err := http.Get(&quot;http://example.com/a&quot;)
	if err != nil {
		return nil, err
	}
	defer r.Body.Close() // &lt;-- Important: close the response body!
	if r.StatusCode != 200 {
		return nil, errors.New(&quot;bad response&quot;)
	}
	return ioutil.ReadAll(r.Body)
}

Define a type to hold the result and error.

 type response struct {
     result []byte
     err error 
 }

With those preliminaries done, here's how to prioritize A over B.

a := make(chan response)
go func() {
	result, err := fnA()
	a &lt;- response{result, err}
}()

b := make(chan response, 1) // Size &gt; 0 is important!
go func() {
	result, err := fnB()
	b &lt;- response{result, err}
}()

resp := &lt;-a
if resp.err != nil {
	resp = &lt;-b
    if resp.err != nil {
        // handle error.  A and B both failed.
    }
}
result := resp.result

If the application does not execute code concurrently with A and B, then there's no need to use a goroutine for A:

b := make(chan response, 1) // Size &gt; 0 is important!
go func() {
	result, err := fnB()
	b &lt;- response{result, err}
}()

result, err := fnA()
if err != nil {
	resp = &lt;-b
    if resp.err != nil {
        // handle error.  A and B both failed.
    }
    result = resp.result
}

答案2

得分: 1

我建议你使用类似这样的代码,这是一个庞大的解决方案,但你可以根据需要启动多个端点。

func endpointPriorityTest() {

	const (
		sourceA = "a"
		sourceB = "b"
		sourceC = "c"
	)

	type endpointResponse struct {
		source   string
		response *http.Response
		err      error
	}

	epResponseChan := make(chan *endpointResponse)

	endpointsMap := map[string]string{
		sourceA: "https://jsonplaceholder.typicode.com/posts/1",
		sourceB: "https://jsonplaceholder.typicode.com/posts/10",
		sourceC: "https://jsonplaceholder.typicode.com/posts/100",
	}

	for source, endpointURL := range endpointsMap {
		source := source
		endpointURL := endpointURL
		go func(respChan chan<- *endpointResponse) {
			// You can add a delay so that the response from A takes longer than from B
			// and look to the result map
			// if source == sourceA {
			// 	time.Sleep(time.Second)
			// }
			resp, err := http.Get(endpointURL)

			respChan <- &endpointResponse{
				source:   source,
				response: resp,
				err:      err,
			}
		}(epResponseChan)
	}

	respCache := make(map[string]*http.Response)

	// Reading endpointURL responses from chan
	for epResp := range epResponseChan {
		// Skips failed requests
		if epResp.err != nil {
			continue
		}

		// Save successful response to cache map
		respCache[epResp.source] = epResp.response

		// Interrupt reading channel if we've got a response from source A
		if epResp.source == sourceA {
			break
		}
	}

	fmt.Println("result map:", respCache)

	// Now we can use data from cache map
	// resp, ok := respCache[sourceA]
	// if ok{
	// 	...
	// }
}

这段代码是一个示例,它使用goroutine并发地请求多个端点,并将响应保存在缓存中。你可以根据需要修改和扩展它。

英文:

I'm suggesting you to use something like this, this is a bulky solution, but there you can start more than two endpoints for you needs.

func endpointPriorityTest() {
const (
sourceA = &quot;a&quot;
sourceB = &quot;b&quot;
sourceC = &quot;c&quot;
)
type endpointResponse struct {
source   string
response *http.Response
error
}
epResponseChan := make(chan *endpointResponse)
endpointsMap := map[string]string{
sourceA: &quot;https://jsonplaceholder.typicode.com/posts/1&quot;,
sourceB: &quot;https://jsonplaceholder.typicode.com/posts/10&quot;,
sourceC: &quot;https://jsonplaceholder.typicode.com/posts/100&quot;,
}
for source, endpointURL := range endpointsMap {
source := source
endpointURL := endpointURL
go func(respChan chan&lt;- *endpointResponse) {
// You can add a delay so that the response from A takes longer than from B
// and look to the result map
// if source == sourceA {
// 	time.Sleep(time.Second)
// }
resp, err := http.Get(endpointURL)
respChan &lt;- &amp;endpointResponse{
source:   source,
response: resp,
error:    err,
}
}(epResponseChan)
}
respCache := make(map[string]*http.Response)
// Reading endpointURL responses from chan
for epResp := range epResponseChan {
// Skips failed requests
if epResp.error != nil {
continue
}
// Save successful response to cache map
respCache[epResp.source] = epResp.response
// Interrupt reading channel if we&#39;ve got an response from source A
if epResp.source == sourceA {
break
}
}
fmt.Println(&quot;result map: &quot;, respCache)
// Now we can use data from cache map
// resp, ok :=respCache[sourceA]
// if ok{
// 	...
// }
}

答案3

得分: 0

通常在Go语言中,通道(channel)用于在goroutine之间进行通信。您可以使用以下示例代码来编排您的场景。基本上,您将通道传递给callB函数,该函数将保存响应。您不需要在goroutine中运行callA,因为您始终需要来自该端点/服务的结果。

package main

import (
	"fmt"
	"time"
)

func main() {
	resB := make(chan int)
	go callB(resB)
	res := callA()
	if res == 200 {
		fmt.Print("No Need for B")
	} else {
		res = <-resB
		fmt.Printf("Response from B: %d", res)
	}
}

func callA() int {
	time.Sleep(1000)
	return 200
}

func callB(res chan int) {
	time.Sleep(500)
	res <- 200
}

更新:根据评论中的建议,上述代码存在"callB"泄漏的问题。

package main

import (
	"fmt"
	"time"
)

func main() {
	resB := make(chan int, 1)
	go callB(resB)
	res := callA()
	if res == 200 {
		fmt.Print("No Need for B")
	} else {
		res = <-resB
		fmt.Printf("Response from B: %d", res)
	}
}

func callA() int {
	time.Sleep(1000 * time.Millisecond)
	return 200
}

func callB(res chan int) {
	time.Sleep(500 * time.Millisecond)
	res <- 200
}
英文:

Normally in golang, channel are used for communicating between goroutines.
You can orchestrate your scenario with following sample code.
basically you pass channel into your callB which will hold response. You don't need to run callA in goroutine as you always need result from that endpoint/service

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
resB := make(chan int)
go callB(resB)
res := callA()
if res == 200 {
fmt.Print(&quot;No Need for B&quot;)
} else {
res = &lt;-resB
fmt.Printf(&quot;Response from B : %d&quot;, res)
}
}
func callA() int {
time.Sleep(1000)
return 200
}
func callB(res chan int) {
time.Sleep(500)
res &lt;- 200
}

Update: As suggestion given in comment, above code leaks "callB"

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
resB := make(chan int, 1)
go callB(resB)
res := callA()
if res == 200 {
fmt.Print(&quot;No Need for B&quot;)
} else {
res = &lt;-resB
fmt.Printf(&quot;Response from B : %d&quot;, res)
}
}
func callA() int {
time.Sleep(1000 * time.Millisecond)
return 200
}
func callB(res chan int) {
time.Sleep(500 * time.Millisecond)
res &lt;- 200
}

答案4

得分: 0

@Zombo的答案有正确的逻辑流程。在此基础上,我建议添加一个:利用context包。

基本上,任何可能阻塞的任务都应该使用context.Context,以便在提前取消的情况下,调用链能够执行更高效的清理操作。

在你的情况下,context.Context还可以用来在A调用成功时提前中止B调用:

func failoverResult(ctx context.Context) *http.Response {

    // 包装(父)上下文
    ctx, cancel := context.WithCancel(ctx)
 
    // 如果我们提前返回,即如果`fnA()`先完成,
    // 这将“取消”`fnB()`的请求。
    defer cancel()

    b := make(chan *http.Response, 1)
    go func() {
        b <- fnB(ctx)
    }()

    resp := fnA(ctx)
    if resp.StatusCode != 200 {
        resp = <-b
    }

    return resp
}

fnA(和fnB)的代码可能如下所示:

func fnA(ctx context.Context) (resp *http.Response) {
    req, _ := http.NewRequestWithContext(ctx, "GET", aUrl)
    
    resp, _ = http.DefaultClient.Do(req)  // TODO: 检查错误
    return
}
英文:

@Zombo 's answer has the correct logic flow. Piggybacking off this, I would suggest one addition: leveraging the context package.

Basically, any potentially blocking tasks should use context.Context to allow the call-chain to perform more efficient clean-up in the event of early cancelation.

context.Context also can be leveraged, in your case, to abort the B call early if the A call succeeds:

func failoverResult(ctx context.Context) *http.Response {
// wrap the (parent) context
ctx, cancel := context.WithCancel(ctx)
// if we return early i.e. if `fnA()` completes first
// this will &quot;cancel&quot; `fnB()`&#39;s request.
defer cancel()
b := make(chan *http.Response, 1)
go func() {
b &lt;- fnB(ctx)
}()
resp := fnA(ctx)
if resp.StatusCode != 200 {
resp = &lt;-b
}
return resp
}

fnA (and fnB) would look something like this:

func fnA(ctx context.Context) (resp *http.Response) {
req, _ := http.NewRequestWithContext(ctx, &quot;GET&quot;, aUrl)
resp, _ = http.DefaultClient.Do(req)  // TODO: check errors
return
}

huangapple
  • 本文由 发表于 2022年2月1日 06:16:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/70933481.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定