回滚在使用Go语言的事务包装器时效果不佳。

huangapple go评论126阅读模式
英文:

Rollback does not work well with Go language transactional wrapper

问题

我最近开始学习Go语言。

我找到了一个用于数据库事务处理的包装器的Github实现,并决定尝试一下。

(源代码)https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

我正在使用PostgreSQL作为数据库。

最初,数据库中包含以下数据。

  1. testdb=> select * from products;
  2. product_id | price
  3. ------------+-------
  4. 0001 | 200
  5. 0002 | 100
  6. 0003 | 150
  7. 0004 | 300
  8. (4 rows)

在A进程成功后,故意使B进程失败,并期望回滚A的事务。然而,当我们运行它时,回滚没有发生,结果如下所示:

事实上,由于B失败,进程A应该被回滚,数据库值不应发生任何更改。

我在代码中插入了日志以确认这一点,但我不确定为什么回滚没有执行?

请告诉我如何使用这个包装器来正确处理事务。

PS. 没有包装器的以下代码可以正常工作。

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "log"
  6. _ "github.com/jackc/pgx/v4/stdlib"
  7. )
  8. type Service struct {
  9. db *sql.DB
  10. }
  11. func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
  12. tx, err := s.db.Begin()
  13. if err != nil {
  14. return err
  15. }
  16. defer tx.Rollback()
  17. if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
  18. log.Println("update err")
  19. return err
  20. }
  21. if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
  22. log.Println("update err")
  23. return err
  24. }
  25. return tx.Commit()
  26. }
  27. func main() {
  28. var database Service
  29. dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
  30. if nil != err {
  31. log.Fatal(err)
  32. }
  33. database.db = dbConn
  34. ctx := context.Background()
  35. database.UpdateProduct(ctx, "0004")
  36. }
英文:

I have recently started learning Go.

I found the following Github implementation of a wrapper for database transaction processing and decided to try it out.

(source) https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

I am using PostgreSQL as the database.

Initially, it contains the following data.

  1. testdb=> select * from products;
  2. product_id | price
  3. ------------+-------
  4. 0001 | 200
  5. 0002 | 100
  6. 0003 | 150
  7. 0004 | 300
  8. (4 rows)

After Process A succeeds, Process B is intentionally made to fail, and a rollback of transaction A is expected. However, when we run it, the rollback does not occur and we end up with the following

In truth, since B failed, the process A should be rolled back and there should be no change in the database value.

I have inserted Logs in places to confirm this, but I am not sure. Why is the rollback not executed?

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "log"
  7. _ "github.com/jackc/pgx/v4/stdlib"
  8. )
  9. // transaction-wrapper-start
  10. type txAdmin struct {
  11. *sql.DB
  12. }
  13. type Service struct {
  14. tx txAdmin
  15. }
  16. func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
  17. log.Printf("transaction")
  18. tx, err := t.BeginTx(ctx, nil)
  19. if err != nil {
  20. return err
  21. }
  22. defer tx.Rollback()
  23. if err := f(ctx); err != nil {
  24. log.Printf("transaction err")
  25. return fmt.Errorf("transaction query failed: %w", err)
  26. }
  27. log.Printf("commit")
  28. return tx.Commit()
  29. }
  30. func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
  31. updateFunc := func(ctx context.Context) error {
  32. log.Printf("first process")
  33. // Process A
  34. if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
  35. log.Printf("first err")
  36. return err
  37. }
  38. log.Printf("second process")
  39. // Process B(They are intentionally failing.)
  40. if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
  41. log.Printf("second err")
  42. return err
  43. }
  44. return nil
  45. }
  46. log.Printf("update")
  47. return s.tx.Transaction(ctx, updateFunc)
  48. }
  49. // transaction-wrapper-end
  50. func main() {
  51. data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
  52. if nil != err {
  53. log.Fatal(err)
  54. }
  55. database := Service {tx: txAdmin{data}}
  56. ctx := context.Background()
  57. database.UpdateProduct(ctx, "0004")
  58. }

output

  1. 2022/05/26 13:28:55 update
  2. 2022/05/26 13:28:55 transaction
  3. 2022/05/26 13:28:55 first process
  4. 2022/05/26 13:28:55 second process
  5. 2022/05/26 13:28:55 second err
  6. 2022/05/26 13:28:55 transaction err

database changes(If the rollback works, the PRICE for id 0004 should remain 300.)

  1. testdb=> select * from products;
  2. product_id | price
  3. ------------+-------
  4. 0001 | 200
  5. 0002 | 100
  6. 0003 | 150
  7. 0004 | 200
  8. (4 rows)

Please tell me how I can use the wrapper to correctly process transactions.

