英文:
Mongodb aggregation changes not being persisted in go
问题
我正在运行一个聚合操作来删除过期的文档,但实际上这些更改并没有影响到数据库。查询忽略了已经过期的文档,所以每次查询运行后结果的数量应该会发生变化,但实际上并没有变化。
你可以看到,我正在实时记录第一个结果,并使用 Compass 对数据库进行检查,但更改实际上并没有被持久化。查询一遍又一遍地运行,返回的过期货运数量始终相同。
在上面的代码中,我初始化了一个 MongoDB 连接,并进行了一些其他的初始化操作。然后,我使用一个定时任务来定期检查货运是否过期。
我真的不明白出了什么问题。
英文:
I'm running an aggregation to remove stale documents but the changes don't actually affect the database. The query ignores already expired documents, so the number of results should change after each query runs, but it doesn't.
func CheckShipmentExpiryDates(c *mongo.Client) (int, error) {
numberOfExpiredShipments := 0
coll := c.Database(os.Getenv("DATABASE")).Collection("shipments")
update := bson.M{"$set": bson.M{"status": "EXPIRED", "updated_at": time.Now()}}
pipeline := []bson.M{
{"$lookup": bson.M{
"from": "shipment_quotes",
"let": bson.M{"shipmentID": "$_id"},
"pipeline": []bson.M{
{"$match": bson.M{"$expr": bson.M{"$and": []bson.M{{"$eq": []string{"$shipment_id", "$$shipmentID"}}, {"$eq": []string{"$status", "WON"}}}}}},
},
"as": "quotes",
}},
{"$match": bson.M{"expiration_date": bson.M{"$exists": true}}},
{"$match": bson.M{"$expr": bson.M{"$and": []bson.M{
{"$ne": []string{"$status", "EXPIRED"}},
{"$lt": []interface{}{"$expiration_date", time.Now()}},
{"$eq": []interface{}{bson.M{"$size": "$quotes"}, 0}},
{"expiration_date": bson.M{"$type": 9}},
}}}},
update,
}
err := c.UseSession(context.TODO(), func(sessionContext mongo.SessionContext) error {
if err := sessionContext.StartTransaction(); err != nil {
return err
}
cursor, err := coll.Aggregate(sessionContext, pipeline)
if err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return err
}
var shipments []bson.M
if err := cursor.All(sessionContext, &shipments); err != nil {
_ = sessionContext.AbortTransaction(sessionContext)
return err
}
fmt.Println("~First shipment's status", shipments[0]["shipment_unique_number"], shipments[0]["status"])
numberOfExpiredShipments = len(shipments)
fmt.Println(sessionContext.CommitTransaction(sessionContext))
return nil
})
return numberOfExpiredShipments, err
}
As you can see, I'm logging the first result and checking it against the database in real time, using compass, but the changes aren't actually being persisted. The query runs over and over again, returning the same number of expired shipments.
mc, mongoErr := connection.MongoInit()
if mongoErr != nil {
panic(mongoErr)
}
utils.InitDB(mc)
defer func() {
if err := mc.Disconnect(context.TODO()); err != nil {
panic(err)
}
}()
n := connection.NewNotificationCenter()
sseInit(mc, googleApi, n)
graphSchema, err := schema.InjectSchema(mutationInit(mc, googleApi), queryInit(mc, googleApi))
if err != nil {
panic(err)
}
restApiUseCase := mutationsRestApiInit(mc, googleApi)
connection.InjectGraphqlHandler(graphSchema, n, restApiUseCase)
initIncrementStartdate(mc)
initShipmentExpiredCron(mc)
func initShipmentExpiredCron(mg *mongo.Client) {
c := cron.New()
c.AddFunc("*/5 * * * *", func() {
expiredShipments, err := utils.CheckShipmentExpiryDates(mg)
if err != nil {
log.Println("CRON ERROR: An error occured while trying to check the expiry date for each shipment")
log.Println(err)
} else {
// Print how many shipments are expired
log.Println("CRON SUCCESS: The following number of shipments have expired: ", expiredShipments)
}
})
c.Start()
}
I really don't understand what's wrong with it.
答案1
得分: 0
rickhg12hs是正确的,我需要同时使用$merge
。奇怪的是,这使得聚合不返回任何内容,所以现在我不知道过期的货运数量,但这并不是真正必要的。这是最终的管道:
pipeline := []bson.M{{
"$lookup": bson.M{
"from": "shipment_quotes",
"let": bson.M{
"shipmentID": "$_id"
},
"pipeline": []bson.M{{
"$match": bson.M{
"$expr": bson.M{
"$and": []bson.M{{
"$eq": []string{
"$shipment_id", "$$shipmentID"
}}, {
"$eq": []string{
"$status", "WON"
}
}
}}
}
},
}},
"as": "quotes",
}}, {
"$match": bson.M{
"expiration_date": bson.M{
"$exists": true
}
}
}, {
"$match": bson.M{
"$expr": bson.M{
"$and": []bson.M{{
"$ne": []string{
"$status", "EXPIRED"
}
}, {
"$lt": []interface{}{
"$expiration_date", time.Now()
}
}, {
"$eq": []interface{}{
bson.M{
"$size": "$quotes"
}, 0
}
}, {
"expiration_date": bson.M{
"$type": 9
}
},
}
}
}},
update,
{
"$merge": bson.M{
"into": "shipments",
"on": "_id"
}
},
}
英文:
rickhg12hs was right, I needed to also use $merge
. Oddly, this makes the aggregation not return anything, so now I don't know the number of shipments that have expired, but that was not really necessary. This is the pipeline at the end
pipeline := []bson.M{{
"$lookup": bson.M{
"from": "shipment_quotes",
"let": bson.M{
"shipmentID": "$_id"
},
"pipeline": []bson.M{{
"$match": bson.M{
"$expr": bson.M{
"$and": []bson.M{{
"$eq": []string{
"$shipment_id", "$$shipmentID"
}}, {
"$eq": []string{
"$status", "WON"
}
}
}}
}
},
},
"as": "quotes",
}}, {
"$match": bson.M{
"expiration_date": bson.M{
"$exists": true
}
}
}, {
"$match": bson.M{
"$expr": bson.M{
"$and": []bson.M{{
"$ne": []string{
"$status", "EXPIRED"
}
}, {
"$lt": []interface{}{
"$expiration_date", time.Now()
}
}, {
"$eq": []interface{}{
bson.M{
"$size": "$quotes"
}, 0
}
}, {
"expiration_date": bson.M{
"$type": 9
}
},
}
}
}},
update,
{
"$merge": bson.M{
"into": "shipments",
"on": "_id"
}
},
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论