使用PostgreSQL的COPY FROM STDIN功能。

huangapple go评论130阅读模式
英文:

Using PostgreSQL COPY FROM STDIN

问题

是否可以使用PostgreSQL的COPY FROM STDIN语句,像在Java中那样通过传递某种Reader或Writer对象,从CSV文件加载数据?应该使用哪个库?以下是一个Kotlin示例,供参考:

import com.impossibl.postgres.api.jdbc.PGConnection
import com.impossibl.postgres.api.jdbc.PGCopyInputStream
import com.impossibl.postgres.api.jdbc.PGCopyOutputStream

// Open a JDBC connection and unwrap it to the pgjdbc-ng connection type that
// the COPY stream below is constructed from.
val conn = DriverManager.getConnection("jdbc:pgsql://localhost:5432/mydatabase", "username", "password")
val pgConn = conn.unwrap(PGConnection::class.java)

// Stream the CSV file into the COPY statement. `use` closes each stream even
// when the copy throws; the COPY stream is closed before the file stream,
// the same order as a manual close() pair.
// PostgreSQL only accepts the FORMAT option inside a parenthesised option list.
// NOTE(review): the PGCopyOutputStream constructor is taken from the original
// snippet — confirm it against the pgjdbc-ng version in use.
FileInputStream("path/to/csv/file.csv").use { inputStream ->
    PGCopyOutputStream(pgConn, "COPY my_table FROM STDIN WITH (FORMAT csv)").use { outputStream ->
        // copyTo pumps the bytes through an 8 KiB buffer by default.
        inputStream.copyTo(outputStream)
    }
}

请注意,上面的Kotlin示例需要先将pgjdbc-ng库添加到项目依赖中。

英文:

Is it possible to use PostgreSQL's COPY FROM STDIN statement to load data from CSV file by passing some sort of Reader or Writer object the same way it's done in Java? What library should I use? Kotlin example for reference:

// CopyManager is constructed from the driver-level BaseConnection.
val cm = CopyManager(conn as BaseConnection)
// Options such as FORMAT must sit inside a parenthesised list; without the
// parentheses PostgreSQL rejects the statement with a syntax error.
// copyIn streams inputStream to the server; total holds the value it returns.
val total = cm.copyIn("COPY my_table FROM STDIN WITH (FORMAT csv)", inputStream)

答案1

得分: 2

下面来看两个最流行的Go语言PostgreSQL库:

lib/pq

该驱动程序通过标准库(database/sql)的方法支持此操作,但您需要自行读取io.Reader,并将每一行CSV逐行传入。

package main

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"

	"github.com/lib/pq"
)

// main runs the copy under a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if run reports an error.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	err := run(ctx)
	if err != nil {
		log.Fatal(err)
	}
}

// run opens the CSV file at /tmp/csv and a database handle registered under
// the "postgres" driver name, then loads the file into my_table via copyFrom.
// The first error encountered is returned unchanged.
func run(ctx context.Context) error {
	// Source of the CSV rows; any io.Reader (os.Stdin, ...) would work as well.
	csvFile, err := os.Open("/tmp/csv")
	if err != nil {
		return err
	}
	defer csvFile.Close()

	// Database handle backed by the pq driver.
	const dsn = "postgres://postgres@localhost:5432/postgres?sslmode=disable"
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	// Run the copy and hand its result straight back to the caller.
	return copyFrom(ctx, db, "my_table", csvFile)
}

