Go: accept the result from the fastest worker function

Question

Please consider this question.

Currently I have one method that completes a job:

for {
  var result string 
  var resultOK bool
  result, resultOK = worker1(job) 
  if resultOK {
     // go on to other things
  }
  ...
}

Suppose we now have another mechanism that may be able to finish this job faster; let's call it worker2. Note that in some cases worker1 may be faster.

What's the idiomatic way to launch these two workers and accept the result from the one which finishes first successfully?

I know of the select mechanism, but it does not take the resultOK bool into account:

select {
case <-c1:
  // worker1 finished
case <-c2:
  // worker2 finished
case <-time.After(10 * time.Second):
  // we need to move on
}

Any advice would be much appreciated!

Answer 1

Score: 2

Usually this is solved by having every worker deliver its result on the same channel and receiving from that channel once: whichever worker is fastest, its result is the one used. It's also a good idea to signal the other, slower workers (e.g. by cancelling a context.Context) that their work is no longer needed and that they should abort as soon as possible.

If you receive from the channel only once, take care not to block the slower workers when they eventually finish and try to send their results: they could block forever. So either the channel should have a buffer large enough that no worker blocks, or the workers should send using a select statement with a default branch so they don't block when the send cannot proceed.
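A sketch of the second option, a non-blocking send with `select` and `default`. The helper `trySend` and the five simulated workers are hypothetical. One assumption worth stating: the channel still gets a buffer of 1. With an unbuffered channel, the fastest worker could reach `default` before the receiver is ready, and every result could be dropped.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// trySend delivers v only if out can accept it right now; otherwise it
// drops v and returns false instead of blocking the worker forever.
func trySend(out chan<- int, v int) bool {
	select {
	case out <- v:
		return true
	default:
		return false // a result was already delivered; nobody will read this one
	}
}

func main() {
	// Capacity 1 suffices for any number of workers: the first finisher
	// fills the slot, and every later finisher takes the default branch.
	out := make(chan int, 1)

	var wg sync.WaitGroup
	for id := 1; id <= 5; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			time.Sleep(time.Duration(id) * 10 * time.Millisecond) // simulated work
			if !trySend(out, id) {
				fmt.Printf("worker %d: result dropped\n", id)
			}
		}(id)
	}

	fmt.Println("winner:", <-out)
	wg.Wait() // every worker returns; none is stuck on a send
}
```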

If the result a worker produces is not acceptable (e.g. an error occurred), the worker may of course decide not to send anything. Care must be taken here too: if all workers fail, no result is delivered and the receiver could wait forever. This can be avoided with a timeout, or by having the workers send a result that indicates failure; the receiver then has to process such results and keep receiving until a good result arrives or no more results are coming.

Answer 2

Score: 1

An example in code:

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    result, err := RunConcurrent(1000, TimeConsumingWork)
    fmt.Println("result =", result, "error =", err)
}

type workFn func(ctx context.Context, input any) (any, error)

// RunConcurrent executes fn on n goroutines and returns the first successful result.
func RunConcurrent(n int, fn workFn) (any, error) {
    wg := sync.WaitGroup{}
    wg.Add(n)

    resultChan := make(chan any, n) // there are potentially n results

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // this signals the other goroutines to stop

    for i := 0; i < n; i++ {
        go func(v any) { // run fn in a new goroutine
            defer wg.Done() // signal that this goroutine is done

            result, err := fn(ctx, v)
            if err != nil {
                return // a failed worker sends nothing
            }

            select { // try to store the result on the channel; if ctx is done, stop trying
            case resultChan <- result:
            case <-ctx.Done():
            }
        }(i)
    }

    go func() {
        wg.Wait()
        // DO NOT close before ALL goroutines are done: a worker that is
        // still running would panic with "send on closed channel".
        close(resultChan)
    }()

    v, ok := <-resultChan // ok is false only if the channel was closed without any result
    if !ok {
        return nil, errors.New("no results")
    }
    return v, nil
}

func TimeConsumingWork(ctx context.Context, input any) (any, error) {
    simulatedExecutionTime := time.Duration(10+rand.Intn(50)) * time.Millisecond

    select { // do time-consuming work until the context is done
    case <-time.After(simulatedExecutionTime):
        if rand.Intn(2) == 0 {
            return nil, errors.New("simulated a random error")
        }
        return input, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

Answer 3

Score: 0

> I know of select mechanism, but it does not care for resultOK bool.

You can define a custom struct type that combines result and resultOK, and use that type as the channel's element type.

huangapple
  • Published on 16 August 2022 at 13:47:24
  • If you repost this article, please keep this link: https://go.coder-hub.com/73369277.html