在同一事务中更新多个表格且使用不同的goroutine时出现Gorm错误。

huangapple go评论119阅读模式
英文:

Gorm error when updating multiple tables in the same transaction and different goroutines

问题

我有这段代码示例:

  1. err = transaction.WithTransaction(context.Background(), func(txCtx context.Context) error {
  2. errorGroup := &errgroup.Group{}
  3. errorGroup.Go(func() error {
  4. return s.addTotable1(txCtx, *model)
  5. })
  6. errorGroup.Go(func() error {
  7. return s.updateTable1(txCtx, *model)
  8. })
  9. errorGroup.Go(func() error {
  10. return s.updateTable2(txCtx, *model)
  11. })
  12. errorGroup.Go(func() error {
  13. return s.updateTable3(txCtx, *model)
  14. })
  15. errorGroup.Go(func() error {
  16. return s.updateTable4(txCtx, *model)
  17. })
  18. errorGroup.Go(func() error {
  19. return s.updateTable5(txCtx, *model)
  20. })
  21. if err := errorGroup.Wait(); err != nil {
  22. transactionError = err
  23. return err
  24. }
  25. }, func(trCtx context.Context) error {
  26. return transactionError
  27. })

WithTransaction 方法在这里定义:

  1. type txKey struct{}
  2. func injectTx(ctx context.Context, tx *gorm.DB) context.Context {
  3. return context.WithValue(ctx, txKey{}, tx)
  4. }
  5. func ExtractTx(ctx context.Context) *gorm.DB {
  6. if tx, ok := ctx.Value(txKey{}).(*gorm.DB); ok {
  7. return tx
  8. }
  9. return nil
  10. }
  11. func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error) error {
  12. gormConnect := postgresGorm.DbConnection
  13. if gormConnect == nil {
  14. return commonerrors.InternalServerError{
  15. ErrorResponse: commonerrors.ErrorResponse{
  16. Message: "error",
  17. },
  18. }
  19. }
  20. tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  21. if err := tx.Error; err != nil {
  22. return err
  23. }
  24. err := txFunc(injectTx(ctx, tx))
  25. if err != nil {
  26. tx.Rollback()
  27. tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  28. err = trFunc(injectTx(ctx, tx))
  29. tx.Commit()
  30. return err
  31. }
  32. tx.Commit()
  33. return nil
  34. }

还有我们的 Gorm 配置与 PostgresDB:

  1. var (
  2. DbConnection *gorm.DB = nil
  3. )
  4. const (
  5. connectionFailedMsg = "postgres-gorm connection failed: %s"
  6. )
  7. // OpenConnection open postgres connection
  8. func OpenConnection() {
  9. // PostgreSQL Connection, uncomment to use.
  10. // connection string format: user=USER password=PASSWORD host=/cloudsql/PROJECT_ID:REGION_ID:INSTANCE_ID/[ dbname=DB_NAME]
  11. dbURI := fmt.Sprintf("host=%s%s port=%d user=%s "+
  12. "password=%s dbname=%s sslmode=disable",
  13. configs.PostgresqlGormConfigs.CloudSqlPrefix, configs.PostgresqlGormConfigs.Host,
  14. configs.PostgresqlGormConfigs.Port, configs.PostgresqlGormConfigs.User,
  15. configs.PostgresqlGormConfigs.Password, configs.PostgresqlGormConfigs.DbName)
  16. config := &gorm.Config{
  17. NamingStrategy: schema.NamingStrategy{
  18. TablePrefix: configs.PostgresqlGormConfigs.TableGormPrefix,
  19. SingularTable: true,
  20. }}
  21. var err error
  22. DbConnection, err = gorm.Open(postgres.Open(dbURI), config)
  23. if err != nil {
  24. panic(err)
  25. }
  26. sqlDB, err := DbConnection.DB()
  27. if err != nil {
  28. log.Errorf(connectionFailedMsg, err)
  29. panic(err)
  30. }
  31. if configs.PostgresqlGormConfigs.GormLoggin {
  32. DbConnection.Config.Logger = gormLogger.Default.LogMode(gormLogger.Info)
  33. }
  34. err = sqlDB.Ping()
  35. if err != nil {
  36. log.Errorf(connectionFailedMsg, err)
  37. } else {
  38. log.Info("postgres-gorm connection successfully established")
  39. }
  40. }

