火并忘却的 goroutine(Go 语言)

huangapple go评论93阅读模式
英文:

Fire and forget goroutine golang

问题

我已经编写了一个API,它进行了一些数据库调用和业务逻辑处理。我正在调用一个goroutine,在后台执行一些操作。
由于API调用不应该等待后台任务完成,所以在调用goroutine后立即返回200 OK(假设后台任务永远不会出错)。

我了解到,一旦goroutine完成了它的任务,它就会被终止。
这种"fire and forget"的方式是否安全,不会导致goroutine泄漏?
一旦它们完成任务,goroutine会被终止和清理吗?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
	// 一些数据库调用
	// 一些业务逻辑处理
	go func() {
		// 一些需要5秒钟的任务
	}()
	w.WriteHeader(http.StatusOK)
}
英文:

I have written an API that makes DB calls and does some business logic. I am invoking a goroutine that must perform some operation in the background.
Since the API call should not wait for this background task to finish, I am returning 200 OK immediately after calling the goroutine (let us assume the background task will never give any error.)

I read that goroutine will be terminated once the goroutine has completed its task.
Is this fire and forget way safe from a goroutine leak?
Are goroutines terminated and cleaned up once they perform the job?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
	// Some DB calls
	// Some business logics
	go func() {
		// some Task taking 5 sec
	}()
	w.WriteHeader(http.StatusOK)
}

答案1

得分: 5

没有"goroutine清理"需要处理,你只需要启动goroutine,当作为goroutine启动的函数返回时,它们会被清理。引用自规范:Go语句:

当函数终止时,它的goroutine也会终止。如果函数有任何返回值,在函数完成时它们会被丢弃。

所以你的做法是正确的。但请注意,你启动的goroutine不能使用或假设任何关于请求(r)和响应写入器(w)的内容,你只能在处理程序返回之前使用它们。

另请注意,你不必写http.StatusOK,如果你在处理程序中没有写任何内容就返回,那么默认会发送一个成功的HTTP 200 OK回复。

参考/可能的重复问题:https://stackoverflow.com/questions/37782073/webhook-process-run-on-another-goroutine/37783535#37783535

英文:

There is no "goroutine cleaning" you have to handle, you just launch goroutines and they'll be cleaned when the function launched as a goroutine returns. Quoting from Spec: Go statements:

> When the function terminates, its goroutine also terminates. If the function has any return values, they are discarded when the function completes.

So what you do is fine. Note however that your launched goroutine cannot use or assume anything about the request (r) and response writer (w), you may only use them before you return from the handler.

Also note that you don't have to write http.StatusOK, if you return from the handler without writing anything, that's assumed to be a success and HTTP 200 OK will be sent back automatically.

See related / possible duplicate: https://stackoverflow.com/questions/37782073/webhook-process-run-on-another-goroutine/37783535#37783535

答案2

得分: 5

我建议始终控制好你的goroutine,以避免内存和系统耗尽。如果你收到大量请求并且开始无控制地创建goroutine,那么系统很可能会很快崩溃。

在需要立即返回200 OK的情况下,最好的方法是创建一个消息队列,这样服务器只需要在队列中创建一个作业并返回OK,然后忘记它。其余的工作将由一个异步的消费者处理。

生产者(HTTP服务器)>>> 队列 >>> 消费者

通常,队列是一个外部资源(如RabbitMQ、AWS SQS等),但为了教学目的,你可以使用通道作为消息队列来实现相同的效果。

在下面的示例中,你将看到我们创建了一个通道来通信两个进程。然后我们启动一个从通道中读取的工作进程,以及一个带有处理程序的服务器,该处理程序将写入通道。

