使用sync.WaitGroup.wait时如何实现超时?

huangapple go评论86阅读模式
英文:

How to implement a timeout when using sync.WaitGroup.wait?

问题

我遇到了一个情况,我想要追踪一些goroutine,直到它们在特定点同步,例如当所有的URL都被获取时。然后,我们可以将它们全部放在一起,并按特定的顺序显示。

我认为这里可以使用屏障,使用gosync.WaitGroup来实现。然而,在实际情况中,我们无法确保所有的获取操作在短时间内都成功。因此,我想在等待获取操作时引入一个超时机制。

我是一个对Golang不太熟悉的新手,所以有人可以给我一些建议吗?


我期望的代码示例是这样的:

wg := &sync.WaitGroup{}
select {
case <-wg.Wait():
    // 所有操作完成!
case <-time.After(500 * time.Millisecond):
    // 超时处理。
}

我知道Wait方法不支持通道(Channel)。

英文:

I have come across a situation that i want to trace some goroutine to sync on a specific point, for example when all the urls are fetched. Then, we can put them all and show them in specific order.

I think this is the barrier comes in. It is in go with sync.WaitGroup. However, in real situation that we can not make sure that all the fetch operation will succeed in a short time. So, i want to introduce a timeout when wait for the fetch operations.

I am a newbie to Golang, so can someone give me some advice?


What i am looking for is like this:

   wg := &amp;sync.WaigGroup{}
   select {
   case &lt;-wg.Wait():
   // All done!
   case &lt;-time.After(500 * time.Millisecond):
   // Hit timeout.
   }

I know Wait do not support Channel.

答案1

得分: 34

如果你只想要一个整洁的选择,你可以通过生成一个调用方法并在完成后关闭/发送通道的例程来将阻塞函数转换为通道。

done := make(chan struct{})
go func() {
   wg.Wait()
   close(done)
}()

select {
case <-done:
// 全部完成!
case <-time.After(500 * time.Millisecond):
// 超时。
}
英文:

If all you want is your neat select, you can easily convert blocking function to a channel by spawning a routine which calls a method and closes/sends on channel once done.

done := make(chan struct{})
go func() {
   wg.Wait()
   close(done)
}()

select {
case &lt;-done:
// All done!
case &lt;-time.After(500 * time.Millisecond):
// Hit timeout.
}

答案2

得分: 2

将你的结果发送到一个缓冲通道,足够容纳所有结果,而不会阻塞,并在主线程的for-select循环中读取它们:

func work(msg string, d time.Duration, ret chan<- string) {
    time.Sleep(d) // 模拟工作。
    select {
    case ret <- msg:
    default:
    }
}

// ...

const N = 2
ch := make(chan string, N)

go work("printed", 100*time.Millisecond, ch)
go work("not printed", 1000*time.Millisecond, ch)

timeout := time.After(500 * time.Millisecond)
loop:
for received := 0; received < N; received++ {
    select {
    case msg := <-ch:
        fmt.Println(msg)
    case <-timeout:
        fmt.Println("timeout!")
        break loop
    }
}

Playground: http://play.golang.org/p/PxeEEJo2dz.

参考资料:Go并发模式:超时,继续进行.

英文:

Send your results to a buffered channel enough to take all results, without blocking, and read them in for-select loop in the main thread:

func work(msg string, d time.Duration, ret chan&lt;- string) {
	time.Sleep(d) // Work emulation.
	select {
	case ret &lt;- msg:
	default:
	}
}

// ...

const N = 2
ch := make(chan string, N)

go work(&quot;printed&quot;, 100*time.Millisecond, ch)
go work(&quot;not printed&quot;, 1000*time.Millisecond, ch)

timeout := time.After(500 * time.Millisecond)
loop:
for received := 0; received &lt; N; received++ {
	select {
	case msg := &lt;-ch:
		fmt.Println(msg)
	case &lt;-timeout:
		fmt.Println(&quot;timeout!&quot;)
		break loop
	}
}

Playground: http://play.golang.org/p/PxeEEJo2dz.

