取消未完成的goroutine是否可能?

huangapple go评论93阅读模式
英文:

Is it possible to cancel unfinished goroutines?

问题

考虑一组检查任务,每个任务都有独立的逻辑,因此它们似乎可以并发运行,例如:

type Work struct {
	// ...
}

// 这个检查可能非常耗时
func (w *Work) Check() bool {
	// 返回成功或失败

	//...
}

func CheckAll(works []*Work) {
	num := len(works)
	results := make(chan bool, num)
	for _, w := range works {
		go func(w *Work) {
			results <- w.Check()
		}(w)
	}

	for i := 0; i < num; i++ {
		if r := <-results; !r {
			ReportFailed()
			break;
		}
	}
}

func ReportFailed() {
	// ...
}

当关注results时,如果逻辑是无论哪个任务失败,我们都认为所有任务都失败,那么通道中剩余的值就没有用了。让剩余的未完成的goroutine继续运行并向通道发送结果是没有意义的,而且是浪费的,特别是当w.Check()非常耗时时。理想的效果类似于:

	for _, w := range works {
		if !w.Check() {
			ReportFailed()
			break;
		}
	}

这只运行必要的检查任务,然后中断,但是在顺序非并发的情况下。

那么,有没有可能取消这些未完成的goroutine或者向通道发送消息?

英文:

Consider a group of check works, each of which has independent logic, so they seem to be good to run concurrently, like:

type Work struct {
	// ...
}

// This Check could be quite time-consuming
func (w *Work) Check() bool {
	// return succeed or not

	//...
}

func CheckAll(works []*Work) {
	num := len(works)
	results := make(chan bool, num)
	for _, w := range works {
		go func(w *Work) {
			results &lt;- w.Check()
		}(w)
	}

	for i := 0; i &lt; num; i++ {
		if r := &lt;-results; !r {
			ReportFailed()
			break;
		}
	}
}

func ReportFailed() {
	// ...
}

When concerned about the results, if the logic is no matter which one work fails, we assert all works totally fail, the remaining values in the channel are useless. Let the remaining unfinished goroutines continue to run and send results to the channel is meaningless and waste, especially when w.Check() is quite time-consuming. The ideal effect is similar to:

	for _, w := range works {
		if !w.Check() {
			ReportFailed()
			break;
		}
	}

This only runs necessary check works then break, but is in sequential non-concurrent scenario.

So, is it possible to cancel these unfinished goroutines, or sending to channel?

答案1

得分: 5

取消发送操作

您最初的问题是如何取消发送操作。在通道上进行发送操作基本上是“即时”的。如果通道的缓冲区已满且没有准备好的接收方,发送操作将阻塞。

您可以通过使用select语句和一个cancel通道来“取消”此发送操作,例如:

cancel := make(chan struct{})

select {
case ch <- value:
case <-cancel:
}

在另一个goroutine上使用close(cancel)关闭cancel通道将使上述select语句放弃对ch的发送(如果它正在阻塞)。

但是正如前面所说,发送操作在“准备就绪”的通道上是“即时”的,并且发送操作首先评估要发送的值:

results <- w.Check()

这首先要运行w.Check(),一旦完成,它的返回值将被发送到results

取消函数调用

因此,您真正需要的是取消w.Check()方法调用。为此,惯用的方法是传递一个可以取消的context.Context值,并且w.Check()本身必须监视并“遵守”此取消请求。

请参阅https://stackoverflow.com/questions/62002729/terminating-function-execution-if-a-context-is-cancelled/62002965#62002965

请注意,您的函数必须显式支持此功能。没有函数调用或goroutine的隐式终止,请参阅https://stackoverflow.com/questions/28240133/cancel-a-blocking-operation-in-go/28240299#28240299。

因此,您的Check()应该类似于以下内容:

// This Check could be quite time-consuming
func (w *Work) Check(ctx context.Context, workDuration time.Duration) bool {
    // Do your thing and monitor the context!

    select {
    case <-ctx.Done():
        return false
    case <-time.After(workDuration): // Simulate work
        return true
    case <-time.After(2500 * time.Millisecond): // Simulate failure after 2.5 sec
        return false
    }
}

CheckAll()可能如下所示:

func CheckAll(works []*Work) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    num := len(works)
    results := make(chan bool, num)

    wg := &sync.WaitGroup{}
    for i, w := range works {
        workDuration := time.Second * time.Duration(i)
        wg.Add(1)
        go func(w *Work) {
            defer wg.Done()
            result := w.Check(ctx, workDuration)
            // You may check and return if context is cancelled
            // so result is surely not sent, I omitted it here.
            select {
            case results <- result:
            case <-ctx.Done():
                return
            }
        }(w)
    }

    go func() {
        wg.Wait()
        close(results) // This allows the for range over results to terminate
    }()

    for result := range results {
        fmt.Println("Result:", result)
        if !result {
            cancel()
            break
        }
    }
}

进行测试:

CheckAll(make([]*Work, 10))

输出(在Go Playground上尝试):

Result: true
Result: true
Result: true
Result: false

我们打印了3次true(在2.5秒内完成的工作),然后失败模拟开始,返回false,并终止所有其他作业。

请注意,上述示例中的sync.WaitGroup并不是严格必需的,因为results具有足够容量来容纳所有结果,但通常这仍然是一个良好的做法(以防将来使用较小的缓冲区)。

参考资料:https://stackoverflow.com/questions/45500836/close-multiple-goroutine-if-an-error-occurs-in-one-in-go/45502591#45502591

英文:

Cancelling a (blocking) send

Your original question asked how to cancel a send operation. A send on a channel is basically "instant". A send on a channel blocks if the channel's buffer is full and there is no ready receiver.

