Conditionally Run Consecutive Go Routines

Question

I have the following piece of code. I'm trying to run three goroutines at a time, never more than three. This works as expected, but the code is supposed to update a table in the DB.

So the first goroutine processes the first 50 rows, the second the next 50, the third the 50 after that, and then it repeats. I don't want two goroutines processing the same rows at the same time, but because the update takes so long, this happens almost every time.

To solve this, I started flagging the rows with a new bool column, processing. I set it to true for all rows to be updated when the goroutine starts, and sleep for 6 seconds to allow the flag to be updated.

This works for a random amount of time, but every now and then I see 2-3 jobs processing the same rows again. The method I'm using to prevent duplicate updates feels a bit janky, and I was wondering if there is a better way.

stopper := make(chan struct{}, 3) // semaphore: at most 3 goroutines at once
var counter int
for {
	counter++
	stopper <- struct{}{} // blocks while 3 goroutines are already running
	go func(db *sqlx.DB, c int) {
		fmt.Println("start")
		updateTables(db)
		fmt.Println("stop")
		<-stopper // release the slot
	}(db, counter)
	time.Sleep(6 * time.Second)
}

In updateTables:

var ids []string
// data is declared elsewhere in the function (not shown in the question)
err := sqlx.Select(db, &data, `select * from table_data where processing = false`)
if err != nil {
	panic(err)
}

for _, row := range data {
	ids = append(ids, row.Id)
}
if len(ids) == 0 {
	return
}

// Flag every selected row as being processed
for _, row := range data {
	_, err = db.Exec(`update table_data set processing = true where id = $1`, row.Id)
	if err != nil {
		panic(err)
	}
}
// Additional row processing

Answer 1

Score: 1

I think there's a misunderstanding about how to approach goroutines in this case.

Goroutines doing this kind of work should be treated like worker threads, with channels as the means of communication between the main goroutine (which does the synchronization) and the worker goroutines (which do the actual job).

package main

import (
	"log"
	"sync"
	"time"
)

type record struct {
	id int
}

func main() {
	const WORKER_COUNT = 10

	recordschan := make(chan record)

	var wg sync.WaitGroup
	for k := 0; k < WORKER_COUNT; k++ {
		wg.Add(1)
		// Create the worker which will be doing the updates
		go func(workerID int) {
			defer wg.Done() // Marking the worker as done
			for record := range recordschan {
				updateRecord(record)
				log.Printf("req %d processed by worker %d", record.id, workerID)
			}
		}(k)
	}

	// Feeding the records channel
	for _, record := range fetchRecords() {
		recordschan <- record
	}

	// Closing our channel as we're not using it anymore
	close(recordschan)

	// Waiting for all the go routines to finish
	wg.Wait()

	log.Println("we're done!")
}

func fetchRecords() []record {
	result := []record{}
	for k := 0; k < 100; k++ {
		result = append(result, record{k})
	}
	return result
}

func updateRecord(req record) {
	time.Sleep(200 * time.Millisecond)
}

You can even buffer things in the main goroutine if you need to update all 50 rows at once.
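
One way the batching idea could look is sketched below. This is only an illustration under assumptions, not code from the original answer: the helpers makeBatches, processAll, updateBatch and fakeRecords are hypothetical names, the batch size of 50 and the three workers are taken from the question, and updateBatch is a placeholder where the real database update would go. The main goroutine is the only one that slices the records, so each record is handed to exactly one worker.

```go
package main

import (
	"log"
	"sync"
)

// record mirrors the type used in the answer above: one row to update.
type record struct {
	id int
}

// makeBatches (hypothetical helper) splits records into consecutive
// slices holding at most size elements each.
func makeBatches(records []record, size int) [][]record {
	if size < 1 {
		size = 1 // guard against a zero or negative batch size
	}
	var batches [][]record
	for start := 0; start < len(records); start += size {
		end := start + size
		if end > len(records) {
			end = len(records)
		}
		batches = append(batches, records[start:end])
	}
	return batches
}

// updateBatch is a placeholder for the real work (e.g. the DB update).
func updateBatch(batch []record) {}

// processAll (hypothetical helper) sends whole batches to workerCount
// workers and returns how many times each record id was processed.
func processAll(records []record, batchSize, workerCount int) map[int]int {
	batchChan := make(chan []record)
	counts := make(map[int]int)
	var mu sync.Mutex // protects counts, which the workers share
	var wg sync.WaitGroup

	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for batch := range batchChan {
				updateBatch(batch)
				mu.Lock()
				for _, r := range batch {
					counts[r.id]++
				}
				mu.Unlock()
				log.Printf("worker %d processed a batch of %d records", workerID, len(batch))
			}
		}(w)
	}

	// Only the main goroutine decides which records go into which batch.
	for _, batch := range makeBatches(records, batchSize) {
		batchChan <- batch
	}
	close(batchChan)
	wg.Wait()
	return counts
}

// fakeRecords (hypothetical helper) stands in for fetching rows from the DB.
func fakeRecords(n int) []record {
	result := make([]record, 0, n)
	for k := 0; k < n; k++ {
		result = append(result, record{k})
	}
	return result
}

func main() {
	counts := processAll(fakeRecords(120), 50, 3)
	log.Printf("processed %d distinct records", len(counts))
}
```

Because the rows are selected once and divided up in a single place, the processing flag and the 6-second sleep from the question are no longer needed to keep the workers apart, at least as long as only one process runs this loop.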

huangapple
  • Published on 2021-12-23 22:12:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70463286.html