如何将多个 goroutine 同步到所选 goroutine 的终止(即 Thread.join())?

huangapple go评论82阅读模式
英文:

How to synchronize multiple goroutines to the termination of a selected goroutine (ie. Thread.join())

问题

我之前问过这个问题,但有些人觉得我的原始问题不够详细(“为什么你会想要一个定时条件等待??”),所以这里有一个更具体的问题。

我有一个正在运行的goroutine,称之为server。它已经启动,会执行一段时间,并完成它的任务。然后,它会退出,因为它已经完成了。

在它执行期间,会启动大量的其他goroutine。如果你愿意,可以称它们为“client”线程。它们会执行步骤A和步骤B。然后,它们必须等待“server” goroutine 完成一段指定的时间,并在“server”没有完成时以状态退出,并在它完成时运行步骤C。

(请不要告诉我如何重新组织这个工作流程。这是假设的,并且是一个给定的条件。它不能被改变。)

一个正常、合理的方法是让server线程使用selectAll或Broadcast函数向条件变量发出信号,并让其他线程处于定时等待状态,监视条件变量。

func (s *Server) Join(timeMillis int) error {
  s.mux.Lock()
  defer s.mux.Unlock()
  for !s.isFinished {
     err = s.cond.Wait(timeMillis)
     if err != nil {
        stepC()
     }
  }
  return err
}

在这种情况下,server将进入一个状态,isFinished变为true,并原子地向条件变量广播信号,与互斥锁相对应。但是这是不可能的,因为Go不支持定时条件等待(但有Broadcast())。

那么,有没有符合Go风格的方法来实现这个?我已经阅读了所有关于Go的博客和文档,这种模式或其等效物,尽管显而易见,从未出现过,也没有任何等效的“重新构建”基本问题的方法——即IPC风格的通道只能在一个例程和另一个例程之间。是的,有扇入/扇出,但请记住,这些线程不断出现和消失。这应该很简单——关键是/不要让成千上万的“等待状态”goroutine一直挂在那里等待server死亡,当mux通道的另一个“分支”(计时器)发出信号时。

请注意,上面的一些“client”可能在server goroutine启动之前启动(通常在创建通道时),有些可能在server执行期间出现,有些可能在之后出现...在所有情况下,只有在server在进入Join()函数后的timeMillis毫秒内运行并退出时,它们才应该运行步骤C...

总的来说,当有多个消费者时,通道设施似乎非常缺乏。"首先构建一个将监听器映射到通道的注册表"和"有这个非常巧妙的递归数据结构,它将自己发送到它作为字段持有的通道上"作为对好用、可靠、友好、明显的"等待一段时间"的替代品是不可取的。

英文:

I asked this in a previous question, but some people felt that my original question was not detailed enough ("Why would you ever want a timed condition wait??") so here is a more specific one.

I have a goroutine running, call it server. It is already started, will execute for some amount of time, and do its thing. Then, it will exit since it is done.

During its execution some large number of other goroutines start. Call them "client" threads if you like. They run step A, and step B. Then, they must wait for the "server" goroutine to finish for a specified amount of time, and exit with status <X> if "server is not finished, and say run step C if it finishes.

(Please do not tell me how to restructure this workflow. It is hypothetical and a given. It cannot be changed.)

A normal, sane way to do this is to have the server thread signal a condition variable with a selectAll or Broadcast function, and have the other threads in a timed wait state monitoring the condition variable.

func (s *Server) Join(timeMillis int) error {
  s.mux.Lock()
  defer s.mux.Unlock()
  while !s.isFinished {
     err = s.cond.Wait(timeMillis)
     if err != nil {
        stepC()
     }
  }
  return err
}

Where the server will enter a state where isFinished becomes true and broadcast signal the condition variable atomically with respect to the mutex. Except this is impoosible, since Go does not support timed condition waits. (But there is a Broadcast())

So, what is the "Go-centric" way to do this? I've reall all of the Go blogs and documentation, and this pattern or its equivalent, despite its obviousness, never comes up, nor any equivalent "reframing" of the basic problem - which is that IPC style channels are between one routine and one other routine. Yes, there is fan-in/fan-out, but remember these threads are constantly appearing and vanishing. This should be simple - and crucially /not leave thousands of "wait-state" goroutines hanging around waiting for the server to die when the other "branch" of the mux channel (the timer) has signalled/.