You can "cancel" this send by using a select statement and a cancel channel which you close, e.g.:

cancel := make(chan struct{})

select {
case ch &lt;- value:
case &lt;- cancel:
}

Closing the cancel channel with close(cancel) on another goroutine will make the above select abandon the send on ch (if it's blocking).

But as said, the send is "instant" on a "ready" channel, and the send first evaluates the value to be sent:

results &lt;- w.Check()

This first has to run w.Check(), and once it's done, its return value will be sent on results.

Cancelling a function call

So what you really need is to cancel the w.Check() method call. For that, the idiomatic way is to pass a context.Context value which you can cancel, and w.Check() itself must monitor and "obey" this cancellation request.

See https://stackoverflow.com/questions/62002729/terminating-function-execution-if-a-context-is-cancelled/62002965#62002965

Note that your function must support this explicitly. There is no implicit termination of function calls or goroutines, see https://stackoverflow.com/questions/28240133/cancel-a-blocking-operation-in-go/28240299#28240299.

So your Check() should look something like this:

// This Check could be quite time-consuming
func (w *Work) Check(ctx context.Context, workDuration time.Duration) bool {
	// Do your thing and monitor the context!

	select {
	case &lt;-ctx.Done():
		return false
	case &lt;-time.After(workDuration): // Simulate work
		return true
	case &lt;-time.After(2500 * time.Millisecond): // Simulate failure after 2.5 sec
		return false
	}
}

And CheckAll() may look like this:

func CheckAll(works []*Work) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	num := len(works)
	results := make(chan bool, num)

	wg := &amp;sync.WaitGroup{}
	for i, w := range works {
		workDuration := time.Second * time.Duration(i)
		wg.Add(1)
		go func(w *Work) {
			defer wg.Done()
			result := w.Check(ctx, workDuration)
			// You may check and return if context is cancelled
			// so result is surely not sent, I omitted it here.
			select {
			case results &lt;- result:
			case &lt;-ctx.Done():
				return
			}
		}(w)
	}

	go func() {
		wg.Wait()
		close(results) // This allows the for range over results to terminate
	}()

	for result := range results {
		fmt.Println(&quot;Result:&quot;, result)
		if !result {
			cancel()
			break
		}
	}
}

Testing it:

CheckAll(make([]*Work, 10))

Output (try it on the Go Playground):

Result: true
Result: true
Result: true
Result: false

We get true printed 3 times (works that complete under 2.5 seconds), then the failure simulation kicks in, returns false, and terminates all other jobs.

Note that the sync.WaitGroup in the above example is not strictly needed as results has a buffer capable of holding all results, but in general it's still good practice (should you use a smaller buffer in the future).

See related: https://stackoverflow.com/questions/45500836/close-multiple-goroutine-if-an-error-occurs-in-one-in-go/45502591#45502591

答案2

得分: 1

简短回答是:不可以

除非goroutine本身达到return或堆栈的末尾,否则无法取消或关闭任何goroutine。

如果你想要取消某个操作,最好的方法是将context.Context传递给它们,并在goroutine内部监听context.Done()。当context被取消时,你应该return,goroutine在执行完延迟操作(如果有的话)后会自动结束。

英文:

The short answer is: No.

You can not cancel or close any goroutine unless the goroutine itself reaches the return or end of its stack.

If you want to cancel something, the best approach is to pass a context.Context to them and listen to this context.Done() inside of the routine. Whenever context is canceled, you should return and the goroutine will automatically die after executing defers(if any).

答案3

得分: 0

package main

import "fmt"

type Work struct {
	// ...
	Name      string
	IsSuccess chan bool
}

// 这个检查可能会耗费很多时间
func (w *Work) Check() {
	// 返回成功或失败

	//...
	if len(w.Name) > 0 {
		w.IsSuccess <- true
	} else {
		w.IsSuccess <- false
	}

}

// 堆排序
func main() {
	works := make([]*Work, 3)
	works[0] = &Work{
		Name:      "",
		IsSuccess: make(chan bool),
	}
	works[1] = &Work{
		Name:      "111",
		IsSuccess: make(chan bool),
	}
	works[2] = &Work{
		Name:      "",
		IsSuccess: make(chan bool),
	}

	for _, w := range works {
		go w.Check()
	}

	for i, w := range works {
		select {
		case checkResult := <-w.IsSuccess:
			fmt.Printf("索引 %d 的检查结果为 %t \n", i, checkResult)
		}
	}
}

点击查看图片

英文:
package main

import &quot;fmt&quot;

type Work struct {
	// ...
	Name string
	IsSuccess chan bool
}

// This Check could be quite time-consuming
func (w *Work) Check() {
	// return succeed or not

	//...
	if len(w.Name) &gt; 0 {
		w.IsSuccess &lt;- true
	}else{
		w.IsSuccess &lt;- false
	}

}


//堆排序
func main() {
	works := make([]*Work,3)
	works[0] = &amp;Work{
		Name: &quot;&quot;,
		IsSuccess: make(chan bool),
	}
	works[1] =  &amp;Work{
		Name: &quot;111&quot;,
		IsSuccess: make(chan bool),
	}
	works[2] =&amp;Work{
		Name: &quot;&quot;,
		IsSuccess: make(chan bool),
	}

	for _,w := range works {
		go w.Check()
	}

	for i,w := range works{
		select {
		case checkResult := &lt;-w.IsSuccess :
			fmt.Printf(&quot;index %d checkresult %t \n&quot;,i,checkResult)
		}
	}
}

enter image description here

huangapple
  • 本文由 发表于 2022年4月13日 17:28:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/71855079.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定