英文:
MongoDB | Go using a pipeline to listen to updates on a document by id
问题
我正在尝试创建一个函数,用于监视数据库中特定id的文档是否更新,但它不起作用。它在更新文档时仍然保持活动状态,而函数应该返回。我尝试了多种方法,代码的其余部分都正常工作。当我删除id部分,并监听该集合中所有文档的更新时,函数按预期工作。
func iterateChangeStream(routineCtx context.Context, stream *mongo.ChangeStream, chn chan string) {
defer stream.Close(routineCtx)
for stream.Next(routineCtx) {
var data bson.M
if err := stream.Decode(&data); err != nil {
fmt.Println(err)
}
chn <- "updated"
err := stream.Close(routineCtx)
if err != nil {
return
}
return
}
return
}
func (s Storage) ListenForScannerUpdateById(id primitive.ObjectID) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
chn := make(chan string)
coll := s.db.Collection("scanners")
scan, err := s.GetScannerById(id)
fmt.Println(scan)
matchPipeline := bson.D{
{
"$match", bson.D{
{"operationType", "update"},
{"fullDocument._id", bson.D{
{"$eq", id},
}},
},
},
}
scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline})
if err != nil {
err := scannerStream.Close(ctx)
if err != nil {
panic(err)
}
fmt.Printf("err: %v", err)
}
routineCtx, _ := context.WithCancel(context.Background())
go iterateChangeStream(routineCtx, scannerStream, chn)
msg, _ := <-chn
defer close(chn)
fmt.Println(msg)
return
}
希望这可以帮助到你。
英文:
I'm trying to make a function that watches the database for a certain document with a certain id to update but it does not work. It just stays alive while updating the document while the function should return. I've tried multiple things and the rest of the code works fine. When i remove the id part and listen for all document updates in that collection the function does as it should
func iterateChangeStream(routineCtx context.Context,stream *mongo.ChangeStream, chn chan string) {
defer stream.Close(routineCtx)
for stream.Next(routineCtx) {
var data bson.M
if err := stream.Decode(&data); err != nil {
fmt.Println(err)
}
chn <- "updated"
err := stream.Close(routineCtx)
if err != nil {
return
}
return
}
return
}
func (s Storage) ListenForScannerUpdateById(id primitive.ObjectID) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
chn := make(chan string)
coll := s.db.Collection("scanners")
scan, err := s.GetScannerById(id)
fmt.Println(scan)
matchPipeline := bson.D{
{
"$match", bson.D{
{"operationType", "update"},
{"fullDocument._id", bson.D{
{"$eq", id},
}},
},
},
}
scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline})
if err != nil {
err := scannerStream.Close(ctx)
if err != nil {
panic( err)
}
fmt.Printf("err: %v", err)
}
routineCtx, _ := context.WithCancel(context.Background())
go iterateChangeStream(routineCtx, scannerStream, chn)
msg, _ := <- chn
defer close(chn)
fmt.Println(msg)
return
}
答案1
得分: 1
好的,以下是翻译好的内容:
好的,所以在第二次阅读文档后,我找到了这个信息:
对于
update
操作,只有在将更改流配置为fullDocument
设置为updateLookup
时,此字段才会出现。然后,该字段表示由更新操作修改的文档的最新的大多数提交版本。如果在原始更新操作和完整文档查找之间有其他大多数提交操作修改了文档,则此文档可能与updateDescription中描述的更改不同。
所以,将fullDocument选项设置为updateLookup后,它就可以正常工作:
scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
英文:
Ok, so after reading the documentation for a seccond time i found this:
> For update
operations, this field only appears if you configured the change stream with fullDocument
set to updateLookup
. This field then represents the most current majority-committed version of the document modified by the update operation. This document may differ from the changes described in updateDescription if other majority-committed operations modified the document between the original update operation and the full document lookup.
so after setting the fullDocument option to updateLookup like this it works perfect:
scannerStream, err := coll.Watch(ctx, mongo.Pipeline{matchPipeline}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论