使用Go语言编写的MongoDB代码,通过管道监听指定id的文档更新。

huangapple go评论75阅读模式
英文:

MongoDB | Go using a pipeline to listen to updates on a document by id

问题

我正在尝试创建一个函数,用于监视数据库中特定id的文档是否更新,但它不起作用。它在更新文档时仍然保持活动状态,而函数应该返回。我尝试了多种方法,代码的其余部分都正常工作。当我删除id部分,并监听该集合中所有文档的更新时,函数按预期工作。

func iterateChangeStream(routineCtx context.Context, stream *mongo.ChangeStream, chn chan string) {
	defer stream.Close(routineCtx)

	for stream.Next(routineCtx) {
		var data bson.M
		if err := stream.Decode(&data); err != nil {
			fmt.Println(err)
		}
		chn <- "updated"
		err := stream.Close(routineCtx)
		if err != nil {
			return
		}
		return
	}
	return
}

func (s Storage) ListenForScannerUpdateById(id primitive.ObjectID) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	chn := make(chan string)

	coll := s.db.Collection("scanners")

	scan, err := s.GetScannerById(id)
	fmt.Println(scan)

	matchPipeline := bson.D{
		{
			"$match", bson.D{
				{"operationType", "update"},
				{"fullDocument._id", bson.D{
					{"$eq", id},
				}},
			},
		},
	}

	scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline})
	if err != nil {
		err := scannerStream.Close(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Printf("err: %v", err)
	}

	routineCtx, _ := context.WithCancel(context.Background())

	go iterateChangeStream(routineCtx, scannerStream, chn)
	msg, _ := <-chn
	defer close(chn)
	fmt.Println(msg)
	return
}

希望这可以帮助到你。

英文:

I'm trying to make a function that watches the database for a certain document with a certain id to update but it does not work. It just stays alive while updating the document while the function should return. I've tried multiple things and the rest of the code works fine. When i remove the id part and listen for all document updates in that collection the function does as it should

func iterateChangeStream(routineCtx context.Context,stream *mongo.ChangeStream, chn chan string) {
defer stream.Close(routineCtx)
for stream.Next(routineCtx) {
var data bson.M
if err := stream.Decode(&amp;data); err != nil {
fmt.Println(err)
}
chn &lt;- &quot;updated&quot;
err := stream.Close(routineCtx)
if err != nil {
return
}
return
}
return
}
func (s Storage) ListenForScannerUpdateById(id primitive.ObjectID) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
chn := make(chan string)
coll := s.db.Collection(&quot;scanners&quot;)
scan, err := s.GetScannerById(id)
fmt.Println(scan)
matchPipeline := bson.D{
{
&quot;$match&quot;, bson.D{
{&quot;operationType&quot;, &quot;update&quot;},
{&quot;fullDocument._id&quot;, bson.D{
{&quot;$eq&quot;, id},
}},
},
},
}
scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline})
if err != nil {
err := scannerStream.Close(ctx)
if err != nil {
panic( err)
}
fmt.Printf(&quot;err: %v&quot;, err)
}
routineCtx, _ := context.WithCancel(context.Background())
go iterateChangeStream(routineCtx, scannerStream, chn)
msg, _ := &lt;- chn
defer close(chn)
fmt.Println(msg)
return
}

答案1

得分: 1

好的,以下是翻译好的内容:

好的,所以在第二次阅读文档后,我找到了这个信息:

对于update操作,只有在将更改流配置为fullDocument设置为updateLookup时,此字段才会出现。然后,该字段表示由更新操作修改的文档的最新的大多数提交版本。如果在原始更新操作和完整文档查找之间有其他大多数提交操作修改了文档,则此文档可能与updateDescription中描述的更改不同。

所以,将fullDocument选项设置为updateLookup后,它就可以正常工作:

scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
英文:

Ok, so after reading the documentation for a seccond time i found this:

> For update operations, this field only appears if you configured the change stream with fullDocument set to updateLookup. This field then represents the most current majority-committed version of the document modified by the update operation. This document may differ from the changes described in updateDescription if other majority-committed operations modified the document between the original update operation and the full document lookup.

so after setting the fullDocument option to updateLookup like this it works perfect:

	scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline}, options.ChangeStream().SetFullDocument(options.UpdateLookup))

huangapple
  • 本文由 发表于 2022年6月2日 22:58:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/72478283.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定