这是使用 Gorm 更新表的方法示例:

  1. func (o ServiceImpl) UpdateTable1(ctx context.Context, model *model) (*model, error) {
  2. tx := transaction.ExtractTx(ctx)
  3. injectedTransaction := true
  4. if tx == nil {
  5. tx = postgresGorm.DbConnection.Begin()
  6. injectedTransaction = false
  7. }
  8. //Result
  9. queryResult := tx.Save(&model)
  10. // Error
  11. if queryResult.Error != nil {
  12. if !injectedTransaction {
  13. tx.Rollback()
  14. }
  15. errResp := commonerrors.ErrorResponse{
  16. Code: "500",
  17. Message: "Error",
  18. }
  19. return nil, commonerrors.InternalServerError{ErrorResponse: errResp}
  20. }
  21. if !injectedTransaction {
  22. tx.Commit()
  23. }
  24. return shipDetail, nil
  25. }

我们的问题是,在使用此服务时,存在一些与多个 goroutine 相关的问题,我们随机地遇到以下错误:driver: bad connection,但这完全是随机的,第一次尝试总是成功的,之后会失败一次,然后再次成功...你明白我的意思。

我们已经尝试升级到最新版本的 gorm 和 gorm postgres 驱动程序,但没有任何改变。通过阅读 gorm 文档,我们正在使用的所有方法都应该是线程安全的,所以我现在有些困惑。如果我找到任何解决方法,我会更新帖子。谢谢。

英文:

I've got this sample of code

  1. err = transaction.WithTransaction(context.Background(), func(txCtx context.Context) error {
  2. errorGroup := &errgroup.Group{}
  3. errorGroup.Go(func() error {
  4. return s.addTotable1(txCtx, *model)
  5. })
  6. errorGroup.Go(func() error {
  7. return s.updateTable1(txCtx, *model)
  8. })
  9. errorGroup.Go(func() error {
  10. return s.updateTable2(txCtx, *model)
  11. })
  12. errorGroup.Go(func() error {
  13. return s.updateTable3(txCtx, *model)
  14. })
  15. errorGroup.Go(func() error {
  16. return s.updateTable4(txCtx, *model)
  17. })
  18. errorGroup.Go(func() error {
  19. return s.updateTable5(txCtx, *model)
  20. })
  21. if err := errorGroup.Wait(); err != nil {
  22. transactionError = err
  23. return err
  24. }
  25. }, func(trCtx context.Context) error {
  26. return transactionError
  27. })

WithTransaction method is defined here

  1. type txKey struct{}
  2. func injectTx(ctx context.Context, tx *gorm.DB) context.Context {
  3. return context.WithValue(ctx, txKey{}, tx)
  4. }
  5. func ExtractTx(ctx context.Context) *gorm.DB {
  6. if tx, ok := ctx.Value(txKey{}).(*gorm.DB); ok {
  7. return tx
  8. }
  9. return nil
  10. }
  1. func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error) error {
  2. gormConnect := postgresGorm.DbConnection
  3. if gormConnect == nil {
  4. return commonerrors.InternalServerError{
  5. ErrorResponse: commonerrors.ErrorResponse{
  6. Message: "error",
  7. },
  8. }
  9. }
  10. tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  11. if err := tx.Error; err != nil {
  12. return err
  13. }
  14. err := txFunc(injectTx(ctx, tx))
  15. if err != nil {
  16. tx.Rollback()
  17. tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  18. err = trFunc(injectTx(ctx, tx))
  19. tx.Commit()
  20. return err
  21. }
  22. tx.Commit()
  23. return nil
  24. }

