英文:
How to cancel a channel job based on an ID in golang
问题
所以我有一个POST
端点,它创建一个作业并将其添加到通道中。workerJobsChan = make(chan Job, maxQueueSize)
这是我在通道中执行作业的方式(main.go
):
for i := 1; i <= maxWorkers; i++ {
go func(i int) {
for job := range workerJobsChan {
ctx, cancel := context.WithCancel(context.Background())
storeJob(job.Search.ID, cancel)
job.Execute(ctx, c.db, i)
}
}(i)
}
我将取消函数存储在一个映射中:cancelJobFuncs = make(map[int]context.CancelFunc)
。
这是作业函数:
func (j *Job) Execute(ctx context.Context, db *sql.DB, workerID int) error {
for {
select {
// 检查取消信号
case <-ctx.Done():
if err := ctx.Err(); err != nil {
fmt.Println("Worker", workerID, "error", err)
}
fmt.Println("Worker", workerID, "cancelled")
return nil
default:
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
time.Sleep(2 * time.Second)
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
time.Sleep(5 * time.Second)
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
}
}
}
我这样取消一个上下文(在一个 HTTP 处理程序中):
cancelJob(search.ID)
但作业仍然继续运行。我尝试了很多方法,但似乎无法使其工作。
英文:
So i have a POST
endpoint that creates a job and adds them to a chan. workerJobsChan = make(chan Job, maxQueueSize)
This is how i execute the jobs in the channel (main.go
):
for i := 1; i <= maxWorkers; i++ {
go func(i int) {
for job := range workerJobsChan {
ctx, cancel := context.WithCancel(context.Background())
storeJob(job.Search.ID, cancel)
job.Execute(ctx, c.db, i)
}
}(i)
}
I store the cancel function in a map: cancelJobFuncs = make(map[int]context.CancelFunc)
.
This is the job function :
func (j *Job) Execute(ctx context.Context, db *sql.DB, workerID int) error {
for {
select {
// Check for cancellation signal
case <-ctx.Done():
if err := ctx.Err(); err != nil {
fmt.Println("Worker", workerID, "error", err)
}
fmt.Println("Worker", workerID, "cancelled")
return nil
default:
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
time.Sleep(2 * time.Second)
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
time.Sleep(5 * time.Second)
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
}
}
}
I cancel a context like this (in a http handler):
cancelJob(search.ID)
But the job just continues to run. I tried a lot of things but i just can't seem to get it working.
答案1
得分: 0
这里有一种明显的观点表达方式:如果你的代码没有检查ctx.Done()
,它就无法知道是否被取消了。(顺便说一下,这是对你问题中@JimB在评论中所说的另一种表述)。
所以当你的代码在.Execute(...)
方法中开始执行这个代码块时:
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
time.Sleep(2 * time.Second)
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
time.Sleep(5 * time.Second)
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
它会执行完这个代码块(7秒)。没有任何指令告诉它在取消时停止。
如果你希望在“Sleep”指令期间能够检测到取消操作,你需要改变你的代码。
这里是一个示例,展示如何在你的示例中实现这一点:
func (j *Job) Execute(ctx context.Context, db *sql.DB, workerID int) error {
for {
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
// 使用time.After()重写time.Sleep(),以便可以在select语句中组合使用:
select {
case <-ctx.Done():
fmt.Println("Worker", workerID, "cancelled")
return nil
case <-time.After(2 * time.Second):
// 继续执行
}
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
select {
case <-ctx.Done():
fmt.Println("Worker", workerID, "cancelled")
return nil
case <-time.After(5 * time.Second):
// 继续执行
}
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
}
}
我猜你实际的代码中并不是使用time.Sleep()
指令,而是使用processSearch(...)
或doQuery(...)
等函数。
如果你需要在这些函数执行期间能够取消操作,你需要以某种方式将取消上下文传递给它们,并让它们检查取消操作。
一种“传递上下文”的方式是将其添加到这些函数的参数中:
processSearch(ctx, ...)
doQuery(ctx, ...)
但根据你现有的代码,有些参数可能已经有了内置的取消方式。举个例子:
// http.Request携带一个上下文:
func doQuery(req *http.Request, ....) {
...
}
// 在调用处:
...
req := http.NewRequestWithContext(ctx, "GET", "https://some.other.service/", nil)
doQuery(req, ...)
英文:
Here is one way to state an obvious point: if your code does not check ctx.Done()
, it can't know it is cancelled.
(by the way this is yet another rewording of what @JimB wrote in comments to your question).
So when your code in your .Execute(...)
method starts executing this block :
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
time.Sleep(2 * time.Second)
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
time.Sleep(5 * time.Second)
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
it will go to the end of this block (7 seconds). There is no instruction telling it to stop on cancellation.
If you want your function to be able to detect cancellation during the "Sleep" instructions, you have to change your code.
Here is one example of how to do that with your example:
func (j *Job) Execute(ctx context.Context, db *sql.DB, workerID int) error {
for {
fmt.Printf("worker%d: processing %s\n", workerID, j.Search.Query)
// rewrite time.Sleep() with time.After() so that it can be composed
// in a select statement:
select {
case <-ctx.Done():
fmt.Println("Worker", workerID, "cancelled")
return nil
case <-time.After(2 * time.Second):
// keep going
}
fmt.Printf("worker%d: active %s\n", workerID, j.Search.Query)
select {
case <-ctx.Done():
fmt.Println("Worker", workerID, "cancelled")
return nil
case <-time.After(5 * time.Second):
// keep going
}
fmt.Printf("worker%d: completed %s!\n", workerID, j.Search.Query)
}
}
https://go.dev/play/p/Tlpggug2AUB
I guess your actual code doesn't have time.Sleep()
instructions but rather processSearch(...)
or doQuery(...)
or ...
If you need these functions to be cancellable during their execution, you will need to somehow pass your cancellation context to them and have them check the cancellation some way or some other.
One way to "pass the context" is obviously to add it to the arguments of said functions :
processSearch(ctx, ...)
doQuery(ctx, ...)
but depending on your existing code, some arguments may already have a built-in way to be cancelled.
As an illustration:
// an http.Request carries a context:
func doQuery(req *http.Request, ....) {
...
}
// at call site:
...
req := http.NewRequestWithContext(ctx, "GET", "https://some.other.service/", nil)
doQuery(req, ...)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论