英文:
Using PostgreSQL COPY FROM STDIN
问题
是否可以使用PostgreSQL的COPY FROM STDIN
语句,通过传递某种类型的Reader
或Writer
对象来从CSV文件加载数据,就像在Java中那样?应该使用哪个库?
下面是一个使用pgjdbc-ng库的Kotlin示例,供参考:
import com.impossibl.postgres.api.jdbc.PGConnection
import com.impossibl.postgres.api.jdbc.PGCopyInputStream
import com.impossibl.postgres.api.jdbc.PGCopyOutputStream
// Streams a local CSV file into my_table through a COPY ... FROM STDIN statement.
// conn and pgConn stay open after the copy so later code can keep using the connection.
val conn = DriverManager.getConnection("jdbc:pgsql://localhost:5432/mydatabase", "username", "password")
val pgConn = conn.unwrap(PGConnection::class.java)
// `use` closes each stream even when reading or writing throws part-way through.
// The copy stream (innermost) is closed first, then the file stream.
FileInputStream("path/to/csv/file.csv").use { inputStream ->
    PGCopyOutputStream(pgConn, "COPY my_table FROM STDIN FORMAT csv").use { outputStream ->
        // Transfer the file in 8 KiB chunks until end of input.
        inputStream.copyTo(outputStream, 8192)
    }
}
请注意,您需要将pgjdbc-ng
库添加到您的项目依赖中。
英文:
Is it possible to use PostgreSQL's COPY FROM STDIN
statement to load data from CSV file by passing some sort of Reader
or Writer
object the same way it's done in Java? What library should I use? Kotlin example for reference:
// Fragment only: `conn` and `inputStream` are not defined in this snippet.
// Wraps the connection (cast to BaseConnection) in a CopyManager.
val cm = CopyManager(conn as BaseConnection)
// Runs the COPY statement with inputStream as its data source and keeps the result in `total`
// (presumably the number of rows copied; confirm against the CopyManager documentation).
val total = cm.copyIn("COPY my_table FROM STDIN FORMAT csv", inputStream)
答案1
得分: 2
下面来看两个最受欢迎的Golang PostgreSQL库:
lib/pq
该驱动程序通过标准库方法支持此操作,但您需要自行读取io.Reader
变量,把每一行CSV数据逐行传入。
package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/lib/pq"
)
// main runs the program with a context that is cancelled on os.Interrupt and
// terminates through log.Fatal when run reports an error.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	err := run(ctx)
	// Release the signal registration explicitly: log.Fatal exits the process
	// without running deferred calls, so a deferred cancel would be skipped on
	// the error path.
	cancel()
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV source and the database handle, then loads the file into
// my_table through copyFrom. Both resources are released when run returns.
func run(ctx context.Context) error {
	// CSV input; another reader such as os.Stdin could be used instead.
	csvFile, openErr := os.Open("/tmp/csv")
	if openErr != nil {
		return openErr
	}
	defer csvFile.Close()

	// Database handle obtained through the pq driver.
	pool, dbErr := sql.Open("postgres", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
	if dbErr != nil {
		return dbErr
	}
	defer pool.Close()

	// Perform the copy and hand its result straight back to the caller.
	return copyFrom(ctx, pool, "my_table", csvFile)
}
// copyFrom loads the lines read from r into table using a
// "COPY ... FROM STDIN WITH (FORMAT csv)" statement prepared inside a single
// transaction on db.
//
// r is consumed with a bufio.Scanner, so each input line is passed to the
// prepared statement as one row. The transaction is committed only when every
// row, the final flush and the statement close succeed; on any failure the
// deferred Rollback discards the work and the error is returned wrapped with
// the step that failed.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// After a successful Commit this Rollback only reports that the
	// transaction is already done, so its result is deliberately ignored.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("prepare copy into %s: %w", table, err)
	}

	sc := bufio.NewScanner(r)
	for line := 1; sc.Scan(); line++ {
		if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
			return fmt.Errorf("copy line %d: %w", line, err)
		}
	}
	if err = sc.Err(); err != nil {
		return fmt.Errorf("read input: %w", err)
	}

	// An Exec without arguments signals the end of the data.
	if _, err = stmt.ExecContext(ctx); err != nil {
		return fmt.Errorf("flush copy into %s: %w", table, err)
	}
	// Close the statement before committing so a failure to finish the COPY is
	// reported instead of being silently dropped.
	if err = stmt.Close(); err != nil {
		return fmt.Errorf("close copy statement: %w", err)
	}
	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit copy into %s: %w", table, err)
	}
	return nil
}
注意:
- lib/pq 目前处于维护模式
- 其处理COPY语句的底层代码已被报告存在多处数据竞争,其中至少有一处尚未解决
jackc/pgx
较低级别的pgconn.PgConn
类型具有CopyFrom
方法,允许您传递任意语句和io.Reader
。如果您通过stdlib的db
包进行连接,仍然可以访问底层的pgconn.PgConn
,如下所示。但是,在使用pgx时,还有其他处理连接/池等的方法,因此值得一看。
package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
)
// main runs the program with a context that is cancelled on os.Interrupt and
// terminates through log.Fatal when run reports an error.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	err := run(ctx)
	// Release the signal registration explicitly: log.Fatal exits the process
	// without running deferred calls, so a deferred cancel would be skipped on
	// the error path.
	cancel()
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV source and the database handle, then loads the file into
// my_table through copyFrom. Both resources are released when run returns.
func run(ctx context.Context) error {
	// CSV input; another reader such as os.Stdin could be used instead.
	csvFile, openErr := os.Open("/tmp/csv")
	if openErr != nil {
		return openErr
	}
	defer csvFile.Close()

	// Database handle obtained through the pgx driver.
	pool, dbErr := sql.Open("pgx", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
	if dbErr != nil {
		return dbErr
	}
	defer pool.Close()

	// Perform the copy and hand its result straight back to the caller.
	return copyFrom(ctx, pool, "my_table", csvFile)
}
// copyFrom streams r into table by running a
// "COPY ... FROM STDIN WITH (FORMAT csv)" statement on the pgconn.PgConn that
// backs a connection taken from db.
//
// db must have been opened with the pgx stdlib driver. If the underlying
// driver connection has a different type, an error is returned rather than
// panicking. The connection taken from db is closed before copyFrom returns.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())

	conn, err := db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Close()

	return conn.Raw(func(driverConn any) error {
		// Checked assertion: a *sql.DB opened with another driver must yield
		// an error here, not a runtime panic.
		stdConn, ok := driverConn.(*stdlib.Conn)
		if !ok {
			return fmt.Errorf("copy into %s: unexpected driver connection type %T", table, driverConn)
		}
		if _, err := stdConn.Conn().PgConn().CopyFrom(ctx, r, query); err != nil {
			return fmt.Errorf("copy into %s: %w", table, err)
		}
		return nil
	})
}
英文:
Looking at the two most popular Golang postgres libraries:
lib/pq
The driver supports this via the standard library methods, but you'll need to consume the io.Reader
variable to pass each CSV row through.
package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/lib/pq"
)
// main runs the program with a context that is cancelled on os.Interrupt and
// terminates through log.Fatal when run reports an error.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	err := run(ctx)
	// Release the signal registration explicitly: log.Fatal exits the process
	// without running deferred calls, so a deferred cancel would be skipped on
	// the error path.
	cancel()
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV source and the database handle, then loads the file into
// my_table through copyFrom. Both resources are released when run returns.
func run(ctx context.Context) error {
	// CSV input; another reader such as os.Stdin could be used instead.
	csvFile, openErr := os.Open("/tmp/csv")
	if openErr != nil {
		return openErr
	}
	defer csvFile.Close()

	// Database handle obtained through the pq driver.
	pool, dbErr := sql.Open("postgres", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
	if dbErr != nil {
		return dbErr
	}
	defer pool.Close()

	// Perform the copy and hand its result straight back to the caller.
	return copyFrom(ctx, pool, "my_table", csvFile)
}
// copyFrom loads the lines read from r into table using a
// "COPY ... FROM STDIN WITH (FORMAT csv)" statement prepared inside a single
// transaction on db.
//
// r is consumed with a bufio.Scanner, so each input line is passed to the
// prepared statement as one row. The transaction is committed only when every
// row, the final flush and the statement close succeed; on any failure the
// deferred Rollback discards the work and the error is returned wrapped with
// the step that failed.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pq.QuoteIdentifier(table))

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// After a successful Commit this Rollback only reports that the
	// transaction is already done, so its result is deliberately ignored.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("prepare copy into %s: %w", table, err)
	}

	sc := bufio.NewScanner(r)
	for line := 1; sc.Scan(); line++ {
		if _, err = stmt.ExecContext(ctx, sc.Text()); err != nil {
			return fmt.Errorf("copy line %d: %w", line, err)
		}
	}
	if err = sc.Err(); err != nil {
		return fmt.Errorf("read input: %w", err)
	}

	// An Exec without arguments signals the end of the data.
	if _, err = stmt.ExecContext(ctx); err != nil {
		return fmt.Errorf("flush copy into %s: %w", table, err)
	}
	// Close the statement before committing so a failure to finish the COPY is
	// reported instead of being silently dropped.
	if err = stmt.Close(); err != nil {
		return fmt.Errorf("close copy statement: %w", err)
	}
	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit copy into %s: %w", table, err)
	}
	return nil
}
Note
- lib/pq is in maintenance mode
- Numerous data races have been reported in the underlying code that handles
COPY
statements, with at least one still open
jackc/pgx
The lower level pgconn.PgConn
type has a CopyFrom
method that allows you to pass an arbitrary statement and io.Reader
. If you're connecting via the stdlib db
package, you can still get access to the underlying pgconn.PgConn
as shown below, although there are other ways of handling connections / pools, etc, when using pgx, so it's worth taking a look at those too.
package main
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"os"
"os/signal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"
)
// main runs the program with a context that is cancelled on os.Interrupt and
// terminates through log.Fatal when run reports an error.
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	err := run(ctx)
	// Release the signal registration explicitly: log.Fatal exits the process
	// without running deferred calls, so a deferred cancel would be skipped on
	// the error path.
	cancel()
	if err != nil {
		log.Fatal(err)
	}
}
// run opens the CSV source and the database handle, then loads the file into
// my_table through copyFrom. Both resources are released when run returns.
func run(ctx context.Context) error {
	// CSV input; another reader such as os.Stdin could be used instead.
	csvFile, openErr := os.Open("/tmp/csv")
	if openErr != nil {
		return openErr
	}
	defer csvFile.Close()

	// Database handle obtained through the pgx driver.
	pool, dbErr := sql.Open("pgx", "postgres://postgres@localhost:5432/postgres?sslmode=disable")
	if dbErr != nil {
		return dbErr
	}
	defer pool.Close()

	// Perform the copy and hand its result straight back to the caller.
	return copyFrom(ctx, pool, "my_table", csvFile)
}
// copyFrom streams r into table by running a
// "COPY ... FROM STDIN WITH (FORMAT csv)" statement on the pgconn.PgConn that
// backs a connection taken from db.
//
// db must have been opened with the pgx stdlib driver. If the underlying
// driver connection has a different type, an error is returned rather than
// panicking. The connection taken from db is closed before copyFrom returns.
func copyFrom(ctx context.Context, db *sql.DB, table string, r io.Reader) error {
	query := fmt.Sprintf("COPY %s FROM STDIN WITH (FORMAT csv)", pgx.Identifier{table}.Sanitize())

	conn, err := db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection: %w", err)
	}
	defer conn.Close()

	return conn.Raw(func(driverConn any) error {
		// Checked assertion: a *sql.DB opened with another driver must yield
		// an error here, not a runtime panic.
		stdConn, ok := driverConn.(*stdlib.Conn)
		if !ok {
			return fmt.Errorf("copy into %s: unexpected driver connection type %T", table, driverConn)
		}
		if _, err := stdConn.Conn().PgConn().CopyFrom(ctx, r, query); err != nil {
			return fmt.Errorf("copy into %s: %w", table, err)
		}
		return nil
	})
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论