See also: Go Concurrency Patterns: Timing out, moving on.

答案3

得分: 1

另一种方法是在内部进行监控。你的问题有些限制,但我假设你是通过循环启动goroutine的,即使你没有,你也可以重构代码以适应你的需求。你可以使用以下两个示例中的任意一个:第一个示例将为每个请求设置超时时间,第二个示例将为整个请求批次设置超时时间,并在超时后继续执行。

var wg sync.WaitGroup
wg.Add(1)
go func() {
    success := make(chan struct{}, 1)
    go func() {
        // 发送请求并等待响应
        // 假设已收到响应
        time.Sleep(5 * time.Second)
        success <- struct{}{}
        // goroutine 在返回后会正常关闭
        fmt.Println("正常返回")
    }()

    select {
    case <-success:
        break
    case <-time.After(1 * time.Second):
        break
    }

    wg.Done()
    // 所有资源应该被垃圾回收,不再占用空间
}()

wg.Wait()

// 处理获取到的结果
fmt.Println("完成")
time.Sleep(10 * time.Second)
fmt.Println("在 limbo goroutine 完成后检查是否有错误抛出")

如果你只是想要一个通用的简单方法来设置所有请求的超时时间你可以这样做

```go
var wg sync.WaitGroup
waiter := make(chan int)
wg.Add(1)
go func() {
    success := make(chan struct{}, 1)
    go func() {
        // 发送请求并等待响应
        // 假设已收到响应
        time.Sleep(5 * time.Second)
        success <- struct{}{}
        // goroutine 在返回后会正常关闭
        fmt.Println("正常返回")
    }()

    select {
    case <-success:
        break
    case <-time.After(1 * time.Second):
        // 控制每个请求的超时时间,以确保调用 wg.Done() 并让持有 .Wait 的 goroutine 关闭
        break
    }
    wg.Done()
    // 所有资源应该被垃圾回收,不再占用空间
}()

completed := false
go func(completed *bool) {
    // 使用任意一个等待方式解除阻塞
    wg.Wait()
    if !*completed {
        waiter <- 1
        *completed = true
    }
    fmt.Println("返回 Two")
}(&completed)

go func(completed *bool) {
    // 等待一段时间
    time.Sleep(time.Second * 5)
    if !*completed {
        waiter <- 1
        *completed = true
    }
    fmt.Println("返回 One")
}(&completed)

// 阻塞,直到超时或 .Wait 停止阻塞
<-waiter

// 处理获取到的结果
fmt.Println("完成")
time.Sleep(10 * time.Second)
fmt.Println("在 limbo goroutine 完成后检查是否有错误抛出")

这样你的 WaitGroup 将保持同步不会有任何 goroutine 被遗留在 limbo 状态

你可以在这里尝试运行代码http://play.golang.org/p/g0J_qJ1BUT,你可以更改变量来观察不同的运行结果。

编辑我在使用手机如果有人能修复格式问题那就太好了谢谢

<details>
<summary>英文:</summary>

Another way to do it would be to monitor it internally, your question is limited but I&#39;m going to assume you&#39;re starting your goroutines through a loop even if you&#39;re not you can refactor this to work for you but you could do one of these 2 examples, the first one will timeout each request to timeout individually and the second one will timeout the entire batch of requests and move on if too much time has passed 

    var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		success := make(chan struct{}, 1)
		go func() {
			// send your request and wait for a response
			// pretend response was received
		    time.Sleep(5 * time.Second)
			success &lt;- struct{}{}
			// goroutine will close gracefully after return		
            fmt.Println(&quot;Returned Gracefully&quot;)
		}()

		select {
		case &lt;-success:
			break
		case &lt;-time.After(1 * time.Second):
			break
		}
        
		wg.Done()
		// everything should be garbage collected and no longer take up space
	}()

	wg.Wait()

	// do whatever with what you got	
    fmt.Println(&quot;Done&quot;)
    time.Sleep(10 * time.Second)
    fmt.Println(&quot;Checking to make sure nothing throws errors after limbo goroutine is done&quot;)

Or if you just want a general easy way to timeout ALL requests you could do something like

    var wg sync.WaitGroup
    waiter := make(chan int)
    wg.Add(1)
    go func() {
        success := make(chan struct{}, 1)
        go func() {
            // send your request and wait for a response
            // pretend response was received
            time.Sleep(5 * time.Second)
            success &lt;- struct{}{}
            // goroutine will close gracefully after return     
            fmt.Println(&quot;Returned Gracefully&quot;)
        }()
    
        select {
        case &lt;-success:
            break
        case &lt;-time.After(1 * time.Second):
            // control the timeouts for each request individually to make sure that wg.Done gets called and will let the goroutine holding the .Wait close
            break
        }
        wg.Done()
        // everything should be garbage collected and no longer take up space
    }()
    
    completed := false
	go func(completed *bool) {
		// Unblock with either wait
		wg.Wait()
		if !*completed {
			waiter &lt;- 1			
            *completed = true
		}		
        fmt.Println(&quot;Returned Two&quot;)
	}(&amp;completed)

	go func(completed *bool) {
		// wait however long
		time.Sleep(time.Second * 5)
		if !*completed {
			waiter &lt;- 1			
            *completed = true
		}		
        fmt.Println(&quot;Returned One&quot;)
	}(&amp;completed)


     // block until it either times out or .Wait stops blocking 
     &lt;-waiter

    // do whatever with what you got    
    fmt.Println(&quot;Done&quot;)
    time.Sleep(10 * time.Second)
    fmt.Println(&quot;Checking to make sure nothing throws errors after limbo goroutine is done&quot;)

This way your WaitGroup will stay in sync and you won&#39;t have any goroutines left in limbo 

http://play.golang.org/p/g0J_qJ1BUT try it here you can change the variables around to see it work differently 

Edit: I&#39;m on mobile If anybody could fix the formatting that would be great thanks. 


</details>



# 答案4
**得分**: 1

如果您想避免将并发逻辑与业务逻辑混合在一起我写了这个库https://github.com/shomali11/parallelizer来帮助您解决这个问题。它封装了并发逻辑,因此您不必担心它。

所以在您的示例中

    package main

    import (
	    "github.com/shomali11/parallelizer"
	    "fmt"
    )

    func main() {
        urls := []string{ ... }
        results = make([]*HttpResponse, len(urls)

        options := &Options{ Timeout: time.Second }
    	group := parallelizer.NewGroup(options)
        for index, url := range urls {
    	    group.Add(func(index int, url string, results *[]*HttpResponse) {
		        return func () {
                    ...

                    results[index] = &HttpResponse{url, response, err}
                }
	        }(index, url, &results))
        }

	    err := group.Run()

	    fmt.Println("Done")
	    fmt.Println(fmt.Sprintf("Results: %v", results))
	    fmt.Printf("Error: %v", err) // 如果完成则为nil,如果超时则为err
    }

<details>
<summary>英文:</summary>

If you would like to avoid mixing concurrency logic with business logic, I wrote this library https://github.com/shomali11/parallelizer to help you with that. It encapsulates the concurrency logic so you do not have to worry about it.

So in your example:

    package main

    import (
	    &quot;github.com/shomali11/parallelizer&quot;
	    &quot;fmt&quot;
    )

    func main() {
        urls := []string{ ... }
        results = make([]*HttpResponse, len(urls)

        options := &amp;Options{ Timeout: time.Second }
    	group := parallelizer.NewGroup(options)
        for index, url := range urls {
    	    group.Add(func(index int, url string, results *[]*HttpResponse) {
		        return func () {
                    ...

                    results[index] = &amp;HttpResponse{url, response, err}
                }
	        }(index, url, &amp;results))
        }

	    err := group.Run()

	    fmt.Println(&quot;Done&quot;)
	    fmt.Println(fmt.Sprintf(&quot;Results: %v&quot;, results))
	    fmt.Printf(&quot;Error: %v&quot;, err) // nil if it completed, err if timed out
    }

</details>



huangapple
  • 本文由 发表于 2015年12月21日 19:52:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/34395060.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定