将多个goroutine与通道结合使用

huangapple go评论87阅读模式
英文:

Combining several goroutines with channels

问题

我有以下使用案例:

  1. "fetch" goroutine 将根据一些预定义的条件从数据库中获取可用数据。
  2. 然后,我将有两个 goroutine(process1、process2),每个 goroutine 对数据进行一些操作(顺序很重要)。
  3. 最后一个 goroutine(processSave)应该更新数据库中的数据。

我理解我需要使用通道将每个 goroutine 与下一个 goroutine 连接起来:
FetchToP1Chnl、P1ToP2Chnl、P2ToP3Chnl、P3ToSaveChnl。

对于 "working" 对象上的操作必须按顺序运行:fetch -> process1 -> process2 -> processSave。

更新:
我忘了提到 process1 和 process2 函数会更改对象。process2 需要使用 process1 的结果。

我不确定的问题:

  • 在这里,哪种类型的通道更合适:无缓冲通道还是有缓冲通道(如果是有缓冲通道,如何选择最佳大小)?
  • 在哪里更好地打开这些通道?(我认为应该在主函数中)
  • 在哪里更好地关闭这些通道?我的应用程序预计会持续运行。

我考虑了以下方法:

type Object struct {
    ID           string `bson:"_id"`
    Data         string `bson:"data"`
    Subdocument1 string // 由 process1 添加
    Subdocument2 string // 由 process2 添加
}

func main() {
    // 设置 MongoDB 客户端
    clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")
    client, err := mongo.Connect(context.Background(), clientOptions)
    if err != nil {
        log.Fatal(err)
    }

    // 从 MongoDB 获取对象
    collection := client.Database("your-database").Collection("your-collection")
    cursor, err := collection.Find(context.Background(), bson.M{})
    if err != nil {
        log.Fatal(err)
    }

    // 创建用于 goroutine 之间通信的通道
    objectCh1 := make(chan Object)
    objectCh2 := make(chan Object)

    // 创建等待组以等待 goroutine 完成
    var wg sync.WaitGroup

    // 启动 fetcher goroutine
    wg.Add(1)
    go fetchObjects(cursor, objectCh1, &wg)

    // 启动 process1 goroutine
    wg.Add(1)
    go process1(objectCh1, objectCh2, &wg)

    // 启动 process2 goroutine
    wg.Add(1)
    go process2(objectCh2, &wg)

    // 等待所有 goroutine 完成
    wg.Wait()

    // 关闭 MongoDB 客户端
    if err := client.Disconnect(context.Background()); err != nil {
        log.Fatal(err)
    }

    fmt.Println("完成")
}

// fetchObjects 从游标中获取对象并将其发送到 objectCh1
func fetchObjects(cursor *mongo.Cursor, objectCh1 chan<- Object, wg *sync.WaitGroup) {
    defer close(objectCh1)
    defer wg.Done()

    for cursor.Next(context.Background()) {
        var obj Object
        err := cursor.Decode(&obj)
        if err != nil {
            log.Println(err)
            continue
        }

        objectCh1 <- obj
    }

    if err := cursor.Err(); err != nil {
        log.Println(err)
    }
}

func process1(objectCh1 <-chan Object, objectCh2 chan<- Object, wg *sync.WaitGroup) {
    defer wg.Done()

    for obj := range objectCh1 {
        obj.Subdocument1 = "subdocument1"
        // 在 MongoDB 中进行其他处理或更新
        updatedObject := updateObjectInMongoDB(obj)
        objectCh2 <- updatedObject
    }

    close(objectCh2)
}

func process2(objectCh2 <-chan Object, wg *sync.WaitGroup) {
    defer wg.Done()

    for obj := range objectCh2 {
        obj.Subdocument2 = "subdocument2"
        // 在 MongoDB 中进行其他处理或更新
        updateObjectInMongoDB(obj)
    }
}

// updateObjectInMongoDB 是一个占位函数,用于更新 MongoDB 中的对象
func updateObjectInMongoDB(obj Object) Object {
    fmt.Printf("更新的对象:%+v\n", obj)
    // 在 MongoDB 中更新对象的逻辑
}

以上是你提供的代码的翻译。

英文:

I have the following use case:

  1. "fetch" goroutine will fetch available data from db based on some predefined criteria.
  2. then I will have, let's say 2 goroutines (process1, process2), each one of them is making some manipulation to the data (and the order is important).
  3. the last goroutine (processSave) should update data in DB.

I understand that I need to use channels connecting each one of the goroutines with the next one in raw:
FetchToP1Chnl, P1ToP2Chnl, P2ToP3Chnl, P3ToSaveChnl.

Operations on the "working" object have to run in a sequential manner: fetch -> process1 -> process2 -> processSave .

Update:
I forgot to mention that process1 and process2 functions are changing the object. Process2 needs to work with the result of process1.

