使用PGX进行Golang中的事务处理

huangapple go评论74阅读模式
英文:

Transaction in Golang with PGX

问题

我目前正在创建一个小的Go应用程序。现在我正在处理数据库部分。我使用的库是这个:https://github.com/jackc/pgx

我遇到的问题是,每次我尝试执行数据库读取操作时,它告诉我我的'conn忙'。我阅读了关于使用pgxpool而不是单个连接的方法,但它仍然不起作用。我做错了什么?

func (postgre *PostgreClient) read(query string) (pgx.Row, error) {
	client, err := postgre.client.Acquire(context.TODO())
	transaction, err := client.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return nil, err
	}
	defer transaction.Rollback(context.TODO())

	rows := transaction.QueryRow(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	err = transaction.Commit(context.TODO())
	return rows, err
}

提前感谢。

英文:

<br>
I am currently in the process of creating a little Go App. Right now I am working on the DB part. The Library I use is this one: https://github.com/jackc/pgx <br>
The problem I have is that every time I try to execute the database read, it tells me that my 'conn is busy'. I read about using a pgxpool instead of a single connection, but it still does not work. What am I doing wrong?

func (postgre *PostgreClient) read(query string) (pgx.Row, error) {
	client, err := postgre.client.Acquire(context.TODO())
	transaction, err := client.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return nil, err
	}
	defer transaction.Rollback(context.TODO())

	rows := transaction.QueryRow(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	err = transaction.Commit(context.TODO())
	return rows, err
}

Thanks in advance.

答案1

得分: 5

在提交事务之前,你需要扫描行。

如果你希望事务的处理保持在函数内部,你可以在函数内部传递一个执行扫描的接口。

例如:

// 由 *sql.Row 和 *sql.Rows 实现
type Row interface {
	Scan(dst ...interface{}) error
}

// 由你的 "models" 实现
type RowScanner interface {
	ScanRow(r Row) error
}
type User struct {
	Id    int
	Email string
}

func (u *User) ScanRow(r Row) error {
	return r.Scan(
		&u.Id,
		&u.Email,
	)
}
func (postgre *PostgreClient) read(query string, rs RowScanner) (err error) {
	conn, err := postgre.client.Acquire(context.TODO())
	if err != nil {
		return err
	}
	defer conn.Release()
	
	tx, err := conn.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback(context.TODO())
		} else {
			tx.Commit(context.TODO())
		}
	}()

	row := tx.QueryRow(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	return rs.ScanRow(row) 
}
u := new(User)
if err := pg.read("select id, email from users limit 1", u); err != nil {
	panic(err)
}

用于扫描模型列表的代码如下:

type UserList []*User

func (ul *UserList) ScanRow(r Row) error {
    u := new(User)
    if err := u.ScanRow(r); err != nil {
        return err
    }

    *ul = append(*ul, u)
	return nil
}
func (postgre *PostgreClient) list(query string, rs RowScanner) (err error) {
	conn, err := postgre.client.Acquire(context.TODO())
	if err != nil {
		return err
	}
	defer conn.Release()
	
	tx, err := conn.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback(context.TODO())
		} else {
			tx.Commit(context.TODO())
		}
	}()

	rows, err := tx.Query(context.TODO(), query)
	if err != nil {
		return err
	}
	defer rows.Close()
	
	for rows.Next() {
		if err := rs.ScanRow(rows); err != nil {
			return err
		}
	}
	return rows.Err()
}
ul := new(UserList)
if err := pg.list("select id, email from users", ul); err != nil {
	panic(err)
}
英文:

You have to scan the row before you commit the transaction.

If you want the handling of the transaction to remain within the function you can pass an interface that does the scanning also inside the function.

For example:

// implemented by *sql.Row &amp; *sql.Rows
type Row interface {
	Scan(dst ...interface{}) error
}

// implemented by your &quot;models&quot;
type RowScanner interface {
	ScanRow(r Row) error
}
type User struct {
	Id    int
	Email string
}

func (u *User) ScanRow(r Row) error {
	return r.Scan(
		&amp;u.Id,
		&amp;u.Email,
	)
}
func (postgre *PostgreClient) read(query string, rs RowScanner) (err error) {
	conn, err := postgre.client.Acquire(context.TODO())
	if err != nil {
		return err
	}
	defer conn.Release()
	
	tx, err := conn.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback(context.TODO())
		} else {
			tx.Commit(context.TODO())
		}
	}()

	row := tx.QueryRow(context.TODO(), query)
	if err != nil {
		return nil, err
	}
	return rs.ScanRow(row) 
}
u := new(User)
if err := pg.read(&quot;select id, email from users limit 1&quot;, u); err != nil {
	panic(err)
}

For scanning a list of models:

type UserList []*User

func (ul *UserList) ScanRow(r Row) error {
    u := new(User)
    if err := u.ScanRow(r); err != nil {
        return err
    }

    *ul = append(*ul, u)
	return nil
}
func (postgre *PostgreClient) list(query string, rs RowScanner) (err error) {
	conn, err := postgre.client.Acquire(context.TODO())
	if err != nil {
		return err
	}
	defer conn.Release()
	
	tx, err := conn.BeginTx(context.TODO(), pgx.TxOptions{})
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback(context.TODO())
		} else {
			tx.Commit(context.TODO())
		}
	}()

	rows, err := tx.Query(context.TODO(), query)
	if err != nil {
		return err
	}
	defer rows.Close()
	
	for rows.Next() {
		if err := rs.ScanRow(rows); err != nil {
			return err
		}
	}
	return rows.Err()
}
ul := new(UserList)
if err := pg.list(&quot;select id, email from users&quot;, ul); err != nil {
	panic(err)
}

huangapple
  • 本文由 发表于 2021年10月1日 18:59:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/69404758.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定