为什么 pgx 在提交时返回连接忙碌?

huangapple go评论94阅读模式
英文:

Why pgx returns connection busy on commit?

问题

我有一个函数用于批量向表中插入值。tx.Commit()返回conn busy。从阅读代码中我得知,conn.Begin()实际上使其忙碌。

所以问题是如何正确处理这个问题?我是否应该完全使用事务和批量查询?或者事务是在底层创建的?

// InsertItems向表中添加项目
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) error {

	conn, err := r.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("获取连接时出错: %w", err)
	}
	defer conn.Release()

	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("开始pgx事务时出错: %w", err)
	}
	defer func() { _ = tx.Rollback(ctx) }()

	batch := pgx.Batch{}

	for _, v := range values {

		query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)

		batch.Queue(query, v.ID, v.Date, v.Amount)
	}

	batchRes := tx.SendBatch(ctx, &batch)
	defer func() {
		if err := batchRes.Close(); err != nil {
			logger.Errorf("关闭批处理结果时出错: %v", err)
		}
	}()

	cmdTag, err := batchRes.Exec()
	if err != nil {
		return fmt.Errorf("批处理结果执行时出错: %w", err)
	}

	logger.Debugf("插入的行数: %d", cmdTag.RowsAffected())

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("提交pgx事务时出错: %w", err)
	}

	return nil
}
英文:

I have a function to insert values to the table in bulk. tx.Commit() returns conn busy. As I got from reading code is that conn.Begin() actually makes it busy.

So the question is how to do this correctly? Should I use transactions together with batch queries at all? Or transaction is created under the hood?

// InsertItems adds items to the table
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) error {

	conn, err := r.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Release()

	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("starting pgx transaction: %w", err)
	}
	defer func() { _ = tx.Rollback(ctx) }()

	batch := pgx.Batch{}

	for _, v := range values {

		query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)

		batch.Queue(query, v.ID, v.Date, v.Amount)
	}

	batchRes := tx.SendBatch(ctx, &batch)
	defer func() {
		if err := batchRes.Close(); err != nil {
			logger.Errorf("closing batch result: %v", err)
		}
	}()

	cmdTag, err := batchRes.Exec()
	if err != nil {
		return fmt.Errorf("batch res exec: %w", err)
	}

	logger.Debugf("inserted rows: %d", cmdTag.RowsAffected())

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commiting pgx transaction: %w", err)
	}

	return nil
}

答案1

得分: 1

你在调用 commit 之前调用了 close,但是在调用 close 之前你需要先关闭批处理结果,然后才能再次使用底层连接。

为了确保延迟操作的正确顺序,你可以像下面这样做:

// InsertItems 向表中添加项目
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) (err error) {
	conn, err := r.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("获取连接时出错: %w", err)
	}
	defer conn.Release()

	batch := new(pgx.Batch)
	for _, v := range values {
		query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)
		_ = batch.Queue(query, v.ID, v.Date, v.Amount)
	}

	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("开始 pgx 事务时出错: %w", err)
	}	
	result := tx.SendBatch(ctx, batch)
	defer func() {
		if e := result.Close(); e != nil {
			logger.Errorf("关闭批处理结果时出错: %v", e)
			err = e
		}
		
		if err != nil {
			_ = tx.Rollback(ctx)
		} else {
			if e := tx.Commit(ctx); e != nil {
				err = e
			}
		}
	}()
	tag, err := result.Exec()
	if err != nil {
		return fmt.Errorf("执行批处理结果时出错: %w", err)
	}
	
	logger.Debugf("插入的行数: %d", tag.RowsAffected())
	return nil
}
英文:

You are calling commit before calling close on the batch result. You need to first close the batch result before you can use the underlying connection again.

To enforce the correct order of the deferred operations you could do something like the following:

// InsertItems adds items to the table
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) (err error) {
	conn, err := r.pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Release()

	batch := new(pgx.Batch)
	for _, v := range values {
		query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)
		_ = batch.Queue(query, v.ID, v.Date, v.Amount)
	}

	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("starting pgx transaction: %w", err)
	}	
	result := tx.SendBatch(ctx, batch)
	defer func() {
		if e := result.Close(); e != nil {
			logger.Errorf("closing batch result: %v", e)
			err = e
		}
		
		if err != nil {
			_ = tx.Rollback(ctx)
		} else {
			if e := tx.Commit(ctx); e != nil {
				err = e
			}
		}
	}()
	tag, err := result.Exec()
	if err != nil {
		return fmt.Errorf("batch res exec: %w", err)
	}
	
	logger.Debugf("inserted rows: %d", tag.RowsAffected())
	return nil
}

huangapple
  • 本文由 发表于 2022年11月30日 22:26:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/74629374.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定