=========
PS.
The following code without the wrapper worked properly.

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "log"
  6. _ "github.com/jackc/pgx/v4/stdlib"
  7. )
  8. // transaction-defer-start
  9. type Service struct {
  10. db *sql.DB
  11. }
  12. func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
  13. tx, err := s.db.Begin()
  14. if err != nil {
  15. return err
  16. }
  17. defer tx.Rollback()
  18. if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
  19. log.Println("update err")
  20. return err
  21. }
  22. if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
  23. log.Println("update err")
  24. return err
  25. }
  26. return tx.Commit()
  27. }
  28. // transaction-defer-end
  29. func main() {
  30. var database Service
  31. dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
  32. if nil != err {
  33. log.Fatal(err)
  34. }
  35. database.db = dbConn
  36. ctx := context.Background()
  37. database.UpdateProduct(ctx, "0004")
  38. }

答案1

得分: 0

根据@Richard Huxton的说法,将tx传递给函数f

以下是步骤:

  1. struct txAdmin上添加一个字段以容纳*sql.Tx,因此txAdmin具有DBTx字段。
  2. Transaction中将tx设置为*txAdmin.Tx
  3. UpdateProduct中,对每个查询使用*Service.tx.Tx

因此,最终的代码如下所示:

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "log"
  7. _ "github.com/jackc/pgx/v4/stdlib"
  8. )
  9. // transaction-wrapper-start
  10. type txAdmin struct {
  11. *sql.DB
  12. *sql.Tx
  13. }
  14. type Service struct {
  15. tx txAdmin
  16. }
  17. func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
  18. log.Printf("transaction")
  19. tx, err := t.DB.BeginTx(ctx, nil)
  20. if err != nil {
  21. return err
  22. }
  23. // 将tx设置为Tx
  24. t.Tx = tx
  25. defer tx.Rollback()
  26. if err := f(ctx); err != nil {
  27. log.Printf("transaction err")
  28. return fmt.Errorf("transaction query failed: %w", err)
  29. }
  30. log.Printf("commit")
  31. return tx.Commit()
  32. }
  33. func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
  34. updateFunc := func(ctx context.Context) error {
  35. log.Printf("first process")
  36. // Process A
  37. if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
  38. log.Printf("first err")
  39. return err
  40. }
  41. log.Printf("second process")
  42. // Process B(They are intentionally failing.)
  43. if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
  44. log.Printf("second err")
  45. return err
  46. }
  47. return nil
  48. }
  49. log.Printf("update")
  50. return s.tx.Transaction(ctx, updateFunc)
  51. }
  52. // transaction-wrapper-end
  53. func main() {
  54. data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
  55. if nil != err {
  56. log.Fatal(err)
  57. }
  58. database := Service{tx: txAdmin{DB: data}}
  59. ctx := context.Background()
  60. database.UpdateProduct(ctx, "0004")
  61. }
英文:

As @Richard Huxton said, pass tx into a function f

here are the steps:

  1. add one field on struct txAdmin to accommodate *sql.Tx, so txAdmin have DB and Tx fields
  2. inside Transaction set tx to *txAdmin.Tx
  3. inside UpdateProduct use *Service.tx.Tx for every query

so the final code looks like this:

  1. package main
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "log"
  7. _ "github.com/jackc/pgx/v4/stdlib"
  8. )
  9. // transaction-wrapper-start
  10. type txAdmin struct {
  11. *sql.DB
  12. *sql.Tx
  13. }
  14. type Service struct {
  15. tx txAdmin
  16. }
  17. func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
  18. log.Printf("transaction")
  19. tx, err := t.DB.BeginTx(ctx, nil)
  20. if err != nil {
  21. return err
  22. }
  23. // set tx to Tx
  24. t.Tx = tx
  25. defer tx.Rollback()
  26. if err := f(ctx); err != nil {
  27. log.Printf("transaction err")
  28. return fmt.Errorf("transaction query failed: %w", err)
  29. }
  30. log.Printf("commit")
  31. return tx.Commit()
  32. }
  33. func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
  34. updateFunc := func(ctx context.Context) error {
  35. log.Printf("first process")
  36. // Process A
  37. if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
  38. log.Printf("first err")
  39. return err
  40. }
  41. log.Printf("second process")
  42. // Process B(They are intentionally failing.)
  43. if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
  44. log.Printf("second err")
  45. return err
  46. }
  47. return nil
  48. }
  49. log.Printf("update")
  50. return s.tx.Transaction(ctx, updateFunc)
  51. }
  52. // transaction-wrapper-end
  53. func main() {
  54. data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
  55. if nil != err {
  56. log.Fatal(err)
  57. }
  58. database := Service{tx: txAdmin{DB: data}}
  59. ctx := context.Background()
  60. database.UpdateProduct(ctx, "0004")
  61. }

huangapple
  • 本文由 发表于 2022年5月26日 12:47:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/72386821.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定