英文:
Track progress of long running tasks - correct approach
问题
我想要追踪一些长时间运行的进程,并显示用户的完成百分比和错误(如果有)。如果只有一个长时间运行的进程,那么很容易实现 - 你可以为进度(百分比)和错误创建通道。当我们有X个长时间运行的进程时,实现这种逻辑的正确方法是什么?
下面是一段代码片段,它可以工作,但我不太喜欢它的实现方式。我创建了一个名为ProgressTracker
的结构体,它保持了Url
(作为字段)、Error
、Progress
作为通道。我将这样的ProgressTracker
保存在一个切片中,一旦我提交了所有的任务,我就通过ProgressTracker
切片进行迭代,并监听每个ProgressTracker
中的通道。一旦提交的请求数等于接收到的响应数 - 退出循环。
这是一个Go语言的典型解决方案吗?将ProgressTracker
作为通道传递给函数会更容易,但我不知道如何在这种情况下正确发送"progress"、"error"和"complete"事件。
以下是代码,同样可以在Go Playground上找到:https://go.dev/play/p/f3hXJsZR9WV
package main
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
type ProgressTracker struct {
Progress chan int
Error chan error
Complete chan bool
Url string
}
/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker *ProgressTracker) {
tracker.Url = url
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
tracker.Error <- errors.New("emulating error for .net sites")
tracker.Complete <- true
}
progress := 20 * i
tracker.Progress <- progress
}
tracker.Complete <- true
}
func main() {
var trackers []*ProgressTracker
var urls = []string{"google.com", "youtube.com", "someurl.net"}
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
tracker := &ProgressTracker{
Progress: make(chan int),
Error: make(chan error),
Complete: make(chan bool),
}
trackers = append(trackers, tracker)
go func(workUrl string, progressTracker *ProgressTracker) {
work(workUrl, progressTracker)
}(url, tracker)
}
go func() {
wg.Wait()
}()
var processed = 0
//iterate through all trackers and select each channel.
//Exit from this loop when number of processed requests equals the number of trackers
for {
for _, t := range trackers {
select {
case pr := <-t.Progress:
fmt.Printf("Url = %s, progress = %d\n", t.Url, pr)
case err := <-t.Error:
fmt.Printf("Url = %s, error = %s\n", t.Url, err.Error())
case <-t.Complete:
fmt.Printf("Url = %s is completed\n", t.Url)
processed = processed + 1
if processed == len(trackers) {
fmt.Printf("Everything is completed, exit")
return
}
}
}
}
}
更新:
如果我给其中一个任务添加延迟,那么在我选择所有通道的for循环中,也会等待最慢的工作程序的每次迭代。
Go Playground:https://go.dev/play/p/9FvDE7ZGIrP
更新后的work函数:
func work(url string, tracker *ProgressTracker) {
tracker.Url = url
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
if url == "google.com" {
time.Sleep(time.Second * 3)
}
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
tracker.Error <- errors.New("emulating error for .net sites")
tracker.Complete <- true
return
}
progress := 20 * i
tracker.Progress <- progress
}
tracker.Complete <- true
}
英文:
I'd like to track execution of some long running process and show the user completion percentage and errors (if any). If it's one long running process, then it's easy - you can create channels for progress (percentage) and error. What would the correct way to implement such logic when we have X long running processes?
Below is a snippet of code that works, but I don't really like how it's implemented.
I created a struct ProgressTracker
that keeps Url
(as a field), Error
, Progress
as channels. I keep such ProgressTracker
in a slice and once I submit all tasks I iterate via the slice of ProgressTracker
and listen to channels for each tracker
in ProgressTracker
. Once the number of submitted requests == number of received responses - exit the loop.
Is it Go idiomatic solution? It would be easier to pass ProgressTracker
to the function as a channel, but I don't know how to properly send "progress", "error" and "complete" events in such case.
The code is below, the same is available in Go playground: https://go.dev/play/p/f3hXJsZR9WV
package main
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
type ProgressTracker struct {
Progress chan int
Error chan error
Complete chan bool
Url string
}
/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker *ProgressTracker) {
tracker.Url = url
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
tracker.Error <- errors.New("emulating error for .net sites")
tracker.Complete <- true
}
progress := 20 * i
tracker.Progress <- progress
}
tracker.Complete <- true
}
func main() {
var trackers []*ProgressTracker
var urls = []string{"google.com", "youtube.com", "someurl.net"}
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
tracker := &ProgressTracker{
Progress: make(chan int),
Error: make(chan error),
Complete: make(chan bool),
}
trackers = append(trackers, tracker)
go func(workUrl string, progressTracker *ProgressTracker) {
work(workUrl, progressTracker)
}(url, tracker)
}
go func() {
wg.Wait()
}()
var processed = 0
//iterate through all trackers and select each channel.
//Exit from this loop when number of processed requests equals the number of trackers
for {
for _, t := range trackers {
select {
case pr := <-t.Progress:
fmt.Printf("Url = %s, progress = %d\n", t.Url, pr)
case err := <-t.Error:
fmt.Printf("Url = %s, error = %s\n", t.Url, err.Error())
case <-t.Complete:
fmt.Printf("Url = %s is completed\n", t.Url)
processed = processed + 1
if processed == len(trackers) {
fmt.Printf("Everything is completed, exit")
return
}
}
}
}
}
UPD:
If I add a delay to one of the tasks, then the for loop where I select all the channels will also wait for the slowest worker on each iteration.
Go playground: https://go.dev/play/p/9FvDE7ZGIrP
Updated work function:
func work(url string, tracker *ProgressTracker) {
tracker.Url = url
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
if url == "google.com" {
time.Sleep(time.Second * 3)
}
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
tracker.Error <- errors.New("emulating error for .net sites")
tracker.Complete <- true
return
}
progress := 20 * i
tracker.Progress <- progress
}
tracker.Complete <- true
}
答案1
得分: 0
你因为持续对已完成且没有默认值的跟踪器进行select
操作而发生了死锁。你的内部for
循环每次都会迭代所有跟踪器,包括已完成且不会再发送其他消息的跟踪器。最简单的解决方法是使用一个空的default
,这样在现实生活中它们的行为会更好,因为它们不会以相同的速度进行,但这会将其转换为一个紧密的循环,会消耗更多的CPU。
你的WaitGroup
根本没有起作用;你在一个goroutine中调用了Wait
,但在它返回时什么都没做,并且你从未在它所跟踪的goroutine中调用过Done
,所以它永远不会返回。相反,你单独跟踪了你收到的Complete
消息的数量,并将其用作WaitGroup的替代;目前尚不清楚为什么要以这种方式实现。
修复这两个问题可以解决所述的问题:https://go.dev/play/p/do0g9jrX0mY
然而,这可能不是正确的方法。在一个人为的例子中很难说出正确的方法;如果这个例子是它需要做的全部内容,那么你不需要任何逻辑,你可以将打印语句放在工作线程中,只使用一个waitgroup而不使用通道,然后就完成了。假设你实际上正在处理结果,你可能希望有一个单独的Completed
通道和一个单独的Error
通道,供所有工作线程共享,并且可能需要完全不同的机制来跟踪进度,比如一个原子的整数/浮点数,你只需要在想要知道当前进度时从中读取。然后你就不需要嵌套循环的东西了,你只需要一个循环和一个select
来从共享通道中读取消息。这完全取决于代码的使用环境。
英文:
You're deadlocking because you're continuing to select
against trackers that have finished with no default. Your inner for
loop iterates all trackers every time, which includes trackers that are done and are never going to send another message. The easiest way out of this is an empty default
, which would also make these behave better in real life where they don't all go at the same pace, but it does turn this into a tight loop which will consume more CPU.
Your WaitGroup
doesn't do anything at all; you're calling Wait
in a goroutine but doing nothing when it returns, and you never call Done
in the goroutines it's tracking, so it will never return. Instead, you're separately tracking the number of Complete
messages you get and using that instead of the WaitGroup; it's unclear why this is implemented this way.
Fixing both resolves the stated issue: https://go.dev/play/p/do0g9jrX0mY
However, that's probably not the right approach. It's impossible to say with a contrived example what the right approach would be; if the example is all it needs to do, you don't need any of the logic, you could put your print statements in the workers and just use a waitgroup and no channels and be done with it. Assuming you're actually doing something with the results, you probably want a single Completed
channel and a single Error
channel shared by all the workers, and possibly a different mechanism altogether for tracking progress, like an atomic int/float you can just read from when you want to know the current progress. Then you don't need the nested looping stuff, you just have one loop with one select
to read messages from the shared channels. It all depends on the context in which this code is intended to be used.
答案2
得分: 0
谢谢你的回答!我想出了这个方法,它适合我的需求:
package main
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
type ProgressTracker struct {
Progress int
Error error
Completed bool
Url string
}
/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker chan ProgressTracker) {
var internalTracker = ProgressTracker{
Url: url,
}
tracker <- internalTracker
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
if url == "google.com" {
time.Sleep(time.Second * 3)
}
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
internalTracker.Error = errors.New("error for .net sites")
internalTracker.Completed = true
tracker <- internalTracker
return
}
progress := 20 * i
internalTracker.Progress = progress
internalTracker.Completed = false
tracker <- internalTracker
}
internalTracker.Completed = true
tracker <- internalTracker
}
func main() {
var urls = []string{"google.com", "youtube.com", "someurl.net"}
var tracker = make(chan ProgressTracker, len(urls))
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
go func(workUrl string) {
defer wg.Done()
work(workUrl, tracker)
}(url)
}
go func() {
wg.Wait()
close(tracker)
fmt.Printf("After wg wait")
}()
var completed = 0
for completed < len(urls) {
select {
case t := <-tracker:
if t.Completed {
fmt.Printf("Processing for %s is completed!\n", t.Url)
completed = completed + 1
} else {
fmt.Printf("Processing for %s is in progress: %d\n", t.Url, t.Progress)
}
if t.Error != nil {
fmt.Printf("Url %s has errors %s\n", t.Url, t.Error)
}
}
}
}
在这里,我将ProgressTracker
作为一个通道传递(ProgressTracker
中的字段被声明为普通字段,而不是通道),并在每个工作函数的事件上返回正在进行的完整状态(如果进度增加-设置新值并将结构返回到通道,如果发生错误-设置错误并返回结构等)。
英文:
Thank you for your answers! I came up with this approach and it works for my needs:
package main
import (
"errors"
"fmt"
"strings"
"sync"
"time"
)
type ProgressTracker struct {
Progress int
Error error
Completed bool
Url string
}
/**
This method sleeps for 1 second and sends progress (in %) in each iteration to Progress channel
For .net sites on 3rd iteration fail with error
When everything is completed, send a message to Complete channel
*/
func work(url string, tracker chan ProgressTracker) {
var internalTracker = ProgressTracker{
Url: url,
}
tracker <- internalTracker
fmt.Printf("processing url %s\n", url)
for i := 1; i <= 5; i++ {
if url == "google.com" {
time.Sleep(time.Second * 3)
}
time.Sleep(time.Second)
if i == 3 && strings.HasSuffix(url, ".net") {
internalTracker.Error = errors.New("error for .net sites")
internalTracker.Completed = true
tracker <- internalTracker
return
}
progress := 20 * i
internalTracker.Progress = progress
internalTracker.Completed = false
tracker <- internalTracker
}
internalTracker.Completed = true
tracker <- internalTracker
}
func main() {
var urls = []string{"google.com", "youtube.com", "someurl.net"}
var tracker = make(chan ProgressTracker, len(urls))
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
go func(workUrl string) {
defer wg.Done()
work(workUrl, tracker)
}(url)
}
go func() {
wg.Wait()
close(tracker)
fmt.Printf("After wg wait")
}()
var completed = 0
for completed < len(urls) {
select {
case t := <-tracker:
if t.Completed {
fmt.Printf("Processing for %s is completed!\n", t.Url)
completed = completed + 1
} else {
fmt.Printf("Processing for %s is in progress: %d\n", t.Url, t.Progress)
}
if t.Error != nil {
fmt.Printf("Url %s has errors %s\n", t.Url, t.Error)
}
}
}
}
Here I pass ProgressTracker
as a channel (fields in ProgressTracker
are declared as simple fields, not channels) and on each event from work function return a complete state of what's is going on (if progress increased - set new value and return the a structure
to channel, if error happened - set the error and return the structure, etc).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论