Note that some of the "client" above might be started before the server goroutine has started (this is when channel is usually created), some might appear during, and some might appear after... in all cases they should run stepC if and only if the server has run and exited after timeMillis milliseconds post entering the Join() function...

In general the channels facility seems sorely lacking when there's more than one consumer. "First build a registry of channels to which listeners are mapped" and "there's this really nifty recursive data structure which sends itself over a channel it holds as field" are so.not.ok as replacements to the nice, reliable, friendly, obvious: wait(forSomeTime)

答案1

得分: 2

我认为你想要的可以通过在单个共享通道上进行选择,并在完成后由服务器关闭来实现。

假设我们创建一个全局的“退出通道”,它在所有goroutine之间共享。可以在创建“服务器”goroutine之前创建它。重要的是,服务器goroutine永远不会向通道发送任何内容,而只是关闭它。

现在,客户端goroutine只需执行以下操作:

select {
    case <-ch:
        fmt.Println("通道已关闭,服务器完成!")
    case <-time.After(time.Second):
        fmt.Println("超时。执行恢复操作")
}

服务器goroutine只需执行以下操作:

close(ch)

更完整的示例:

package main

import (
    "fmt"
    "time"
)

func waiter(ch chan struct{}) {
    fmt.Println("正在处理")
    fmt.Println("等待中...")
    select {
        case <-ch:
            fmt.Println("通道已关闭")
        case <-time.After(time.Second):
            fmt.Println("超时。执行恢复操作")
    }
}

func main() {
    ch := make(chan struct{})
    go waiter(ch)
    go waiter(ch)
    time.Sleep(100 * time.Millisecond)
    fmt.Println("关闭通道")
    close(ch)
    time.Sleep(time.Second)
}

这可以抽象为以下实用程序API:

type TimedCondition struct {
    ch chan struct{}
}

func NewTimedCondition() *TimedCondition {
    return &TimedCondition{
        ch: make(chan struct{}),
    }
}

func (c *TimedCondition) Broadcast() {
    close(c.ch)
}

func (c *TimedCondition) Wait(t time.Duration) error {
    select {
        // 通道关闭,表示调用了广播
        case <-c.ch:
            return nil
        case <-time.After(t):
            return errors.New("超时")
    }
}
英文:

I think what you want can be done by selecting on a single shared channel, and then having the server close it when it's done.

Say we create a global "Exit channel", that's shared across all goroutines. It can be created before the "server" goroutine is created. The important part is that the server goroutine never sends anything down the channel, but simply closes it.

Now the client goroutines, simply do this:

select {
	case &lt;- ch:
	fmt.Println(&quot;Channel closed, server is done!&quot;)
	case &lt;-time.After(time.Second):
	fmt.Println(&quot;Timed out. do recovery stuff&quot;)
	
}

and the server goroutine just does:

close(ch)

More complete example:

package main

import(
	&quot;fmt&quot;
	&quot;time&quot;
	
)
	

func waiter(ch chan struct{}) {
	fmt.Println(&quot;Doing stuff&quot;)
	
	fmt.Println(&quot;Waiting...&quot;)
	
	select {
		case &lt;- ch:
		fmt.Println(&quot;Channel closed&quot;)
		case &lt;-time.After(time.Second):
		fmt.Println(&quot;Timed out. do recovery stuff&quot;)
		
	}
}
	

func main(){
	
	ch := make(chan struct{})
	
	go waiter(ch)
	go waiter(ch)
	time.Sleep(100*time.Millisecond)
	fmt.Println(&quot;Closing channel&quot;)
	close(ch)
	
	time.Sleep(time.Second)
	
}

This can be abstracted as the following utility API:

type TimedCondition struct {
	ch chan struct{}
}

func NewTimedCondition()*TimedCondition {
 	return &amp;TimedCondition {
		ch: make(chan struct{}),
	}
}

func (c *TimedCondition)Broadcast() {
	close(c.ch)
}

func (c *TimedCondition)Wait(t time.Duration) error {
	select {
		// channel closed, meaning broadcast was called
		case &lt;- c.ch:
			return nil
		case &lt;-time.After(t):
			return errors.New(&quot;Time out&quot;)	
	}
}

huangapple
  • 本文由 发表于 2015年4月29日 06:10:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/29930861.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定