Rollback does not work with a Go transaction wrapper
Question
I have recently started learning Go.
I found the following GitHub implementation of a wrapper for database transaction processing and decided to try it out.
(Source code) https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go
I am using PostgreSQL as the database.
Initially, it contains the following data.
testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)
After process A succeeds, process B is intentionally made to fail, and I expect the update made by A to be rolled back. However, when I run the code, the rollback does not occur and I end up with the output and database state shown after the code.
Since B failed, process A should be rolled back and the database values should not change.
I have inserted log statements in several places to confirm this, but I still cannot tell why the rollback is not executed.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
	*sql.DB
}

type Service struct {
	tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
	log.Printf("transaction")
	tx, err := t.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	if err := f(ctx); err != nil {
		log.Printf("transaction err")
		return fmt.Errorf("transaction query failed: %w", err)
	}
	log.Printf("commit")
	return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
	updateFunc := func(ctx context.Context) error {
		log.Printf("first process")
		// Process A
		if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
			log.Printf("first err")
			return err
		}
		log.Printf("second process")
		// Process B (intentionally failing)
		if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
			log.Printf("second err")
			return err
		}
		return nil
	}
	log.Printf("update")
	return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end

func main() {
	data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}
	database := Service{tx: txAdmin{data}}
	ctx := context.Background()
	database.UpdateProduct(ctx, "0004")
}
Output:
2022/05/26 13:28:55 update
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err
Database after the run (if the rollback had worked, the price for product_id 0004 would still be 300):
testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   200
(4 rows)
Please tell me how I can use the wrapper to correctly process transactions.
=========
PS.
The following code without the wrapper worked properly.
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-defer-start
type Service struct {
	db *sql.DB
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
		log.Println("update err")
		return err
	}
	if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
		log.Println("update err")
		return err
	}
	return tx.Commit()
}

// transaction-defer-end

func main() {
	var database Service
	dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}
	database.db = dbConn
	ctx := context.Background()
	database.UpdateProduct(ctx, "0004")
}
Answer 1
Score: 0
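As @Richard Huxton said, tx has to be made available to the function f. In the question's code, s.tx.ExecContext resolves to the ExecContext method promoted from the embedded *sql.DB, so both statements run outside the transaction and process A is committed immediately; tx.Rollback() then has nothing to undo.
Here are the steps:
- Add a field to struct txAdmin to hold a *sql.Tx, so that txAdmin has both DB and Tx fields.
- Inside Transaction, assign tx to txAdmin.Tx.
- Inside UpdateProduct, use Service.tx.Tx for every query.
So the final code looks like this: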
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
	*sql.DB
	*sql.Tx
}

type Service struct {
	tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
	log.Printf("transaction")
	tx, err := t.DB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Store tx in the Tx field so that f can run its queries on it.
	t.Tx = tx
	defer tx.Rollback()
	if err := f(ctx); err != nil {
		log.Printf("transaction err")
		return fmt.Errorf("transaction query failed: %w", err)
	}
	log.Printf("commit")
	return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
	updateFunc := func(ctx context.Context) error {
		log.Printf("first process")
		// Process A: runs on the transaction, not on the *sql.DB.
		if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
			log.Printf("first err")
			return err
		}
		log.Printf("second process")
		// Process B (intentionally failing)
		if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
			log.Printf("second err")
			return err
		}
		return nil
	}
	log.Printf("update")
	return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end

func main() {
	data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	database := Service{tx: txAdmin{DB: data}}
	ctx := context.Background()
	// Report the error instead of silently discarding it.
	if err := database.UpdateProduct(ctx, "0004"); err != nil {
		log.Println(err)
	}
}