回滚在使用Go语言的事务包装器时效果不佳。

huangapple go评论75阅读模式
英文:

Rollback does not work well with Go language transactional wrapper

问题

我最近开始学习Go语言。

我找到了一个用于数据库事务处理的包装器的Github实现,并决定尝试一下。

(源代码)https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

我正在使用PostgreSQL作为数据库。

最初,数据库中包含以下数据。

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)

在A进程成功后,故意使B进程失败,并期望回滚A的事务。然而,当我们运行它时,回滚没有发生,结果如下所示:

事实上,由于B失败,进程A应该被回滚,数据库值不应发生任何更改。

我在代码中插入了日志以确认这一点,但我不确定为什么回滚没有执行?

请告诉我如何使用这个包装器来正确处理事务。

PS. 没有包装器的以下代码可以正常工作。

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

type Service struct {
	db *sql.DB
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
		log.Println("update err")
		return err
	}

	if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
		log.Println("update err")
		return err
	}

	return tx.Commit()
}

func main() {
	var database Service
	dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}
	database.db = dbConn

	ctx := context.Background()

	database.UpdateProduct(ctx, "0004")
}
英文:

I have recently started learning Go.

I found the following Github implementation of a wrapper for database transaction processing and decided to try it out.

(source) https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

I am using PostgreSQL as the database.

Initially, it contains the following data.

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)

After Process A succeeds, Process B is intentionally made to fail, and a rollback of transaction A is expected. However, when we run it, the rollback does not occur and we end up with the following

In truth, since B failed, the process A should be rolled back and there should be no change in the database value.

I have inserted Logs in places to confirm this, but I am not sure. Why is the rollback not executed?

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
	*sql.DB
}

type Service struct {
	tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
	log.Printf("transaction")
	tx, err := t.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	if err := f(ctx); err != nil {
		log.Printf("transaction err")
		return fmt.Errorf("transaction query failed: %w", err)
	}

	log.Printf("commit")
	return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
	updateFunc := func(ctx context.Context) error {
		log.Printf("first process")
        // Process A
		if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
			log.Printf("first err")
			return err
		}
		log.Printf("second process")

        // Process B(They are intentionally failing.)
		if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
			log.Printf("second err")
			return err
		}
		return nil
	}
	log.Printf("update")
	return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {

	data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}

	database := Service {tx: txAdmin{data}}

	ctx := context.Background()

	database.UpdateProduct(ctx, "0004")
}

output

2022/05/26 13:28:55 update     
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err

database changes(If the rollback works, the PRICE for id 0004 should remain 300.)

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   200
(4 rows)

Please tell me how I can use the wrapper to correctly process transactions.

=========
PS.
The following code without the wrapper worked properly.

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-defer-start
type Service struct {
	db *sql.DB
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
		log.Println("update err")
		return err
	}

	if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
		log.Println("update err")
		return err
	}

	return tx.Commit()
}

// transaction-defer-end
func main() {
	var database Service
	dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}
	database.db = dbConn

	ctx := context.Background()

	database.UpdateProduct(ctx, "0004")
	
}

答案1

得分: 0

根据@Richard Huxton的说法,将tx传递给函数f

以下是步骤:

  1. struct txAdmin上添加一个字段以容纳*sql.Tx,因此txAdmin具有DBTx字段。
  2. Transaction中将tx设置为*txAdmin.Tx
  3. UpdateProduct中,对每个查询使用*Service.tx.Tx

因此,最终的代码如下所示:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
	*sql.DB
	*sql.Tx
}

type Service struct {
	tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
	log.Printf("transaction")
	tx, err := t.DB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	// 将tx设置为Tx
	t.Tx = tx

	defer tx.Rollback()
	if err := f(ctx); err != nil {
		log.Printf("transaction err")
		return fmt.Errorf("transaction query failed: %w", err)
	}

	log.Printf("commit")
	return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
	updateFunc := func(ctx context.Context) error {
		log.Printf("first process")
		// Process A
		if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
			log.Printf("first err")
			return err
		}
		log.Printf("second process")

		// Process B(They are intentionally failing.)
		if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
			log.Printf("second err")
			return err
		}
		return nil
	}
	log.Printf("update")
	return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {
	data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}

	database := Service{tx: txAdmin{DB: data}}

	ctx := context.Background()

	database.UpdateProduct(ctx, "0004")
}
英文:

As @Richard Huxton said, pass tx into a function f

here are the steps:

  1. add one field on struct txAdmin to accommodate *sql.Tx, so txAdmin have DB and Tx fields
  2. inside Transaction set tx to *txAdmin.Tx
  3. inside UpdateProduct use *Service.tx.Tx for every query

so the final code looks like this:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
	*sql.DB
	*sql.Tx
}

type Service struct {
	tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
	log.Printf("transaction")
	tx, err := t.DB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

    // set tx to Tx
	t.Tx = tx

	defer tx.Rollback()
	if err := f(ctx); err != nil {
		log.Printf("transaction err")
		return fmt.Errorf("transaction query failed: %w", err)
	}

	log.Printf("commit")
	return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
	updateFunc := func(ctx context.Context) error {
		log.Printf("first process")
		// Process A
		if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
			log.Printf("first err")
			return err
		}
		log.Printf("second process")

		// Process B(They are intentionally failing.)
		if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
			log.Printf("second err")
			return err
		}
		return nil
	}
	log.Printf("update")
	return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {
	data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
	if nil != err {
		log.Fatal(err)
	}

	database := Service{tx: txAdmin{DB: data}}

	ctx := context.Background()

	database.UpdateProduct(ctx, "0004")
}

huangapple
  • 本文由 发表于 2022年5月26日 12:47:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/72386821.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定