Questions that I'm not sure about:

  • What kind of a channel is more suitable here: unbuffered or buffered (it the buffered one, so how to choose the optimal size)
  • where is better to open those channels? (I believe that it should be done in main)
  • where is better to place the closing for the channels? My application is expected to be running non-stop

I was thinking about the following approach:

type Object struct {
ID           string `bson:&quot;_id&quot;`
Data         string `bson:&quot;data&quot;`
Subdocument1 string // Added by process1
Subdocument2 string // Added by process2
}
func main() {
// Set up MongoDB client
clientOptions := options.Client().ApplyURI(&quot;mongodb://localhost:27017&quot;)
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
log.Fatal(err)
}
// Fetch objects from MongoDB
collection := client.Database(&quot;your-database&quot;).Collection(&quot;your-collection&quot;)
cursor, err := collection.Find(context.Background(), bson.M{})
if err != nil {
log.Fatal(err)
}
// Create channels for communication between goroutines
objectCh1 := make(chan Object)
objectCh2 := make(chan Object)
// Create wait group to wait for goroutines to finish
var wg sync.WaitGroup
// Start the fetcher goroutine
wg.Add(1)
go fetchObjects(cursor, objectCh1, &amp;wg)
// Start process1 goroutine
wg.Add(1)
go process1(objectCh1, objectCh2, &amp;wg)
// Start process2 goroutine
wg.Add(1)
go process2(objectCh2, &amp;wg)
// Wait for all goroutines to finish
wg.Wait()
// Close MongoDB client
if err := client.Disconnect(context.Background()); err != nil {
log.Fatal(err)
}
fmt.Println(&quot;Done&quot;)
}
// fetchObjects fetches objects from the cursor and sends them to objectCh1
func fetchObjects(cursor *mongo.Cursor, objectCh1 chan&lt;- Object, wg *sync.WaitGroup) {
defer close(objectCh1)
defer wg.Done()
for cursor.Next(context.Background()) {
var obj Object
err := cursor.Decode(&amp;obj)
if err != nil {
log.Println(err)
continue
}
objectCh1 &lt;- obj
}
if err := cursor.Err(); err != nil {
log.Println(err)
}
}
func process1(objectCh1 &lt;-chan Object, objectCh2 chan&lt;- Object, wg *sync.WaitGroup) {
defer wg.Done()
for obj := range objectCh1 {
obj.Subdocument1 = &quot;subdocument1&quot;
// Do additional processing or update in MongoDB
updatedObject:=updateObjectInMongoDB(obj)
objectCh2 &lt;- updatedObject
}
close(objectCh2)
}
func process2(objectCh2 &lt;-chan Object, wg *sync.WaitGroup) {
defer wg.Done()
for obj := range objectCh2 {
obj.Subdocument2 = &quot;subdocument2&quot;
// Do additional processing or update in MongoDB
updateObjectInMongoDB(obj)
}
}
// updateObjectInMongoDB is a placeholder function to update the object in MongoDB
func updateObjectInMongoDB(obj Object) Object {
fmt.Printf(&quot;Updated Object: %+v\n&quot;, obj)
// Your logic to update the object in MongoDB
}

答案1

得分: 1

通道:你可能想要使用带缓冲的通道,这样即使读取者暂时忙碌,发送者协程也可以继续工作(我在这里假设你想增加并发性)。

在哪里打开通道:良好的实践是在协程之外的地方“连接”协程/通道图,所以是的,可能是在主函数中。

关闭通道:如果你的应用程序一直在运行,那么你可能根本不需要关闭通道。但这很棘手 - 大多数服务应用程序实际上并不会永远运行。你可能希望实现优雅的关闭(例如响应信号)。经验法则是发送者负责关闭通道。

另外一个要点:不明显为什么你需要以这种方式设计并发性。你可以有一个获取器协程(我假设数据库读取不应该并行化),然后将对象分配给多个工作协程,每个协程在给定对象上按顺序执行 process1、process2 和保存操作。

英文:

Channels: you probably want buffered ones, so as to make it possible for sender goroutines to continue working even when reader ones are temporarily busy (I'm assuming here that you want to increase concurrency).

Where to open channels: it is good practice to "wire" the goroutines/channels graph outside the goroutines themselves, so yes, probably in main.

Closing channels: if your application is going non stop, then you may not have to close channels at all. But this is tricky - most serving applications don't really run forever. You may want to implement a graceful shutdown (e.g. in response to a signal). The rule of thumb is that it's the sender responsibility to close channels.

One additional point: it is not obvious why you need to design concurrency this way. You could have one fetcher goroutine (I assume the DB reads should not be parallelized) and then split the objects between several worker goroutines, each doing process1, process2 & save sequentially on a given object.

huangapple
  • 本文由 发表于 2023年7月11日 21:52:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/76662721.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定