使用goroutines和context创建可取消的工作线程

huangapple go评论81阅读模式
英文:

Creating cancelable workers with goroutines and context

问题

我正在尝试理解如何正确使用goroutines、channels和context来创建可取消的后台工作程序。

我熟悉使用可以在显式调用时取消的context,将其附加到worker goroutine应该可以停止worker。

但是我无法弄清楚如何使用它来实现这个目标。

下面的示例演示了一个worker goroutine,它从一个名为"urls"的channel获取数据,并携带一个可取消的context。

//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
	fmt.Printf("Worker %d is starting\n", id)
	select {
	// 用于从URL写入数据的channel的占位符
	case url := <-urls:
		fmt.Printf("Worker :%d received url :%s\n", id, url)
	// 检查是否取消了进程
	case <-ctx.Done():
		fmt.Printf("Worker :%d exiting..\n", id)
	}
	fmt.Printf("Worker :%d done..\n", id)
	wg.Done()
}

这对我来说不起作用,有两个原因:

  1. 对于无缓冲的channel,在没有goroutine读取时向其写入数据将会阻塞它,因此一旦更多数据被添加到urls channel中,发送者将会被阻塞。
  2. 它会立即返回,一旦其中任何一个channel返回。

我还尝试将select语句包装在一个无限循环中,但在context之后添加一个break语句会引发错误。

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
	fmt.Printf("Worker %d is starting\n", id)
	for {
		select {
		// 用于从URL写入数据的channel的占位符
		case url := <-urls:
			fmt.Printf("Worker :%d received url :%s\n", id, url)
		// 检查是否取消了进程
		case <-ctx.Done():
			fmt.Printf("Worker :%d exiting..\n", id)
			break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
		}
	}
	fmt.Printf("Worker :%d done..\n", id) // 代码无法到达
	wg.Done()
}

实现这样的功能的正确方法是什么?

附注:关于设计这类worker进程的任何资源/参考资料也将非常有帮助。

英文:

I'm trying to understand how to properly use goroutines along with channels and context, to create a cancelable background worker.

I'm familiar with using contexts that can cancel when explicitly called, attaching it to the worker goroutine should let me stop the worker.

But I cant figure out how to use it to achieve what this.

The example below illustrates a worker goroutine that gets the data from a channel 'urls', and it also carries a cancelable context.

//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
	fmt.Printf(&quot;Worker %d is starting\n&quot;, id)
	select {
	// placeholder for a channel writing the data from the URL
	case url := &lt;-urls:
		fmt.Printf(&quot;Worker :%d received url :%s\n&quot;, id, url)
	// checking if the process is cancelled
	case &lt;-ctx.Done():
		fmt.Printf(&quot;Worker :%d exitting..\n&quot;, id)
	}
	fmt.Printf(&quot;Worker :%d done..\n&quot;, id)
	wg.Done()
}

This doesn't work for me for two reasons,

  1. For an unbuffered channel, writing to it with no goroutines to read from will block it, so once more data is added to the the urls channel, the sender will block.
  2. It returns immediately, once either of the two channel returns.

I also tried wrapping the select in an infinite loop, but adding a break after the context is raising error.

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
	fmt.Printf(&quot;Worker %d is starting\n&quot;, id)
	for {
		select {
		// placeholder for a channel writing the data from the URL
		case url := &lt;-urls:
			fmt.Printf(&quot;Worker :%d received url :%s\n&quot;, id, url)
		// checking if the process is cancelled
		case &lt;-ctx.Done():
			fmt.Printf(&quot;Worker :%d exitting..\n&quot;, id)
			break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
		}
	}
	fmt.Printf(&quot;Worker :%d done..\n&quot;, id) // code is unreachable
	wg.Done()
}

What is the right approach to implement something like this?

PS : Any resources / references about designing worker processes like these will be helpful too.

答案1

得分: 1

你可以将break替换为return,代码将会正常工作。

然而,更好的方法是:

  1. 工作者在for / range循环中消费通道。
  2. 生产者应该负责检测取消操作并关闭通道。for循环将会级联停止。
英文:

You can substitute the break by return and the code will work.

However, a better approach can be:

  1. Workers consume the channel in for / range loop
  2. The producer should be responsible for detect the cancel and close the channel. The for loop will stop in cascade

答案2

得分: 0

我为此制作了一个专门的Go包。你可以在这里找到它:https://github.com/MicahParks/ctxerrpool

以下是该项目的README.md中的示例代码:

package main

import (
	"bytes"
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/MicahParks/ctxerrpool"
)

func main() {

	// 创建一个记录所有错误的错误处理程序。
	var errorHandler ctxerrpool.ErrorHandler
	errorHandler = func(pool ctxerrpool.Pool, err error) {
		log.Printf("发生错误。错误信息:%s。\n", err.Error())
	}

	// 创建一个具有4个工作线程的工作池。
	pool := ctxerrpool.New(4, errorHandler)

	// 创建一些通过闭包继承的变量。
	httpClient := &http.Client{}
	u := "https://golang.org"
	logger := log.New(os.Stdout, "状态码:", 0)

	// 创建工作函数。
	var work ctxerrpool.Work
	work = func(ctx context.Context) (err error) {

		// 创建HTTP请求。
		var req *http.Request
		if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
			return err
		}

		// 执行HTTP请求。
		var resp *http.Response
		if resp, err = httpClient.Do(req); err != nil {
			return err
		}

		// 记录状态码。
		logger.Println(resp.StatusCode)

		return nil
	}

	// 执行16次工作。
	for i := 0; i < 16; i++ {

		// 为工作创建上下文。
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		// 将工作发送到工作池。
		pool.AddWorkItem(ctx, work)
	}

	// 等待工作池完成。
	pool.Wait()
}

希望对你有帮助!

英文:

I've made a Go package specifically for this. You can find it here: https://github.com/MicahParks/ctxerrpool

Here's the example from the project's README.md:

package main

import (
	&quot;bytes&quot;
	&quot;context&quot;
	&quot;log&quot;
	&quot;net/http&quot;
	&quot;os&quot;
	&quot;time&quot;

	&quot;github.com/MicahParks/ctxerrpool&quot;
)

func main() {

	// Create an error handler that logs all errors.
	var errorHandler ctxerrpool.ErrorHandler
	errorHandler = func(pool ctxerrpool.Pool, err error) {
		log.Printf(&quot;An error occurred. Error: \&quot;%s\&quot;.\n&quot;, err.Error())
	}

	// Create a worker pool with 4 workers.
	pool := ctxerrpool.New(4, errorHandler)

	// Create some variables to inherit through a closure.
	httpClient := &amp;http.Client{}
	u := &quot;https://golang.org&quot;
	logger := log.New(os.Stdout, &quot;status codes: &quot;, 0)

	// Create the worker function.
	var work ctxerrpool.Work
	work = func(ctx context.Context) (err error) {

		// Create the HTTP request.
		var req *http.Request
		if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
			return err
		}

		// Do the HTTP request.
		var resp *http.Response
		if resp, err = httpClient.Do(req); err != nil {
			return err
		}

		// Log the status code.
		logger.Println(resp.StatusCode)

		return nil
	}

	// Do the work 16 times.
	for i := 0; i &lt; 16; i++ {

		// Create a context for the work.
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		// Send the work to the pool.
		pool.AddWorkItem(ctx, work)
	}

	// Wait for the pool to finish.
	pool.Wait()
}

huangapple
  • 本文由 发表于 2022年4月12日 03:18:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/71833198.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定