在Golang驱动程序中,Mongo DB的”Collection.Watch”方法与聚合管道不兼容。

huangapple go评论89阅读模式
英文:

In Golang Driver for Mongo DB "Collection.Watch" doesn't work with Aggregation Pipeline

问题

我正在尝试在集合上创建一个 MongoDB 的“Change Stream”,并监听特定文档的更改。

matchID := bson.D{
    {"$match", bson.M{"_id": tid}},
}
stream, err := collection.Watch(ctx, mongo.Pipeline{matchID}, options.ChangeStream().SetFullDocument(options.UpdateLookup))

然后,使用一个循环来监听流中的更改。

for stream.Next(ctx) {
    log.Println("stream.Next")
    if err = stream.Decode(&event); err != nil {
        log.Printf("error decoding: %s", err)
    }
    log.Printf("change event: %v", event)
    publisher.NotifyEvent(event["fullDocument"])
}

当我移除 pipeline 参数时,整个设置可以正常工作,并且我可以在循环中获取数据。但是当我添加了 pipeline 过滤器后,它就停止工作了。

英文:

I am trying to create a mongodb "Change Stream" on collection and listen to changes on a specific document.

matchID := bson.D{
		{"$match", bson.M{"_id": tid}},
	}
stream, err := collection.Watch(ctx, mongo.Pipeline{matchID}, options.ChangeStream().SetFullDocument(options.UpdateLookup))

Then listen on the stream with a for loop.

for stream.Next(ctx) {
		log.Println("stream.Next")
		if err = stream.Decode(&event); err != nil {
			log.Printf("error decoding: %s", err)
		}
		log.Printf("change event: %v", event)
		publisher.NotifyEvent(event["fullDocument"])
	}

The whole setup works when I remove the pipeline argument, and I start getting data in loop. But when I add the pipeline filter, it stops working.

答案1

得分: 2

我使用的过滤器是错误的,因为管道过滤的是事件流而不是实际文档。我必须将过滤器更改为:

matchID := bson.D{
    {"$match", bson.M{"fullDocument._id": tid}},
}

而不是:

matchID := bson.D{
    {"$match", bson.M{"_id": tid}},
}

额外的关键字"fullDocument"是必需的,因为事件流将更新的文档保存在"fullDocument"字段下。

更多详细信息可以在这篇文章中找到:
https://www.mongodb.com/basics/change-streams

英文:

The filter I used was wrong, since the pipeline filters the event stream rather that the actual document.
I had the change the filter to

 matchID := bson.D{
        {"$match", bson.M{"fullDocument._id": tid}},
    }

instead of

matchID := bson.D{
            {"$match", bson.M{"_id": tid}},
        }

the additional keyword "fullDocument" is necessary because the event stream holds the updated document under the field "fullDocument"

More details can be found in this article.
https://www.mongodb.com/basics/change-streams

huangapple
  • 本文由 发表于 2021年7月10日 06:55:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/68323390.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定