Go并发循环逻辑

huangapple go评论150阅读模式
英文:

Go Concurrency Circular Logic

问题

我刚刚开始学习Go语言中的并发编程,并尝试创建一个调度协程,它将任务发送到监听 jobchan 通道的工作池中。如果通过 dispatchchan 通道传入了消息到我的调度函数中,而其他协程都在忙碌,那么消息将被追加到调度器的 stack 切片中,并且调度器将在稍后再次尝试发送,直到有可用的工作协程,或者在 dispatchchan 上不再接收到更多的消息。这是因为 dispatchchan 和 jobchan 都是无缓冲的,而工作协程正在运行的协程将向调度器追加其他消息,直到某个点为止,我不希望工作协程被阻塞在调度器上并导致死锁。以下是我目前想出的调度器代码:

func dispatch() {
    var stack []string
    acount := 0
    for {
        select {
        case d := <-dispatchchan:
            stack = append(stack, d)
        case c := <-mw:
            acount = acount + c
        case jobchan <- stack[0]:
            if len(stack) > 1 {
                stack[0] = stack[len(stack)-1]
                stack = stack[:len(stack)-1]
            } else {
                stack = nil
            }
        default:
            if acount == 0 && len(stack) == 0 {
                close(jobchan)
                close(dispatchchan)
                close(mw)
                wg.Done()
                return
            }
        }
    }
}

完整示例代码可在此处查看:https://play.golang.wiki/p/X6kXVNUn5N7

mw 通道是一个缓冲通道,长度与工作协程的数量相同。它充当工作池的信号量。如果工作协程正在执行有意义的工作,它会在 mw 通道上发送整数 1,当工作完成并回到监听 jobchan 的循环中时,它会在 mw 通道上发送整数 -1。这样,调度器就知道工作池是否有正在进行的工作,或者池是否处于空闲状态。如果池处于空闲状态且堆栈上没有更多的消息,则调度器关闭通道并将控制权返回给主函数。

这一切都很好,但我遇到的问题是堆栈本身可能是空的,所以当我尝试将 stack[0] 发送到 jobchan 时,如果堆栈为空,我会得到一个越界错误。我想要解决的问题是如何确保在遇到该情况时,stack[0] 要么有一个值,要么没有。我不希望该情况向 jobchan 发送一个空字符串。

非常感谢您的帮助。如果有更符合Go语言惯用并发模式的解决方案,我很乐意听取。我对这个解决方案并不完全满意,但这是我目前为止的最远进展。

英文:

I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel. If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is appended onto the stack slice in the dispatcher and the dispatcher will try to send again later when a worker becomes available, and/or no more messages are received on the dispatchchan. This is because the dispatchchan and the jobchan are unbuffered, and the go routine the workers are running will append other messages to the dispatcher up to a certain point and I don't want the workers blocked waiting on the dispatcher and creating a deadlock. Here's the dispatcher code I've come up with so far:

func dispatch() {
var stack []string
acount := 0
for {
	select {
	case d := &lt;-dispatchchan:
		stack = append(stack, d)
	case c := &lt;-mw:
		acount = acount + c
	case jobchan &lt;-stack[0]:
		if len(stack) &gt; 1 {
			stack[0] = stack[len(stack)-1]
			stack = stack[:len(stack)-1]
		} else {
			stack = nil
		}
	default:
		if acount == 0 &amp;&amp; len(stack) == 0 {
			close(jobchan)
			close(dispatchchan)
			close(mw)
			wg.Done()
			return
		}
	}
}

Complete example at https://play.golang.wiki/p/X6kXVNUn5N7

The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool. If the worker routine is doing [m]eaningful [w]ork it throws int 1 on the mw channel and when it finishes its work and goes back into the for loop listening to the jobchan it throws int -1 on the mw. This way the dispatcher knows if there's any work being done by the worker pool, or if the pool is idle. If the pool is idle and there are no more messages on the stack, then the dispatcher closes the channels and return control to the main func.

This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error. What I'm trying to figure out is how to ensure that when I hit that case, either stack[0] has a value in it or not. I don't want that case to send an empty string to the jobchan.

Any help is greatly appreciated. If there's a more idomatic concurrency pattern I should consider, I'd love to hear about it. I'm not 100% sold on this solution but this is the farthest I've gotten so far.

答案1

得分: 2

这一切都很好,但我遇到的问题是堆栈本身可能是零长度的,所以当我尝试将stack[0]发送到jobchan时,如果堆栈为空,我会得到一个越界错误。

我无法通过你提供的playground链接重现这个问题,但这是可以相信的,因为至少一个gofunc工作器可能已经准备好在该通道上接收。

我的输出是Msgcnt: 0,这也很容易解释,因为当dispatch()运行其select时,gofunc可能没有准备好在jobschan上接收。这些操作的顺序没有定义。

尝试创建一个调度go例程,将作业发送到监听jobchan通道的工作池。

通道不需要调度程序。通道本身就是调度程序。

如果通过dispatchchan通道进入我的调度函数的消息,而我的其他go例程都很忙,那么当有一个工作器可用时,消息将稍后再次发送,或者在dispatchchan上不再接收到更多消息时,消息将稍后再次发送。

通过一些创造性的编辑,很容易将其转化为接近缓冲通道定义的内容。它可以立即读取,或者可以接受一定数量的无法立即调度的消息的“limit”。你确实定义了limit,尽管在代码的其他地方没有使用它。

在任何函数中,定义一个你不读取的变量将导致编译时错误,如“limit declared but not used”。这种限制提高了代码质量,并有助于识别拼写错误。但在包范围内,你已经定义了未使用的limit作为“全局”变量,从而避免了有用的错误-你没有限制任何东西。

不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义等同于使用go关键字表示的函数并发。将在稍后详细介绍这一点。

回到“limit”,限制你排队的作业数量是有意义的,因为所有资源都是有限的,接受比你预期处理的消息更多需要比进程内存提供的更持久的存储。如果你不感觉有义务无论如何都要满足这些请求,请不要在第一次接受“太多”请求。

那么,哪个函数有dispatchchandispatch()?为了在可以处理它们之前存储一定数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作器?这正是缓冲通道的用途。

循环逻辑

你的程序何时“知道”完成?main()提供了初始输入,但你在dispatch()中关闭了所有3个通道:

            close(jobchan)
            close(dispatchchan)
            close(mw)

你的工作器将写入它们自己的作业队列,因此只有当工作器完成对其的写入时,才能关闭传入的作业队列。然而,单个工作器也不知道何时关闭作业队列,因为其他工作器正在向其写入。没有人知道你的算法何时完成。这就是你的循环逻辑。

mw通道是一个缓冲通道,长度与工作器go例程的数量相同。它充当工作池的信号量。

这里存在竞争条件。考虑这样一种情况,所有n个工作器刚刚接收到最后的n个作业。它们每个人都从jobschan中读取,并检查ok的值。disptatcher继续运行其select。此时没有人正在写入dispatchchan或从jobschan中读取,因此立即匹配了default情况。len(stack)0,当前没有job,因此dispatcher关闭了所有通道,包括mw。此后的某个时刻,一个工作器尝试向已关闭的通道写入并发生恐慌。

所以现在我准备好提供一些代码了,但我还有一个问题:我没有一个明确的问题陈述来编写代码。

我刚刚开始使用Go中的并发,并尝试创建一个调度go例程,将作业发送到监听jobchan通道的工作池。

goroutine之间的通道就像同步齿轮一样。但是,齿轮转动的目的是什么?你不是在尝试计时,也不是在构建一个发条玩具。你的齿轮可以转动,但成功看起来像什么?它们的转动?

让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入中的字符串*,在n个工作器中睡眠这么多秒。为了实际返回一个*结果,我们将说每个工作器将返回持续时间运行的开始和结束时间。

  • 为了使其在playground中运行,我将使用硬编码的字节缓冲区模拟标准输入。
package main

import (
	&quot;bufio&quot;
	&quot;bytes&quot;
	&quot;fmt&quot;
	&quot;os&quot;
	&quot;strings&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

type SleepResult struct {
	worker_id int
	duration  time.Duration
	start     time.Time
	end       time.Time
}

func main() {
	var num_workers = 2
	workchan := make(chan time.Duration)
	resultschan := make(chan SleepResult)
	var wg sync.WaitGroup
	var resultswg sync.WaitGroup
	resultswg.Add(1)
	go results(&amp;resultswg, resultschan)
	for i := 0; i &lt; num_workers; i++ {
		wg.Add(1)
		go worker(i, &amp;wg, workchan, resultschan)
	}
	// playground doesn&#39;t have stdin
	var input = bytes.NewBufferString(
		strings.Join([]string{
			&quot;3ms&quot;,
			&quot;1 seconds&quot;,
			&quot;3600ms&quot;,
			&quot;300 ms&quot;,
			&quot;5s&quot;,
			&quot;0.05min&quot;}, &quot;\n&quot;) + &quot;\n&quot;)

	var scanner = bufio.NewScanner(input)
	for scanner.Scan() {
		text := scanner.Text()
		if dur, err := time.ParseDuration(text); err != nil {
			fmt.Fprintln(os.Stderr, &quot;Invalid duration&quot;, text)
		} else {
			workchan &lt;- dur
		}
	}
	close(workchan) // we know when our inputs are done
	wg.Wait()       // and when our jobs are done
	close(resultschan)
	resultswg.Wait()
}

func results(wg *sync.WaitGroup, resultschan &lt;-chan SleepResult) {
	for res := range resultschan {
		fmt.Printf(&quot;Worker %d: %s : %s =&gt; %s\n&quot;,
			res.worker_id, res.duration,
			res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
	}
	wg.Done()
}

func worker(id int, wg *sync.WaitGroup, jobchan &lt;-chan time.Duration, resultschan chan&lt;- SleepResult) {
	var res = SleepResult{worker_id: id}
	for dur := range jobchan {
		res.duration = dur
		res.start = time.Now()
		time.Sleep(res.duration)
		res.end = time.Now()
		resultschan &lt;- res
	}
	wg.Done()
}

在这里,我使用了2个等待组,一个用于工作器,一个用于结果。这确保在main()结束之前,我完成了所有结果的写入。我通过使每个函数一次只做一件事情来保持我的函数简单:main读取输入,解析其中的持续时间,并将其发送到下一个工作器。results函数收集结果并将其打印到标准输出。工作器执行睡眠操作,从jobchan读取并写入resultschan

workchan可以是缓冲的(或者像这种情况下一样不缓冲);这并不重要,因为输入将以可以处理的速度读取。我们可以缓冲任意数量的输入,但我们无法缓冲无限数量。我将通道大小设置为最大为1e6-但是一百万远远小于无限。对于我的用例,我根本不需要进行任何缓冲。

main知道输入何时完成,并且可以关闭jobschanmain还知道作业何时完成(wg.Wait()),并且可以关闭结果通道。关闭这些通道对于workerresults goroutine来说是一个重要的信号-它们可以区分空通道和保证不会有任何新添加的通道。

for job := range jobchan {...}是你更冗长的缩写形式:

for {
  job, ok :=  &lt;- jobchan
  if !ok {
    wg.Done()
    return
  }
  ...
}

请注意,此代码创建了2个工作器,但它可以创建20个、2000个甚至1个。无论工作池中有多少个工作器,程序都可以正常运行。它可以处理任何数量的输入(尽管无休止的输入当然会导致无休止的程序)。它不会创建输出到输入的循环。如果你的用例需要作业创建更多作业,那么这是一种更具挑战性的情况,通常可以通过仔细规划来避免。

希望这给你一些关于如何更好地在Go应用程序中使用并发的好主意。

https://play.golang.wiki/p/cZuI9YXypxI

英文:

> This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error.

I can't reproduce it with your playground link, but it's believable, because at lest one gofunc worker might have been ready to receive on that channel.

My output has been Msgcnt: 0, which is also easily explained, because gofunc might not have been ready to receive on jobschan when dispatch() runs its select. The order of these operations is not defined.

> trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel

A channel needs no dispatcher. A channel is the dispatcher.

>If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is [...] will [...] send again later when a worker becomes available, [...] or no more messages are received on the dispatchchan.

With a few creative edits, it was easy to turn that into something close to the definition of a buffered channel. It can be read from immediately, or it can take up to some "limit" of messages that can't be immediately dispatched. You do define limit, though it's not used elsewhere within your code.

In any function, defining a variable you don't read will result in a compile time error like limit declared but not used. This stricture improves code quality and helps identify typeos. But at package scope, you've gotten away with defining the unused limit as a "global" and thus avoided a useful error - you haven't limited anything.

Don't use globals. Use passed parameters to define scope, because the definition of scope is tantamount to functional concurrency as expressed with the go keyword. Pass the relevant channels defined in local scope to functions defined at package scope so that you can easily track their relationships. And use directional channels to enforce the producer/consumer relationship between your functions. More on this later.

Going back to "limit", it makes sense to limit the quantity of jobs you're queueing because all resources are limited, and accepting more messages than you have any expectation of processing requires more durable storage than process memory provides. If you don't feel obligated to fulfill those requests no matter what, don't accept "too many" of them in the first place.

So then, what function has dispatchchan and dispatch()? To store a limited number of pending requests, if any, before they can be processed, and then to send them to the next available worker? That's exactly what a buffered channel is for.

> Circular Logic

Who "knows" when your program is done? main() provides the initial input, but you close all 3 channels in `dispatch():

            close(jobchan)
            close(dispatchchan)
            close(mw)

Your workers write to their own job queue so only when the workers are done writing to it can the incoming job queue be closed. However, individual workers also don't know when to close the jobs queue because other workers are writing to it. Nobody knows when your algorithm is done. There's your circular logic.

> The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool.

There's a race condition here. Consider the case where all n workers have just received the last n jobs. They've each read from jobschan and they're checking the value of ok. disptatcher proceeds to run its select. Nobody is writing to dispatchchan or reading from jobschan right now so the default case is immediately matched. len(stack) is 0 and there's no current job so dispatcher closes all channels including mw. At some point thereafter, a worker tries to write to a closed channel and panics.

So finally I'm ready to provide some code, but I have one more problem: I don't have a clear problem statement to write code around.

> I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel.

Channels between goroutines are like the teeth that synchronize gears. But to what end do the gears turn? You're not trying to keep time, nor construct a wind-up toy. Your gears could be made to turn, but what would success look like? Their turning?

Let's try to define a more specific use case for channels: given an arbitrarily long set of durations coming in as strings on standard input*, sleep that many seconds in one of n workers. So that we actually have a result to return, we'll say each worker will return the start and end time the duration was run for.

  • So that it can run in the playground, I'll simulate standard input with a hard-coded byte buffer.
package main
import (
&quot;bufio&quot;
&quot;bytes&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;strings&quot;
&quot;sync&quot;
&quot;time&quot;
)
type SleepResult struct {
worker_id int
duration  time.Duration
start     time.Time
end       time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&amp;resultswg, resultschan)
for i := 0; i &lt; num_workers; i++ {
wg.Add(1)
go worker(i, &amp;wg, workchan, resultschan)
}
// playground doesn&#39;t have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
&quot;3ms&quot;,
&quot;1 seconds&quot;,
&quot;3600ms&quot;,
&quot;300 ms&quot;,
&quot;5s&quot;,
&quot;0.05min&quot;}, &quot;\n&quot;) + &quot;\n&quot;)
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, &quot;Invalid duration&quot;, text)
} else {
workchan &lt;- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait()       // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan &lt;-chan SleepResult) {
for res := range resultschan {
fmt.Printf(&quot;Worker %d: %s : %s =&gt; %s\n&quot;,
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan &lt;-chan time.Duration, resultschan chan&lt;- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan &lt;- res
}
wg.Done()
}

Here I use 2 wait groups, one for the workers, one for the results. This makes sure Im done writing all the results before main() ends. I keep my functions simple by having each function do exactly one thing at a time: main reads inputs, parses durations from them, and sends them off to the next worker. The results function collects results and prints them to standard output. The worker does the sleeping, reading from jobchan and writing to resultschan.

workchan can be buffered (or not, as in this case); it doesn't matter because the input will be read at the rate it can be processed. We can buffer as much input as we want, but we can't buffer an infinite amount. I've set channel sizes as big as 1e6 - but a million is a lot less than infinite. For my use case, I don't need to do any buffering at all.

main knows when the input is done and can close the jobschan. main also knows when jobs are done (wg.Wait()) and can close the results channel. Closing these channels is an important signal to the worker and results goroutines - they can distinguish between a channel that is empty and a channel that is guaranteed not to have any new additions.

for job := range jobchan {...} is shorthand for your more verbose:

for {
job, ok :=  &lt;- jobchan
if !ok {
wg.Done()
return
}
...
}

Note that this code creates 2 workers, but it could create 20 or 2000, or even 1. The program functions regardless of how many workers are in the pool. It can handle any volume of input (though interminable input of course leads to an interminable program). It does not create a cyclic loop of output to input. If your use case requires jobs to create more jobs, that's a more challenging scenario that can typically be avoided with careful planning.

I hope this gives you some good ideas about how you can better use concurrency in your Go applications.

https://play.golang.wiki/p/cZuI9YXypxI

huangapple
  • 本文由 发表于 2021年11月20日 11:56:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/70043127.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定