// copyFrom loads CSV data from r into table with COPY ... FROM STDIN, inside a
// single transaction, using the statement-based COPY support of lib/pq.
//
// r is read line by line; every line is handed to the prepared COPY statement
// as one value, and a final Exec without arguments finishes the copy before
// the transaction is committed. If any step fails, the deferred Rollback
// discards the transaction and the error is returned wrapped with the name of
// the failing step.
//
// NOTE(review): each line travels through the driver as a single value; the
// driver may escape characters such as backslashes or tabs — verify that rows
// containing them load as expected.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	// Upper bound for a single CSV line. bufio.Scanner's default limit is
	// 64 KiB, which makes the whole copy fail with bufio.ErrTooLong as soon as
	// one row is longer than that.
	const maxLineBytes = 1 << 30

	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// After a successful Commit this only reports sql.ErrTxDone, so its
	// result is intentionally ignored.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("prepare copy into %q: %w", table, err)
	}

	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	for line := 1; sc.Scan(); line++ {
		if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
			return fmt.Errorf("copy line %d into %q: %w", line, table, err)
		}
	}
	if err = sc.Err(); err != nil {
		return fmt.Errorf("read csv: %w", err)
	}

	// An Exec without arguments signals that all rows have been sent.
	if _, err = stmt.ExecContext(ctx); err != nil {
		return fmt.Errorf("finish copy into %q: %w", table, err)
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit copy into %q: %w", table, err)
	}
	return nil
}

注意:

  1. lib/pq处于维护模式
  2. 已经报告了处理COPY语句的底层代码中的多个数据竞争问题,至少还有一个问题尚未解决

jackc/pgx

较低级别的pgconn.PgConn类型提供了CopyFrom方法,允许您传入任意语句和io.Reader。如果您通过标准库的database/sql包进行连接,仍然可以按下面所示的方式访问底层的pgconn.PgConn。不过,pgx还提供了其他管理连接、连接池等的方式,也值得了解一下。

package main

import (
	"context"
	"database/sql"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/stdlib"
)

// main runs the copy under a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if run reports an error.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	err := run(ctx)
	if err != nil {
		log.Fatal(err)
	}
}

// run opens the CSV file at /tmp/csv and a database handle registered under
// the "pgx" driver name, then loads the file into my_table via copyFrom.
// The first error encountered is returned unchanged.
func run(ctx context.Context) error {
	// Source of the CSV rows; any io.Reader (os.Stdin, ...) would work as well.
	csvFile, err := os.Open("/tmp/csv")
	if err != nil {
		return err
	}
	defer csvFile.Close()

	// Database handle backed by the pgx driver.
	const dsn = "postgres://postgres@localhost:5432/postgres?sslmode=disable"
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	// Run the copy and hand its result straight back to the caller.
	return copyFrom(ctx, db, "my_table", csvFile)
}

// copyFrom loads CSV data from r into table using COPY ... FROM STDIN.
//
// It checks a single connection out of db, reaches the underlying pgx
// connection through the Raw hook of database/sql and passes r to the
// low-level PgConn.CopyFrom together with the COPY statement. The connection
// is released again when the function returns.
//
// An error is returned when db is not backed by the pgx stdlib driver, when no
// connection can be acquired, or when the copy itself fails.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())

	conn, err := db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Close()

	return conn.Raw(func(driverConn any) error {
		// A checked assertion turns "wrong driver" into an error instead of a panic.
		stdConn, ok := driverConn.(*stdlib.Conn)
		if !ok {
			return fmt.Errorf("copy into %q: unexpected driver connection type %T, want *stdlib.Conn", table, driverConn)
		}
		if _, err := stdConn.Conn().PgConn().CopyFrom(ctx, r, query); err != nil {
			return fmt.Errorf("copy into %q: %w", table, err)
		}
		return nil
	})
}
英文:

Looking at the two most popular Golang postgres libraries:

lib/pq

The driver supports this via the standard library methods, but you'll need to consume the io.Reader variable to pass each CSV row through.

