MongoDB change stream returns empty fullDocument on insert

Question

I'm using MongoDB 4.4 with the corresponding Go driver. The database's replica set runs locally at localhost:27017, localhost:27020. I've also tried an Atlas sandbox cluster, which gave the same results.

According to the MongoDB documentation, when a new document is inserted, the fullDocument field of the event is supposed to contain the newly inserted document. For some reason that is not the case for me. The ns field, which should hold the database and collection names, and the documentKey field, which should hold the _id of the affected document, are empty as well. The operationType field contains the correct operation type. In another test, update operations did not appear in the change stream at all.

It used to work as expected, but now it doesn't. Why does this happen, and what am I doing wrong?

Code

// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
	defer stream.Close(ctx)
	defer cancel() // for graceful crashing

	for stream.Next(ctx) {
		var event bson.M
		err := stream.Decode(&event)
		if err != nil {
			log.Print(errors.Errorf("Failed to decode event: %w\n", err))
			return
		}

		rv := reflect.ValueOf(event["operationType"]) // getting operation type
		opType, ok := rv.Interface().(string)
		if !ok {
			log.Print("String expected in operationType\n")
			return
		}

		// event["fullDocument"] will be empty even when handling insertion
		// models.Player is a struct representing a document of the collection
		// I'm watching over
		doc, ok := event["fullDocument"].(models.Player)
		if !ok {
			log.Print("Failed to convert document into Player type")
			return
		}
		handlerCtx := context.WithValue(ctx, "doc", doc)
		// handlerToEvent maps operationType to respective handler
		go handlerToEvent[opType](ds, handlerCtx, cancel)
	}
}

func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {

	pipeline := mongo.Pipeline{
		bson.D{{
			"$match",
			bson.D{{
				"$or", bson.A{
					bson.D{{"operationType", "insert"}}, // !!!
					bson.D{{"operationType", "delete"}},
					bson.D{{"operationType", "invalidate"}},
				},
			}},
		}},
	}
	// mongo instance is initialized on program startup and stored in a global variable
	opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
	stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
	if err != nil {
		log.Panic(err)
	}
	defer stream.Close(ctx)

	iterateChangeStream(stream, ds, ctx, cancel)
}

My issue might be related to this, except that it occurs consistently on insertion instead of occurring sometimes on updates.
If you know how to enable the change stream optimization feature flag mentioned in the link above, let me know.

Feel free to ask for more clarifications.

Answer 1

Score: 2

The question was answered here.

TLDR

You need to define a struct like the following and decode the event into it:

type CSEvent struct {
	OperationType string        `bson:"operationType"`
	FullDocument  models.Player `bson:"fullDocument"`
}

var event CSEvent
err := stream.Decode(&event)

event.FullDocument will then contain a copy of the inserted document.
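
To illustrate why the original type assertion never succeeds, here is a minimal sketch that uses only the standard library. It assumes that an event decoded into bson.M behaves like a generic map (bson.M is defined as a map[string]interface{}); the Player struct and the helpers asPlayer and playerFromMap are hypothetical names invented for this example and are not part of the driver or of the asker's code.

```go
package main

import "fmt"

// Player is a hypothetical stand-in for models.Player.
type Player struct {
	Name string
}

// asPlayer mimics the assertion from the question. It succeeds only when
// the dynamic type stored in the interface value is exactly Player.
func asPlayer(v interface{}) (Player, bool) {
	p, ok := v.(Player)
	return p, ok
}

// playerFromMap reads the field out of a generically decoded document.
func playerFromMap(v interface{}) (Player, bool) {
	m, ok := v.(map[string]interface{})
	if !ok {
		return Player{}, false
	}
	name, ok := m["name"].(string)
	return Player{Name: name}, ok
}

func main() {
	// Simulated shape of an insert event after a generic decode:
	// the nested document is a map, not a Player struct.
	event := map[string]interface{}{
		"operationType": "insert",
		"fullDocument":  map[string]interface{}{"name": "alice"},
	}

	_, ok := asPlayer(event["fullDocument"])
	fmt.Println("assert to struct:", ok)

	p, ok := playerFromMap(event["fullDocument"])
	fmt.Println("read from map:", p.Name, ok)
}
```

The assertion to the struct type reports false even though the document is present, which matches the "Failed to convert document into Player type" symptom. Decoding straight into a typed struct avoids the manual field copying that playerFromMap does here.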

Answer 2

Score: 1

In the sample events from this link, fullDocument is present on the event with operationType: 'insert' but absent from the 'delete' event:

{
  _id: { _data: '825DE67A42000000072B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
  operationType: 'insert',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1575385666 },
  fullDocument: {
    _id: 5de67a42113ea7de6472e768,
    name: 'Sydney Harbour Home',
    bedrooms: 4,
    bathrooms: 2.5,
    address: { market: 'Sydney', country: 'Australia' }
  },
  ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
  documentKey: { _id: 5de67a42113ea7de6472e768 }
}
{
  _id: { _data: '825DE67A42000000082B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
  operationType: 'delete',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 8, high_: 1575385666 },
  ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
  documentKey: { _id: 5de67a42113ea7de6472e768 }
}

So I recommend that you either:

  1. limit your $match to insert,
  2. or add an if statement on operationType:
      if opType == "insert" {
        doc, ok := event["fullDocument"].(models.Player)
        if !ok {
            log.Print("Failed to convert document into Player type")
            return
        }
        handlerCtx := context.WithValue(ctx, "doc", doc)
        // handlerToEvent maps operationType to respective handler
        go handlerToEvent[opType](ds, handlerCtx, cancel)
        return
      }
  3. or fetch the document yourself, using the _id stored under event["documentKey"], with a call such as playersCollection.FindOne(ctx, bson.M{"_id": id}), where id is that _id value.
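
The branching suggested above can be sketched without the driver, as a rough illustration only. The Event and Player types and the describe function are hypothetical names for this example; FullDocument is modelled as a pointer on the assumption that a delete event carries no document, as in the sample events, and DocumentKey is simplified to a plain string standing in for documentKey._id.

```go
package main

import "fmt"

// Player is a hypothetical stand-in for models.Player.
type Player struct {
	Name string
}

// Event is a simplified, hypothetical view of a change event.
type Event struct {
	OperationType string
	DocumentKey   string  // stands in for documentKey._id
	FullDocument  *Player // nil when the event carries no document
}

// describe branches on the operation type before touching FullDocument,
// so events without a document are handled instead of rejected.
func describe(ev Event) string {
	switch ev.OperationType {
	case "insert":
		if ev.FullDocument == nil {
			return "insert without document"
		}
		return "inserted " + ev.FullDocument.Name
	case "delete":
		return "deleted " + ev.DocumentKey
	default:
		return "ignored " + ev.OperationType
	}
}

func main() {
	fmt.Println(describe(Event{
		OperationType: "insert",
		DocumentKey:   "5de67a42113ea7de6472e768",
		FullDocument:  &Player{Name: "alice"},
	}))
	fmt.Println(describe(Event{
		OperationType: "delete",
		DocumentKey:   "5de67a42113ea7de6472e768",
	}))
}
```

Checking the operation type first means a delete or invalidate event no longer ends the loop just because it has no fullDocument.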

huangapple
  • Published on 2022-03-13 19:03:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71456092.html