
huangapple go评论97阅读模式

Semi asynchronous code logic




  1. Seeder(播种器)
  2. Worker(工作者)
  3. Publisher(发布器)
  4. Updater(更新器)



Seeder -> Worker -> Publisher -> Updater -> Seeder -> Worker -> Publisher -> Updater ...





func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            go func(){
                defer wg.Done()
                data := <- c

                // 这是更新器

        for _, result := range results {
            c <- result





I'm struggling to figure out a working design that would mix together synchronous flow with asynchronous behavior.

I've 4 components:

  1. Seeder
  2. Worker
  3. Publisher
  4. Updater

The only limitation I've is that once Seeder seeds data it must be blocked up until Updater is not fully finished with processing all tasks. The first 3 components could easily be synchronous but the Updater must work in parallel or it would take forever to finish the tasks.

So the flow is:

Seeder -&gt; Worker -&gt; Publisher -&gt; Updater --&gt; Seeder -&gt; Worker -&gt; Publisher -&gt; Updater ...

and this flow must rotate forever.

The seeding and updating is towards a database. Unfortunately this particular database doesn't allow for a different design.

The best I got to is using sync.WaitGroup to sync the Updater goroutines and leave everything else in a synchronous state. The data to the Updater are provided through a channel.

Here is a simplified code (no errors, not much logic in).

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i &lt; 10; i++ {
            go func(){
                defer wg.Done()
                data := &lt;- c

                // this is the updater

        for _, result := range results {
            c &lt;- result

The result is that the code works up until it halts at some cycle and never moves forward. I've played with many variables, loading 100 rows instead of 10k and the result is not much different.

I also tried to pass a struct containing channels and run everything asynchronously but I've even harder time figuring out when Updater is finished so I can unblock the seeder.

Any pointers are appreciated.


得分: 0


[编辑] 我认为您正在寻找的是这样的东西:

package main

import (

// Result is a type
type Result struct {
	I int

// Seeder is a function
func Seeder() []int {
	return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}

// Worker is a function
func Worker(data []int) []int {
	return data

// Publisher is a function
func Publisher(data []int) []Result {
	var r []Result
	for i := 0; i < len(data); i++ {
		r = append(r, Result{I: data[i]})
	return r

func updater(c chan Result, wg *sync.WaitGroup) {
	for _ = range c {
		// update here

func main() {
	var wg sync.WaitGroup

	c := make(chan Result, 100)
	for i := 0; i < 10; i++ {
		go updater(c, &wg)

	for {
		data := Seeder()
		msgs := Worker(data)
		results := Publisher(msgs)

		for _, result := range results {
			c <- result

It is hard to tell because your code cannot be compiled and run, and it is not clear how you use c. At least one thing is sure : wg should be passed by reference, not by value (sync.WaitGroup has the nocopy annotation). Then, I suppose you use c to send values to the updater. But you don’t provide their code, so I can only guess. For example, suppose that the scheduling happens such that the first 9 goroutines take all there is to read in the channel; then, the last routine is blocked forever and will never release the WaitGroup. In that case, a simple solution is to create a fresh channel in each iteration of your outermost for loop (move line 3 down two lines) and close c right before calling wg.Wait(). Your updaters must be able to handle a read from a close channel.

[edit] I think what you are looking for is something like this:

package main
import (
// Result is a type
type Result struct {
I int
// Seeder is a function
func Seeder() []int {
return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
// Worker is a function
func Worker(data []int) []int {
return data
// Publisher is a function
func Publisher(data []int) []Result {
var r []Result
for i := 0; i &lt; len(data); i++ {
r = append(r, Result{I: data[i]})
return r
func updater(c chan Result, wg *sync.WaitGroup) {
for _ = range c {
// update here
func main() {
var wg sync.WaitGroup
c := make(chan Result, 100)
for i := 0; i &lt; 10; i++ {
go updater(c, &amp;wg)
for {
data := Seeder()
msgs := Worker(data)
results := Publisher(msgs)
for _, result := range results {
c &lt;- result

  • 本文由 发表于 2017年3月11日 10:04:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/42730506.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