尝试在发送curl请求时调整缓冲区大小和作业时间。

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

	queueSize := 10
	// 这是我们的队列,一个用于通信进程的通道。队列大小是可以存储在通道中的项目数
	myJobQueue := make(chan string, queueSize) // 搜索'buffered channels'

	// 启动一个将持续从我们的队列中读取的工作进程
	go myBackgroundWorker(myJobQueue)

	// 我们使用一个处理程序启动服务器,该处理程序接收队列并将其写入
	if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
		panic(err)
	}
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		// 检查查询字符串中是否有'user_id'查询参数
		if userID := r.URL.Query().Get("user_id"); userID != "" {
			select {
			case myJobQueue <- userID: // 尝试将项目放入队列...
				rw.WriteHeader(http.StatusOK)
				rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
			default: // 如果无法写入队列,那是因为队列已满!
				rw.WriteHeader(http.StatusInternalServerError)
				rw.Write([]byte(`our internal queue is full, try it later`))
			}
			return
		}
		rw.WriteHeader(http.StatusBadRequest)
		rw.Write([]byte(`missing 'user_id' in query params`))
	}
}

func myBackgroundWorker(myJobQueue <-chan string) {
	const (
		jobDuration = 10 * time.Second // 模拟一个耗时的后台进程
	)

	// 我们持续从队列中读取并逐个处理队列。
	// 在这个循环中,我们可以以受控的方式创建更多的goroutine来并行处理工作并增加读取吞吐量,但我不想过于复杂化示例。
	for userID := range myJobQueue {
		// 速率限制器在这里...
		// go func(u string){
		log.Printf("processing user: %s, started", userID)
		time.Sleep(jobDuration)
		log.Printf("processing user: %s, finished", userID)
		// }(userID)
	}
}

英文:

I would recommend always having your goroutines under control to avoid memory and system exhaustion.
If you are receiving a spike of requests and you start spawning goroutines without control, probably the system will go down soon or later.

In those cases where you need to return an immediate 200Ok the best approach is to create a message queue, so the server only needs to create a job in the queue and return the ok and forget. The rest will be handled by a consumer asynchronously.

Producer (HTTP server) >>> Queue >>> Consumer

Normally, the queue is an external resource (RabbitMQ, AWS SQS...) but for teaching purposes, you can achieve the same effect using a channel as a message queue.

In the example you'll see how we create a channel to communicate 2 processes.
Then we start the worker process that will read from the channel and later the server with a handler that will write to the channel.

Try to play with the buffer size and job time while sending curl requests.

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;net/http&quot;
	&quot;time&quot;
)

/*
$ go run .

curl &quot;http://localhost:8080?user_id=1&quot;
curl &quot;http://localhost:8080?user_id=2&quot;
curl &quot;http://localhost:8080?user_id=3&quot;
curl &quot;http://localhost:8080?user_id=.....&quot;

*/

func main() {

	queueSize := 10
	// This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
	myJobQueue := make(chan string, queueSize) // Search for &#39;buffered channels&#39;

	// Starts a worker that will read continuously from our queue
	go myBackgroundWorker(myJobQueue)

	// We start our server with a handler that is receiving the queue to write to it
	if err := http.ListenAndServe(&quot;localhost:8080&quot;, myAsyncHandler(myJobQueue)); err != nil {
		panic(err)
	}
}

func myAsyncHandler(myJobQueue chan&lt;- string) http.HandlerFunc {
	return func(rw http.ResponseWriter, r *http.Request) {
		// We check that in the query string we have a &#39;user_id&#39; query param
		if userID := r.URL.Query().Get(&quot;user_id&quot;); userID != &quot;&quot; {
			select {
			case myJobQueue &lt;- userID: // We try to put the item into the queue ...
				rw.WriteHeader(http.StatusOK)
				rw.Write([]byte(fmt.Sprintf(&quot;queuing user process: %s&quot;, userID)))
			default: // If we cannot write to the queue it&#39;s because is full!
				rw.WriteHeader(http.StatusInternalServerError)
				rw.Write([]byte(`our internal queue is full, try it later`))
			}
			return
		}
		rw.WriteHeader(http.StatusBadRequest)
		rw.Write([]byte(`missing &#39;user_id&#39; in query params`))
	}
}