package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/lib/pq"
)
// main runs the copy under a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if run reports an error.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	err := run(ctx)
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV file at /tmp/csv and a database handle registered under
// the "postgres" driver name, then loads the file into my_table via copyFrom.
// The first error encountered is returned unchanged.
func run(ctx context.Context) error {
	// Source of the CSV rows; any io.Reader (os.Stdin, ...) would work as well.
	csvFile, err := os.Open("/tmp/csv")
	if err != nil {
		return err
	}
	defer csvFile.Close()

	// Database handle backed by the pq driver.
	const dsn = "postgres://postgres@localhost:5432/postgres?sslmode=disable"
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	// Run the copy and hand its result straight back to the caller.
	return copyFrom(ctx, db, "my_table", csvFile)
}
// copyFrom loads CSV data from r into table with COPY ... FROM STDIN, inside a
// single transaction, using the statement-based COPY support of lib/pq.
//
// r is read line by line; every line is handed to the prepared COPY statement
// as one value, and a final Exec without arguments finishes the copy before
// the transaction is committed. If any step fails, the deferred Rollback
// discards the transaction and the error is returned wrapped with the name of
// the failing step.
//
// NOTE(review): each line travels through the driver as a single value; the
// driver may escape characters such as backslashes or tabs — verify that rows
// containing them load as expected.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	// Upper bound for a single CSV line. bufio.Scanner's default limit is
	// 64 KiB, which makes the whole copy fail with bufio.ErrTooLong as soon as
	// one row is longer than that.
	const maxLineBytes = 1 << 30

	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// After a successful Commit this only reports sql.ErrTxDone, so its
	// result is intentionally ignored.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("prepare copy into %q: %w", table, err)
	}

	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	for line := 1; sc.Scan(); line++ {
		if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
			return fmt.Errorf("copy line %d into %q: %w", line, table, err)
		}
	}
	if err = sc.Err(); err != nil {
		return fmt.Errorf("read csv: %w", err)
	}

	// An Exec without arguments signals that all rows have been sent.
	if _, err = stmt.ExecContext(ctx); err != nil {
		return fmt.Errorf("finish copy into %q: %w", table, err)
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit copy into %q: %w", table, err)
	}
	return nil
}

Note

  1. lib/pq is in maintenance mode
  2. Numerous data races have been reported in the underlying code that handles COPY statements, with at least one still open

jackc/pgx

The lower level pgconn.PgConn type has a CopyFrom method that allows you to pass an arbitrary statement and io.Reader. If you're connecting via the stdlib db package, you can still get access to the underlying pgconn.PgConn as shown below, although there are other ways of handling connections / pools, etc, when using pgx, so it's worth taking a look at those too.

package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
)
// main runs the copy under a context that is cancelled when the process
// receives os.Interrupt, and terminates via log.Fatal if run reports an error.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	err := run(ctx)
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV file at /tmp/csv and a database handle registered under
// the "pgx" driver name, then loads the file into my_table via copyFrom.
// The first error encountered is returned unchanged.
func run(ctx context.Context) error {
	// Source of the CSV rows; any io.Reader (os.Stdin, ...) would work as well.
	csvFile, err := os.Open("/tmp/csv")
	if err != nil {
		return err
	}
	defer csvFile.Close()

	// Database handle backed by the pgx driver.
	const dsn = "postgres://postgres@localhost:5432/postgres?sslmode=disable"
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	// Run the copy and hand its result straight back to the caller.
	return copyFrom(ctx, db, "my_table", csvFile)
}
// copyFrom loads CSV data from r into table using COPY ... FROM STDIN.
//
// It checks a single connection out of db, reaches the underlying pgx
// connection through the Raw hook of database/sql and passes r to the
// low-level PgConn.CopyFrom together with the COPY statement. The connection
// is released again when the function returns.
//
// An error is returned when db is not backed by the pgx stdlib driver, when no
// connection can be acquired, or when the copy itself fails.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())

	conn, err := db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Close()

	return conn.Raw(func(driverConn any) error {
		// A checked assertion turns "wrong driver" into an error instead of a panic.
		stdConn, ok := driverConn.(*stdlib.Conn)
		if !ok {
			return fmt.Errorf("copy into %q: unexpected driver connection type %T, want *stdlib.Conn", table, driverConn)
		}
		if _, err := stdConn.Conn().PgConn().CopyFrom(ctx, r, query); err != nil {
			return fmt.Errorf("copy into %q: %w", table, err)
		}
		return nil
	})
}

huangapple
  • 本文由 发表于 2022年11月23日 06:07:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/74539520.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定