Also Gorm configuration with our PostgresDB

  1. var (
  2. DbConnection *gorm.DB = nil
  3. )
  4. const (
  5. connectionFailedMsg = "postgres-gorm connection failed: %s"
  6. )
  7. // OpenConnection open postgres connection
  8. func OpenConnection() {
  9. // PostgreSQL Connection, uncomment to use.
  10. // connection string format: user=USER password=PASSWORD host=/cloudsql/PROJECT_ID:REGION_ID:INSTANCE_ID/[ dbname=DB_NAME]
  11. dbURI := fmt.Sprintf("host=%s%s port=%d user=%s "+"password=%s dbname=%s sslmode=disable",
  12. configs.PostgresqlGormConfigs.CloudSqlPrefix, configs.PostgresqlGormConfigs.Host,
  13. configs.PostgresqlGormConfigs.Port, configs.PostgresqlGormConfigs.User,
  14. configs.PostgresqlGormConfigs.Password, configs.PostgresqlGormConfigs.DbName)
  15. config := &gorm.Config{
  16. NamingStrategy: schema.NamingStrategy{
  17. TablePrefix: configs.PostgresqlGormConfigs.TableGormPrefix,
  18. SingularTable: true,
  19. }}
  20. var err error
  21. DbConnection, err = gorm.Open(postgres.Open(dbURI), config)
  22. if err != nil {
  23. panic(err)
  24. }
  25. sqlDB, err := DbConnection.DB()
  26. if err != nil {
  27. log.Errorf(connectionFailedMsg, err)
  28. panic(err)
  29. }
  30. if configs.PostgresqlGormConfigs.GormLoggin {
  31. DbConnection.Config.Logger = gormLogger.Default.LogMode(gormLogger.Info)
  32. }
  33. err = sqlDB.Ping()
  34. if err != nil {
  35. log.Errorf(connectionFailedMsg, err)
  36. } else {
  37. log.Info("postgres-gorm connection successfully established")
  38. }
  39. }

And here an example of the method to update a table using Gorm

  1. func (o ServiceImpl) UpdateTable1(ctx context.Context, model *model) (*model, error) {
  2. tx := transaction.ExtractTx(ctx)
  3. injectedTransaction := true
  4. if tx == nil {
  5. tx = postgresGorm.DbConnection.Begin()
  6. injectedTransaction = false
  7. }
  8. //Result
  9. queryResult := tx.Save(&model)
  10. // Error
  11. if queryResult.Error != nil {
  12. if !injectedTransaction {
  13. tx.Rollback()
  14. }
  15. errResp := commonerrors.ErrorResponse{
  16. Code: "500",
  17. Message: "Error",
  18. }
  19. return nil, commonerrors.InternalServerError{ErrorResponse: errResp}
  20. }
  21. if !injectedTransaction {
  22. tx.Commit()
  23. }
  24. return shipDetail, nil
  25. }

Our problem is that something is going on with multiple goroutines and we are getting this error randomly when using this service: driver: bad connection But it is completly random, first try always is successful and after that it fails once, then success again... You get the idea.

We've tried upgrading to the latest version of gorm and gorm postgres driver but it didnt change a thing. Reading through gorm doc, all the methods we are using should be thread safe, so Im kinda stuck right now. If I find any fix to this I will update the post. Thanks.

答案1

得分: 1

我将为您翻译以下内容:

我在这里发布我们目前正在使用的答案,以防将来有人遇到这个问题。主要问题是在所有线程中重用相同的gorm.Session,所以我创建了一个通用的解决方法:

首先对WithTransaction函数进行轻微修改:

  1. func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error, nestedTx bool) error {
  2. gormConnect := postgresGorm.DbConnection
  3. if gormConnect == nil {
  4. return commonerrors.InternalServerError{
  5. ErrorResponse: commonerrors.ErrorResponse{
  6. Message: "Error intentando conseguir la conexión con bbdd",
  7. },
  8. }
  9. }
  10. tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  11. if err := tx.Error; err != nil {
  12. return err
  13. }
  14. err := txFunc(injectTx(ctx, tx))
  15. if err != nil {
  16. tx.Rollback()
  17. tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
  18. err = trFunc(injectTx(ctx, tx))
  19. tx.Commit()
  20. return err
  21. }
  22. if !nestedTx {
  23. tx.Commit()
  24. }
  25. return nil
  26. }

现在,如果我们有嵌套事务,它将不会提交。然后,我添加了以下函数来处理errorGroup并为每个Goroutine创建一个新的会话:

  1. func RoutineTransaction(errorGroup *errgroup.Group, transactions chan<- *gorm.DB, subroutineFuncTx func(txCtx context.Context) error) {
  2. errorGroup.Go(func() error {
  3. var routineTxErr error
  4. routineTxErr = WithTransaction(context.Background(), func(txCtx context.Context) error {
  5. transactions <- ExtractTx(txCtx)
  6. routineTxErr = subroutineFuncTx(txCtx)
  7. return routineTxErr
  8. }, func(trCtx context.Context) error {
  9. return routineTxErr
  10. }, true)
  11. return routineTxErr
  12. })
  13. }
  14. func RoutinesTransactionsCommit(transactions chan *gorm.DB) {
  15. close(transactions)
  16. for tx := range transactions {
  17. tx.Commit()
  18. }
  19. }
  20. func RoutinesTransactionsRollback(transactions chan *gorm.DB) {
  21. close(transactions)
  22. for tx := range transactions {
  23. tx.Rollback()
  24. }
  25. }

