半异步代码逻辑

huangapple go评论97阅读模式
英文:

Semi asynchronous code logic

问题

我正在努力找出一个能够将同步流程与异步行为结合在一起的工作设计方案。

我有4个组件:

  1. Seeder(播种器)
  2. Worker(工作者)
  3. Publisher(发布器)
  4. Updater(更新器)

我唯一的限制是,一旦Seeder播种数据,它必须被阻塞,直到Updater完成所有任务的处理。前三个组件可以很容易地同步进行,但Updater必须并行工作,否则完成任务将需要很长时间。

所以流程是这样的:

Seeder -> Worker -> Publisher -> Updater -> Seeder -> Worker -> Publisher -> Updater ...

并且这个流程必须无限循环。

播种和更新是针对数据库的。不幸的是,这个特定的数据库不允许使用不同的设计。

我能想到的最好的方法是使用sync.WaitGroup来同步Updater的goroutine,并将其他所有内容保持同步状态。向Updater提供数据通过一个通道进行。

以下是一个简化的代码(没有错误,逻辑不多):

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := <- c

                // 这是更新器
            }(&wg)
        }

        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

结果是,代码工作正常,直到在某个循环中停止,并且无法继续向前移动。我尝试了许多变量,加载100行而不是10,000行,结果并没有太大不同。

我还尝试传递一个包含通道的结构体,并以异步方式运行所有内容,但我很难弄清楚Updater何时完成,以便我可以解除对Seeder的阻塞。

如果有任何指导意见,我将不胜感激。

英文:

I'm struggling to figure out a working design that would mix together synchronous flow with asynchronous behavior.

I've 4 components:

  1. Seeder
  2. Worker
  3. Publisher
  4. Updater

The only limitation I've is that once Seeder seeds data it must be blocked up until Updater is not fully finished with processing all tasks. The first 3 components could easily be synchronous but the Updater must work in parallel or it would take forever to finish the tasks.

So the flow is:

Seeder -&gt; Worker -&gt; Publisher -&gt; Updater --&gt; Seeder -&gt; Worker -&gt; Publisher -&gt; Updater ...

and this flow must rotate forever.

The seeding and updating is towards a database. Unfortunately this particular database doesn't allow for a different design.

The best I got to is using sync.WaitGroup to sync the Updater goroutines and leave everything else in a synchronous state. The data to the Updater are provided through a channel.

Here is a simplified code (no errors, not much logic in).

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i &lt; 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := &lt;- c

                // this is the updater
            }(&amp;wg)
        }

        for _, result := range results {
            c &lt;- result
        }
        wg.Wait()
    }
}

The result is that the code works up until it halts at some cycle and never moves forward. I've played with many variables, loading 100 rows instead of 10k and the result is not much different.

I also tried to pass a struct containing channels and run everything asynchronously but I've even harder time figuring out when Updater is finished so I can unblock the seeder.

Any pointers are appreciated.

答案1

得分: 0

很难确定,因为无法编译和运行您的代码,并且不清楚您如何使用c。至少有一件事是确定的:wg应该通过引用传递,而不是通过值传递(sync.WaitGroup具有nocopy注释)。然后,我猜您使用c将值发送给updater。但是您没有提供它们的代码,所以我只能猜测。例如,假设调度发生在这样的情况下,前9个goroutine获取了通道中的所有内容;然后,最后一个例程永远被阻塞,永远不会释放WaitGroup。在这种情况下,一个简单的解决方案是在外部for循环的每次迭代中创建一个新的通道(将第3行下移两行)并在调用wg.Wait()之前关闭c。您的updater必须能够处理从关闭的通道读取

[编辑] 我认为您正在寻找的是这样的东西:

package main

import (
	"fmt"
	"sync"
)

// Result is a type
type Result struct {
	I int
}

// Seeder is a function
func Seeder() []int {
	fmt.Println("Seeding")
	return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
}

// Worker is a function
func Worker(data []int) []int {
	return data
}

// Publisher is a function
func Publisher(data []int) []Result {
	var r []Result
	for i := 0; i < len(data); i++ {
		r = append(r, Result{I: data[i]})
	}
	return r
}

func updater(c chan Result, wg *sync.WaitGroup) {
	for _ = range c {
		// update here
		wg.Done()
	}
}

func main() {
	var wg sync.WaitGroup

	c := make(chan Result, 100)
	for i := 0; i < 10; i++ {
		go updater(c, &wg)
	}

	for {
		data := Seeder()
		msgs := Worker(data)
		results := Publisher(msgs)

		wg.Add(len(results))
		for _, result := range results {
			c <- result
		}
		wg.Wait()
	}
}
英文:

It is hard to tell because your code cannot be compiled and run, and it is not clear how you use c. At least one thing is sure : wg should be passed by reference, not by value (sync.WaitGroup has the nocopy annotation). Then, I suppose you use c to send values to the updater. But you don’t provide their code, so I can only guess. For example, suppose that the scheduling happens such that the first 9 goroutines take all there is to read in the channel; then, the last routine is blocked forever and will never release the WaitGroup. In that case, a simple solution is to create a fresh channel in each iteration of your outermost for loop (move line 3 down two lines) and close c right before calling wg.Wait(). Your updaters must be able to handle a read from a close channel.

[edit] I think what you are looking for is something like this:

package main
import (
&quot;fmt&quot;
&quot;sync&quot;
)
// Result is a type
type Result struct {
I int
}
// Seeder is a function
func Seeder() []int {
fmt.Println(&quot;Seeding&quot;)
return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
}
// Worker is a function
func Worker(data []int) []int {
return data
}
// Publisher is a function
func Publisher(data []int) []Result {
var r []Result
for i := 0; i &lt; len(data); i++ {
r = append(r, Result{I: data[i]})
}
return r
}
func updater(c chan Result, wg *sync.WaitGroup) {
for _ = range c {
// update here
wg.Done()
}
}
func main() {
var wg sync.WaitGroup
c := make(chan Result, 100)
for i := 0; i &lt; 10; i++ {
go updater(c, &amp;wg)
}
for {
data := Seeder()
msgs := Worker(data)
results := Publisher(msgs)
wg.Add(len(results))
for _, result := range results {
c &lt;- result
}
wg.Wait()
}
}

huangapple
  • 本文由 发表于 2017年3月11日 10:04:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/42730506.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定