func myBackgroundWorker(myJobQueue &lt;-chan string) {
	const (
		jobDuration = 10 * time.Second // simulation of a heavy background process
	)

	// We continuosly read from our queue and process the queue 1 by 1.
	// In this loop we could spawn more goroutines in a controlled way to paralelize work and increase the read throughput, but i don&#39;t want to overcomplicate the example.
	for userID := range myJobQueue {
		// rate limiter here ...
		// go func(u string){
		log.Printf(&quot;processing user: %s, started&quot;, userID)
		time.Sleep(jobDuration)
		log.Printf(&quot;processing user: %s, finisehd&quot;, userID)
		// }(userID)
	}
}

答案3

得分: 2

@icza是完全正确的,没有"goroutine cleaning",你可以使用webhook或像gocraft这样的后台作业。我能想到的唯一使用你的解决方案的方法是使用sync包进行学习目的。

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logics
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
    wg.Wait()
}
英文:

@icza is absolutely right there is no "goroutine cleaning" you can use a webhook or a background job like gocraft. The only way I can think of using your solution is to use the sync package for learning purposes.

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
// Some DB calls
// Some business logics
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// some Task taking 5 sec
}()
w.WriteHeader(http.StatusOK)
wg.wait()

}

答案4

得分: 1

你可以使用&sync.WaitGroup来等待goroutine完成:

// BusyTask
func BusyTask(t interface{}) error {
    var wg = &sync.WaitGroup{}

    wg.Add(1)
    go func() {
        // 忙碌处理任务
        time.Sleep(5 * time.Second)
        wg.Done()
    }()
    wg.Wait() // 等待goroutine完成

    return nil
}

// 这将等待5秒直到goroutine完成
func main() {
    fmt.Println("hello")

    BusyTask("some task...")

    fmt.Println("done")
}

另一种方法是将context.Context附加到goroutine并设置超时。

//
func BusyTaskContext(ctx context.Context, t string) error {
    done := make(chan struct{}, 1)
    //
    go func() {
        // 等待5秒
        time.Sleep(5 * time.Second)
        // 执行任务并发送完成信号
        done <- struct{}{}
        close(done)
    }()
    //
    select {
    case <-ctx.Done():
        return errors.New("timeout")
    case <-done:
        return nil
    }
}

//
func main() {
    fmt.Println("hello")

    ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
    defer cancel()

    if err := BusyTaskContext(ctx, "some task..."); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("done")
}
英文:

you can wait for a goroutine to finish using &amp;sync.WaitGroup:

// BusyTask
func BusyTask(t interface{}) error {
	var wg = &amp;sync.WaitGroup{}

	wg.Add(1)
	go func() {
		// busy doing stuff
		time.Sleep(5 * time.Second)
		wg.Done()
	}()
	wg.Wait() // wait for goroutine

	return nil
}

// this will wait 5 second till goroutune finish
func main() {
	fmt.Println(&quot;hello&quot;)

	BusyTask(&quot;some task...&quot;)

	fmt.Println(&quot;done&quot;)
}

Other way is to attach a context.Context to goroutine and time it out.

//
func BusyTaskContext(ctx context.Context, t string) error {
	done := make(chan struct{}, 1)
	//
	go func() {
		// time sleep 5 second
		time.Sleep(5 * time.Second)
		// do tasks and signle done
		done &lt;- struct{}{}
		close(done)
	}()
	//
	select {
	case &lt;-ctx.Done():
		return errors.New(&quot;timeout&quot;)
	case &lt;-done:
		return nil
	}
}

//
func main() {
	fmt.Println(&quot;hello&quot;)

	ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
	defer cancel()

	if err := BusyTaskContext(ctx, &quot;some task...&quot;); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(&quot;done&quot;)
}

huangapple
  • 本文由 发表于 2021年7月13日 17:15:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/68359637.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定