然后,您只需在需要的地方简单地使用它:

  1. errorGroup := &errgroup.Group{}
  2. transactions := make(chan *gorm.DB, 6)
  3. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  4. return s.addToTable1(txCtx, *model)
  5. })
  6. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  7. return s.updateTable1(txCtx, *model)
  8. })
  9. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  10. return s.updateTable2(txCtx, *model)
  11. })
  12. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  13. return s.updateTable3(txCtx, *model)
  14. })
  15. if err := errorGroup.Wait(); err != nil {
  16. transaction.RoutinesTransactionsRollback(transactions)
  17. transactionError = err
  18. return err
  19. }
  20. transaction.RoutinesTransactionsCommit(transactions)
英文:

Im posting the answer that is currently working for us in case someone enconters this problem in the future. The main problem was reusing the same gorm.Session for all the threads, so I've created a generic workarround:

First a slight modification to the WithTransaction function

  1. func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error, nestedTx bool) error {
  2. gormConnect := postgresGorm.DbConnection
  3. if gormConnect == nil {
  4. return commonerrors.InternalServerError{
  5. ErrorResponse: commonerrors.ErrorResponse{
  6. Message: &quot;Error intentando conseguir la conexi&#243;n con bbdd&quot;,
  7. },
  8. }
  9. }
  10. tx := gormConnect.Session(&amp;gorm.Session{SkipDefaultTransaction: true}).Begin()
  11. if err := tx.Error; err != nil {
  12. return err
  13. }
  14. err := txFunc(injectTx(ctx, tx))
  15. if err != nil {
  16. tx.Rollback()
  17. tx = postgresGorm.DbConnection.Session(&amp;gorm.Session{SkipDefaultTransaction: true}).Begin()
  18. err = trFunc(injectTx(ctx, tx))
  19. tx.Commit()
  20. return err
  21. }
  22. if !nestedTx {
  23. tx.Commit()
  24. }
  25. return nil
  26. }

Now if we have nested transactions it will not commit.
Then I've added this functions to handle an errorGroup and create a new session for each Goroutine:

  1. func RoutineTransaction(errorGroup *errgroup.Group, transactions chan&lt;- *gorm.DB, subroutineFuncTx func(txCtx context.Context) error) {
  2. errorGroup.Go(func() error {
  3. var routineTxErr error
  4. routineTxErr = WithTransaction(context.Background(), func(txCtx context.Context) error {
  5. transactions &lt;- ExtractTx(txCtx)
  6. routineTxErr = subroutineFuncTx(txCtx)
  7. return routineTxErr
  8. }, func(trCtx context.Context) error {
  9. return routineTxErr
  10. }, true)
  11. return routineTxErr
  12. })
  13. func RoutinesTransactionsCommit(transactions chan *gorm.DB) {
  14. close(transactions)
  15. for tx := range transactions {
  16. tx.Commit()
  17. }
  18. func RoutinesTransactionsRollback(transactions chan *gorm.DB) {
  19. close(transactions)
  20. for tx := range transactions {
  21. tx.Rollback()
  22. }
  23. }

And then you just simply use it like this wherever you need it:

  1. errorGroup := &amp;errgroup.Group{}
  2. transactions := make(chan *gorm.DB, 6)
  3. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  4. return s.addToTable1(txCtx, *model)
  5. })
  6. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  7. return s.updateTable1(txCtx, *model)
  8. })
  9. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  10. return s.updateTable2(txCtx, *model)
  11. })
  12. transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
  13. return s.updateTable3(txCtx, *model)
  14. })
  15. if err := errorGroup.Wait(); err != nil {
  16. transaction.RoutinesTransactionsRollback(transactions)
  17. transactionError = err
  18. return err
  19. }
  20. transaction.RoutinesTransactionsCommit(transactions)

huangapple
  • 本文由 发表于 2022年10月27日 04:10:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/74213692.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定