在同一事务中更新多个表格且使用不同的goroutine时出现Gorm错误。

huangapple go评论88阅读模式
英文:

Gorm error when updating multiple tables in the same transaction and different goroutines

问题

我有这段代码示例:

err = transaction.WithTransaction(context.Background(), func(txCtx context.Context) error {
  errorGroup := &errgroup.Group{}
  errorGroup.Go(func() error {
    return s.addTotable1(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable1(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable2(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable3(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable4(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable5(txCtx, *model)
  })
  if err := errorGroup.Wait(); err != nil {
    transactionError = err
    return err
  }
}, func(trCtx context.Context) error {
  return transactionError
})

WithTransaction 方法在这里定义:

type txKey struct{}

func injectTx(ctx context.Context, tx *gorm.DB) context.Context {
   return context.WithValue(ctx, txKey{}, tx)
}

func ExtractTx(ctx context.Context) *gorm.DB {
   if tx, ok := ctx.Value(txKey{}).(*gorm.DB); ok {
      return tx
   }
   return nil
}

func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error) error {
   gormConnect := postgresGorm.DbConnection

   if gormConnect == nil {
      return commonerrors.InternalServerError{
         ErrorResponse: commonerrors.ErrorResponse{
            Message: "error",
         },
      }
   }

   tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()

   if err := tx.Error; err != nil {
      return err
   }

   err := txFunc(injectTx(ctx, tx))

   if err != nil {
      tx.Rollback()
      tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
      err = trFunc(injectTx(ctx, tx))
      tx.Commit()
      return err
   }

   tx.Commit()

   return nil
}

还有我们的 Gorm 配置与 PostgresDB:

var (
   DbConnection *gorm.DB = nil
)

const (
   connectionFailedMsg = "postgres-gorm connection failed: %s"
)

// OpenConnection open postgres connection
func OpenConnection() {

   // PostgreSQL Connection, uncomment to use.
   // connection string format: user=USER password=PASSWORD host=/cloudsql/PROJECT_ID:REGION_ID:INSTANCE_ID/[ dbname=DB_NAME]
   dbURI := fmt.Sprintf("host=%s%s port=%d user=%s "+
      "password=%s dbname=%s sslmode=disable",
      configs.PostgresqlGormConfigs.CloudSqlPrefix, configs.PostgresqlGormConfigs.Host,
      configs.PostgresqlGormConfigs.Port, configs.PostgresqlGormConfigs.User,
      configs.PostgresqlGormConfigs.Password, configs.PostgresqlGormConfigs.DbName)
   config := &gorm.Config{
      NamingStrategy: schema.NamingStrategy{
         TablePrefix:   configs.PostgresqlGormConfigs.TableGormPrefix,
         SingularTable: true,
      }}

   var err error

   DbConnection, err = gorm.Open(postgres.Open(dbURI), config)

   if err != nil {
      panic(err)
   }

   sqlDB, err := DbConnection.DB()

   if err != nil {
      log.Errorf(connectionFailedMsg, err)
      panic(err)
   }

   if configs.PostgresqlGormConfigs.GormLoggin {
      DbConnection.Config.Logger = gormLogger.Default.LogMode(gormLogger.Info)
   }

   err = sqlDB.Ping()

   if err != nil {
      log.Errorf(connectionFailedMsg, err)
   } else {
      log.Info("postgres-gorm connection successfully established")
   }
}

这是使用 Gorm 更新表的方法示例:

func (o ServiceImpl) UpdateTable1(ctx context.Context, model *model) (*model, error) {

   tx := transaction.ExtractTx(ctx)
   injectedTransaction := true

   if tx == nil {
      tx = postgresGorm.DbConnection.Begin()
      injectedTransaction = false
   }

   //Result
   queryResult := tx.Save(&model)

   // Error
   if queryResult.Error != nil {
      if !injectedTransaction {
         tx.Rollback()
      }
      errResp := commonerrors.ErrorResponse{
         Code:    "500",
         Message: "Error",
      }
      return nil, commonerrors.InternalServerError{ErrorResponse: errResp}
   }

   if !injectedTransaction {
      tx.Commit()
   }
   return shipDetail, nil
}

我们的问题是,在使用此服务时,存在一些与多个 goroutine 相关的问题,我们随机地遇到以下错误:driver: bad connection,但这完全是随机的,第一次尝试总是成功的,之后会失败一次,然后再次成功...你明白我的意思。

我们已经尝试升级到最新版本的 gorm 和 gorm postgres 驱动程序,但没有任何改变。通过阅读 gorm 文档,我们正在使用的所有方法都应该是线程安全的,所以我现在有些困惑。如果我找到任何解决方法,我会更新帖子。谢谢。

英文:

I've got this sample of code

err = transaction.WithTransaction(context.Background(), func(txCtx context.Context) error {
  errorGroup := &errgroup.Group{}
  errorGroup.Go(func() error {
    return s.addTotable1(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable1(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable2(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable3(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable4(txCtx, *model)
  })
  errorGroup.Go(func() error {
    return s.updateTable5(txCtx, *model)
  })
  if err := errorGroup.Wait(); err != nil {
    transactionError = err
    return err
  }
  }, func(trCtx context.Context) error {
    return transactionError
  })

WithTransaction method is defined here

type txKey struct{}

func injectTx(ctx context.Context, tx *gorm.DB) context.Context {
   return context.WithValue(ctx, txKey{}, tx)
}

func ExtractTx(ctx context.Context) *gorm.DB {
   if tx, ok := ctx.Value(txKey{}).(*gorm.DB); ok {
      return tx
   }
   return nil
}
func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error) error {
   gormConnect := postgresGorm.DbConnection

   if gormConnect == nil {
      return commonerrors.InternalServerError{
         ErrorResponse: commonerrors.ErrorResponse{
            Message: "error",
         },
      }
   }

   tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()

   if err := tx.Error; err != nil {
      return err
   }

   err := txFunc(injectTx(ctx, tx))

   if err != nil {
      tx.Rollback()
      tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
      err = trFunc(injectTx(ctx, tx))
      tx.Commit()
      return err
   }

   tx.Commit()

   return nil

}

Also Gorm configuration with our PostgresDB

var (
   DbConnection *gorm.DB = nil
)

const (
   connectionFailedMsg = "postgres-gorm connection failed: %s"
)

// OpenConnection open postgres connection
func OpenConnection() {

   // PostgreSQL Connection, uncomment to use.
   // connection string format: user=USER password=PASSWORD host=/cloudsql/PROJECT_ID:REGION_ID:INSTANCE_ID/[ dbname=DB_NAME]
   dbURI := fmt.Sprintf("host=%s%s port=%d user=%s "+"password=%s dbname=%s sslmode=disable",
      configs.PostgresqlGormConfigs.CloudSqlPrefix, configs.PostgresqlGormConfigs.Host,
      configs.PostgresqlGormConfigs.Port, configs.PostgresqlGormConfigs.User,
      configs.PostgresqlGormConfigs.Password, configs.PostgresqlGormConfigs.DbName)
   config := &gorm.Config{
      NamingStrategy: schema.NamingStrategy{
         TablePrefix:   configs.PostgresqlGormConfigs.TableGormPrefix,
         SingularTable: true,
      }}

   var err error

   DbConnection, err = gorm.Open(postgres.Open(dbURI), config)

   if err != nil {
      panic(err)
   }

   sqlDB, err := DbConnection.DB()

   if err != nil {
      log.Errorf(connectionFailedMsg, err)
      panic(err)
   }

   if configs.PostgresqlGormConfigs.GormLoggin {
      DbConnection.Config.Logger = gormLogger.Default.LogMode(gormLogger.Info)
   }

   err = sqlDB.Ping()

   if err != nil {
      log.Errorf(connectionFailedMsg, err)
   } else {
      log.Info("postgres-gorm connection successfully established")

   }
}

And here an example of the method to update a table using Gorm

func (o ServiceImpl) UpdateTable1(ctx context.Context, model *model) (*model, error) {

   tx := transaction.ExtractTx(ctx)
   injectedTransaction := true

   if tx == nil {
      tx = postgresGorm.DbConnection.Begin()
      injectedTransaction = false
   }

   //Result
   queryResult := tx.Save(&model)

   // Error
   if queryResult.Error != nil {
      if !injectedTransaction {
         tx.Rollback()
      }
      errResp := commonerrors.ErrorResponse{
         Code:    "500",
         Message: "Error",
      }
      return nil, commonerrors.InternalServerError{ErrorResponse: errResp}
   }

   if !injectedTransaction {
      tx.Commit()
   }
   return shipDetail, nil
}

Our problem is that something is going on with multiple goroutines and we are getting this error randomly when using this service: driver: bad connection But it is completly random, first try always is successful and after that it fails once, then success again... You get the idea.

We've tried upgrading to the latest version of gorm and gorm postgres driver but it didnt change a thing. Reading through gorm doc, all the methods we are using should be thread safe, so Im kinda stuck right now. If I find any fix to this I will update the post. Thanks.

答案1

得分: 1

我将为您翻译以下内容:

我在这里发布我们目前正在使用的答案,以防将来有人遇到这个问题。主要问题是在所有线程中重用相同的gorm.Session,所以我创建了一个通用的解决方法:

首先对WithTransaction函数进行轻微修改:

func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error, nestedTx bool) error {
    gormConnect := postgresGorm.DbConnection

    if gormConnect == nil {
        return commonerrors.InternalServerError{
            ErrorResponse: commonerrors.ErrorResponse{
                Message: "Error intentando conseguir la conexión con bbdd",
            },
        }
    }

    tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()

    if err := tx.Error; err != nil {
        return err
    }

    err := txFunc(injectTx(ctx, tx))

    if err != nil {
        tx.Rollback()
        tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
        err = trFunc(injectTx(ctx, tx))
        tx.Commit()
        return err
    }

    if !nestedTx {
        tx.Commit()
    }

    return nil
}

现在,如果我们有嵌套事务,它将不会提交。然后,我添加了以下函数来处理errorGroup并为每个Goroutine创建一个新的会话:

func RoutineTransaction(errorGroup *errgroup.Group, transactions chan<- *gorm.DB, subroutineFuncTx func(txCtx context.Context) error) {
    errorGroup.Go(func() error {
        var routineTxErr error
        routineTxErr = WithTransaction(context.Background(), func(txCtx context.Context) error {
            transactions <- ExtractTx(txCtx)
            routineTxErr = subroutineFuncTx(txCtx)
            return routineTxErr
        }, func(trCtx context.Context) error {
            return routineTxErr
        }, true)
        return routineTxErr
    })
}

func RoutinesTransactionsCommit(transactions chan *gorm.DB) {
    close(transactions)
    for tx := range transactions {
        tx.Commit()
    }
}

func RoutinesTransactionsRollback(transactions chan *gorm.DB) {
    close(transactions)
    for tx := range transactions {
        tx.Rollback()
    }
}

然后,您只需在需要的地方简单地使用它:

errorGroup := &errgroup.Group{}
transactions := make(chan *gorm.DB, 6)
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
    return s.addToTable1(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
    return s.updateTable1(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
    return s.updateTable2(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
    return s.updateTable3(txCtx, *model)
})
if err := errorGroup.Wait(); err != nil {
    transaction.RoutinesTransactionsRollback(transactions)
    transactionError = err
    return err
}
transaction.RoutinesTransactionsCommit(transactions)
英文:

Im posting the answer that is currently working for us in case someone enconters this problem in the future. The main problem was reusing the same gorm.Session for all the threads, so I've created a generic workarround:

First a slight modification to the WithTransaction function

func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error, nestedTx bool) error {
gormConnect := postgresGorm.DbConnection

if gormConnect == nil {
	return commonerrors.InternalServerError{
		ErrorResponse: commonerrors.ErrorResponse{
			Message: &quot;Error intentando conseguir la conexi&#243;n con bbdd&quot;,
		},
	}
}

tx := gormConnect.Session(&amp;gorm.Session{SkipDefaultTransaction: true}).Begin()

if err := tx.Error; err != nil {
	return err
}

err := txFunc(injectTx(ctx, tx))

if err != nil {
	tx.Rollback()
	tx = postgresGorm.DbConnection.Session(&amp;gorm.Session{SkipDefaultTransaction: true}).Begin()
	err = trFunc(injectTx(ctx, tx))
	tx.Commit()
	return err
}

if !nestedTx {
	tx.Commit()
}

return nil
}

Now if we have nested transactions it will not commit.
Then I've added this functions to handle an errorGroup and create a new session for each Goroutine:

func RoutineTransaction(errorGroup *errgroup.Group, transactions chan&lt;- *gorm.DB, subroutineFuncTx func(txCtx context.Context) error) {
errorGroup.Go(func() error {
	var routineTxErr error
	routineTxErr = WithTransaction(context.Background(), func(txCtx context.Context) error {
		transactions &lt;- ExtractTx(txCtx)
		routineTxErr = subroutineFuncTx(txCtx)
		return routineTxErr
	}, func(trCtx context.Context) error {
		return routineTxErr
	}, true)
	return routineTxErr
})


func RoutinesTransactionsCommit(transactions chan *gorm.DB) {
close(transactions)
for tx := range transactions {
	tx.Commit()
}
func RoutinesTransactionsRollback(transactions chan *gorm.DB) {
close(transactions)
for tx := range transactions {
	tx.Rollback()
}
}

And then you just simply use it like this wherever you need it:

    errorGroup := &amp;errgroup.Group{}
	transactions := make(chan *gorm.DB, 6)
	transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
		return s.addToTable1(txCtx, *model)
	})
	transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
		return s.updateTable1(txCtx, *model)
	})
	transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
		return s.updateTable2(txCtx, *model)
	})
	transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
		return s.updateTable3(txCtx, *model)
	})
	if err := errorGroup.Wait(); err != nil {
		transaction.RoutinesTransactionsRollback(transactions)
		transactionError = err
		return err
	}
	transaction.RoutinesTransactionsCommit(transactions)

huangapple
  • 本文由 发表于 2022年10月27日 04:10:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/74213692.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定