如何从标准输入导入行到PostgreSQL?

huangapple go评论81阅读模式
英文:

How do I import rows to Postgresql from STDIN?

问题

在Python中,我有以下代码可以将行批量加载到PostgreSQL中,而无需使用文件:

import csv
import subprocess

# Rows to load, and the column order used for both the COPY column list and
# the CSV fields.
mylist, keys = [{'name': 'fred'}, {'name': 'mary'}], ['name']
# Start psql with a client-side \COPY that reads CSV rows from the pipe below.
# The statement is a raw string: '\C' is not a recognised escape sequence, and
# newer Python versions warn when a plain string relies on it.
p = subprocess.Popen(['psql', 'mydb', '-U', 'openupitsme', '-h', 'my.ip.address', '--no-password', '-c',
    r'\COPY tester(%s) FROM STDIN (FORMAT CSV)' % ','.join(keys),
    '--set=ON_ERROR_STOP=false'
    ], stdin=subprocess.PIPE
)
# One writer is enough for all rows.
# NOTE(review): on Python 3 this pipe is binary and csv expects a text stream
# (e.g. Popen(..., universal_newlines=True)) - confirm the target version.
dict_writer = csv.DictWriter(p.stdin, keys, quoting=csv.QUOTE_MINIMAL)
dict_writer.writerows(mylist)
# Closing stdin signals end-of-data to psql.
p.stdin.close()
# Wait for psql to exit so the load has finished before the script continues.
p.wait()

我正在尝试在Go中实现相同的功能。我目前是将行写入文件,然后导入它们,然后删除该文件。我希望像在Python中那样从STDIN导入行。我有以下代码:

package main

import (
	"database/sql"
	"log"
	"os"
	"os/exec"

	_ "github.com/lib/pq"
)

var (
	err error
	db  *sql.DB
)

// main writes each row to a file, bulk-loads that file into the tester table
// through psql's \COPY, and removes the file when it returns.
func main() {
	fh := "/path/to/my/file.txt"
	f, err := os.Create(fh)
	if err != nil {
		panic(err)
	}
	// Registered first so it runs last: the file is removed only after it has
	// been closed (on platforms such as Windows, removing a file that is
	// still open fails).
	defer os.Remove(fh)

	rows := []string{"fred", "mary"}
	for _, n := range rows {
		if _, err = f.WriteString(n + "\n"); err != nil {
			f.Close()
			panic(err)
		}
	}
	// Close before psql reads the file, and check the error so a failed
	// write-back is not silently ignored.
	if err = f.Close(); err != nil {
		panic(err)
	}

	// dump to postgresql
	c := exec.Command("psql", "mydb", "-U", "openupitsme", "-h", "my.ip.address", "--no-password",
		"-c", `\COPY tester(customer) FROM `+fh)
	// CombinedOutput returns psql's stdout and stderr together, so the log
	// line contains psql's own error message.
	if out, err := c.CombinedOutput(); err != nil {
		log.Println(string(out), err)
	}
}

编辑:有了一些进展,但这段代码仍然没有插入记录:

// Column names placed in the COPY column list below.
// NOTE(review): "link"/"domain" do not match the first_name/last_name data in
// records - confirm which columns are intended.
keys := []string{"link", "domain"}
// Rows written to psql as CSV. The first row looks like a header, but the
// COPY statement below does not request HEADER, so it is sent as a data row.
records := [][]string{
	{"first_name", "last_name"},
	{"Rob", "Pike"},
	{"Ken", "Thompson"},
	{"Robert", "Griesemer"},
}

// psql is started with no arguments: no database, user, or host is given on
// its command line.
cmd := exec.Command("psql")
// Failures while creating the pipes or starting the process are only logged,
// so execution continues even if setup did not succeed.
stdin, err := cmd.StdinPipe()
if err != nil {
	log.Println(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
	log.Println(err)
}
if err := cmd.Start(); err != nil {
	log.Println(err)
}
// Writer goroutine: feeds psql's standard input, then closes it.
go func() {
	// This text is written to psql's standard input, not passed as
	// command-line arguments, so psql receives it as input rather than as
	// connection options.
	_, err = io.WriteString(stdin, "search -U meyo -h 1.2.3.4 -p 1111 --no-password -c ")
	if err != nil {
		log.Println(err)
	}
	// The COPY statement is written without a trailing newline or semicolon,
	// so the CSV rows below follow it directly on the same line.
	// Note that := declares a new err local to this goroutine.
	_, err := io.WriteString(stdin, fmt.Sprintf("COPY links(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ",")))
	if err != nil {
		log.Println(err)
	}
	w := csv.NewWriter(stdin)
	if err := w.WriteAll(records); err != nil {
		log.Fatalln("error writing record to csv:", err)
	}
	w.Flush()
	if err := w.Error(); err != nil {
		log.Fatal(err)
	}
	// err here is the goroutine-local value from the COPY write above, which
	// has already been checked.
	if err != nil {
		log.Println(err)
	}
	stdin.Close()
}()

// Reader goroutine: copies everything psql prints on stdout to this
// process's stdout, then signals completion on done.
done := make(chan bool)
go func() {
	_, err := io.Copy(os.Stdout, stdout)
	if err != nil {
		log.Fatal(err)
	}
	stdout.Close()
	done <- true
}()
// Block until psql's stdout has been fully read before calling Wait.
<-done

// Only stdout is piped and cmd.Stderr is never set, so psql's error text is
// not captured; the log line shows just the exit status, the argument list,
// and the pipe value itself.
if err := cmd.Wait(); err != nil {
	log.Println(err, cmd.Args, stdout)
}

没有插入记录,并且我得到了一个不太有用的错误:

exit status 2
英文:

In Python I have the following that will bulk-load rows to Postgresql without using a file:

import csv
import subprocess
# Rows to load, and the column order used for both the COPY column list and
# the CSV fields.
mylist, keys = [{'name': 'fred'}, {'name': 'mary'}], ['name']
# Start psql with a client-side \COPY that reads CSV rows from the pipe below.
# The statement is a raw string: '\C' is not a recognised escape sequence, and
# newer Python versions warn when a plain string relies on it.
p = subprocess.Popen(['psql', 'mydb', '-U', 'openupitsme', '-h', 'my.ip.address', '--no-password', '-c',
    r'\COPY tester(%s) FROM STDIN (FORMAT CSV)' % ', '.join(keys),
    '--set=ON_ERROR_STOP=false'
    ], stdin=subprocess.PIPE
)
# One writer is enough for all rows.
# NOTE(review): on Python 3 this pipe is binary and csv expects a text stream
# (e.g. Popen(..., universal_newlines=True)) - confirm the target version.
dict_writer = csv.DictWriter(p.stdin, keys, quoting=csv.QUOTE_MINIMAL)
dict_writer.writerows(mylist)
# Closing stdin signals end-of-data to psql.
p.stdin.close()
# Wait for psql to exit so the load has finished before the script continues.
p.wait()

I am trying to accomplish the same in Go. I am currently writing the rows to a file then importing them and then deleting that file. I'd like to import the rows from STDIN like I do in Python. I have:

package main
import (
&quot;database/sql&quot;
&quot;log&quot;
&quot;os&quot;
&quot;os/exec&quot;
_ &quot;github.com/lib/pq&quot;
)
var (
err error
db  *sql.DB
)
func main() {
var err error
fh := &quot;/path/to/my/file.txt&quot;
f, err := os.Create(fh)
if err != nil {
panic(err)
}
defer f.Close()
defer os.Remove(fh)
rows := []string{&quot;fred&quot;, &quot;mary&quot;}
for _, n := range rows {
_, err = f.WriteString(n + &quot;\n&quot;)
if err != nil {
panic(err)
}
}
// dump to postgresql
c := exec.Command(&quot;psql&quot;, &quot;mydb&quot;, &quot;-U&quot;, &quot;openupitsme&quot;, &quot;-h&quot;, &quot;my.ip.address&quot;, &quot;--no-password&quot;,
&quot;-c&quot;, `\COPY tester(customer) FROM `+fh)
if out, err := c.CombinedOutput(); err != nil {
log.Println(string(out), err)
}
}

EDIT:
A bit further along but this is not inserting records:

    // Column names placed in the COPY column list below.
    // NOTE(review): "link"/"domain" do not match the first_name/last_name data
    // in records - confirm which columns are intended.
    keys := []string{"link", "domain"}
    // Rows written to psql as CSV. The first row looks like a header, but the
    // COPY statement below does not request HEADER, so it is sent as a data row.
    records := [][]string{
    	{"first_name", "last_name"},
    	{"Rob", "Pike"},
    	{"Ken", "Thompson"},
    	{"Robert", "Griesemer"},
    }

    // psql is started with no arguments: no database, user, or host is given
    // on its command line.
    cmd := exec.Command("psql")
    // Failures while creating the pipes or starting the process are only
    // logged, so execution continues even if setup did not succeed.
    stdin, err := cmd.StdinPipe()
    if err != nil {
    	log.Println(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
    	log.Println(err)
    }
    if err := cmd.Start(); err != nil {
    	log.Println(err)
    }
    // Writer goroutine: feeds psql's standard input, then closes it.
    go func() {
    	// This text is written to psql's standard input, not passed as
    	// command-line arguments, so psql receives it as input rather than as
    	// connection options.
    	_, err = io.WriteString(stdin, "search -U meyo -h 1.2.3.4 -p 1111 --no-password -c ")
    	if err != nil {
    		log.Println(err)
    	}
    	// The COPY statement is written without a trailing newline or
    	// semicolon, so the CSV rows below follow it directly on the same line.
    	// Note that := declares a new err local to this goroutine.
    	_, err := io.WriteString(stdin, fmt.Sprintf("COPY links(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ",")))
    	if err != nil {
    		log.Println(err)
    	}
    	w := csv.NewWriter(stdin)
    	if err := w.WriteAll(records); err != nil {
    		log.Fatalln("error writing record to csv:", err)
    	}
    	w.Flush()
    	if err := w.Error(); err != nil {
    		log.Fatal(err)
    	}
    	// err here is the goroutine-local value from the COPY write above,
    	// which has already been checked.
    	if err != nil {
    		log.Println(err)
    	}
    	stdin.Close()
    }()

    // Reader goroutine: copies everything psql prints on stdout to this
    // process's stdout, then signals completion on done.
    done := make(chan bool)
    go func() {
    	_, err := io.Copy(os.Stdout, stdout)
    	if err != nil {
    		log.Fatal(err)
    	}
    	stdout.Close()
    	done <- true
    }()
    // Block until psql's stdout has been fully read before calling Wait.
    <-done

    // Only stdout is piped and cmd.Stderr is never set, so psql's error text
    // is not captured; the log line shows just the exit status, the argument
    // list, and the pipe value itself.
    if err := cmd.Wait(); err != nil {
    	log.Println(err, cmd.Args, stdout)
    }

No records are inserted and I get a non-helpful error:

exit status 2

答案1

得分: 3

github.com/lib/pq 包的文档中其实有一个示例,展示了如何实现你想要的功能。以下是改编后的完整程序:

package main

import (
	"database/sql"
	"log"

	"github.com/lib/pq"
)

// main bulk-loads a fixed set of name records into the "test" table using
// lib/pq's COPY support: it opens a transaction, prepares a pq.CopyIn
// statement, issues one Exec per record, issues a final argument-less Exec,
// then closes the statement and commits.
func main() {
	people := [][]string{
		{"Rob", "Pike"},
		{"Ken", "Thompson"},
		{"Robert", "Griesemer"},
	}

	db, err := sql.Open("postgres", "dbname=postgres user=postgres password=postgres")
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	// Ping is checked separately so a connection failure is reported as such.
	if pingErr := db.Ping(); pingErr != nil {
		log.Fatalf("open ping: %v", pingErr)
	}
	defer db.Close()

	tx, err := db.Begin()
	if err != nil {
		log.Fatalf("begin: %v", err)
	}

	copyStmt, err := tx.Prepare(pq.CopyIn("test", "first_name", "last_name"))
	if err != nil {
		log.Fatalf("prepare: %v", err)
	}

	// One Exec per record; each record supplies first_name then last_name.
	for i := range people {
		first, last := people[i][0], people[i][1]
		if _, err := copyStmt.Exec(first, last); err != nil {
			log.Fatalf("exec: %v", err)
		}
	}

	// Per the lib/pq bulk-import docs, an Exec with no arguments flushes the
	// buffered rows.
	if _, err := copyStmt.Exec(); err != nil {
		log.Fatalf("exec: %v", err)
	}
	if err := copyStmt.Close(); err != nil {
		log.Fatalf("stmt close: %v", err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatalf("commit: %v", err)
	}
}

在我的机器上,这个程序大约在2秒内导入了1000000条记录。

英文:

The github.com/lib/pq package docs actually have an example of how to do what you want. Here is the adapted text of the whole program:

package main
import (
&quot;database/sql&quot;
&quot;log&quot;
&quot;github.com/lib/pq&quot;
)
func main() {
records := [][]string{
{&quot;Rob&quot;, &quot;Pike&quot;},
{&quot;Ken&quot;, &quot;Thompson&quot;},
{&quot;Robert&quot;, &quot;Griesemer&quot;},
}
db, err := sql.Open(&quot;postgres&quot;, &quot;dbname=postgres user=postgres password=postgres&quot;)
if err != nil {
log.Fatalf(&quot;open: %v&quot;, err)
}
if err = db.Ping(); err != nil {
log.Fatalf(&quot;open ping: %v&quot;, err)
}
defer db.Close()
txn, err := db.Begin()
if err != nil {
log.Fatalf(&quot;begin: %v&quot;, err)
}
stmt, err := txn.Prepare(pq.CopyIn(&quot;test&quot;, &quot;first_name&quot;, &quot;last_name&quot;))
if err != nil {
log.Fatalf(&quot;prepare: %v&quot;, err)
}
for _, r := range records {
_, err = stmt.Exec(r[0], r[1])
if err != nil {
log.Fatalf(&quot;exec: %v&quot;, err)
}
}
_, err = stmt.Exec()
if err != nil {
log.Fatalf(&quot;exec: %v&quot;, err)
}
err = stmt.Close()
if err != nil {
log.Fatalf(&quot;stmt close: %v&quot;, err)
}
err = txn.Commit()
if err != nil {
log.Fatalf(&quot;commit: %v&quot;, err)
}
}

On my machine this imports 1 000 000 records in about 2 seconds.

答案2

得分: 2

以下代码应该能为您指明方向:

package main
import (
"fmt"
"log"
"os"
"os/exec"
"strings"
)
// main runs psql with a COPY ... FROM STDIN statement and hands it this
// process's standard input, so CSV rows piped into the program are passed on
// to psql for loading into the tester table.
func main() {
	keys := []string{"customer"}
	sqlCmd := fmt.Sprintf("COPY tester(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ","))
	cmd := exec.Command("psql", "<dbname>", "-U", "<username>", "-h", "<host_ip>", "--no-password", "-c", sqlCmd)
	cmd.Stdin = os.Stdin
	// CombinedOutput returns stdout and stderr together, so on failure the
	// captured output carries psql's own error message alongside the status.
	output, err := cmd.CombinedOutput()
	if err != nil {
		log.Fatalf("psql: %v\n%s", err, output)
	}
	log.Println(string(output))
}

如果键需要是动态的,您可以从 os.Args 中获取它们。

请注意,如果您计划使用 psql 命令,那么您不需要导入 database/sql 或 lib/pq。如果您有兴趣使用 lib/pq,请查看 lib/pq 文档中的批量导入(Bulk Imports)部分。

英文:

The following code should point you in the direction you want to go:

package main

import (
	"fmt"
	"log"
	"os"
	"os/exec"
	"strings"
)

// main runs psql with a COPY ... FROM STDIN statement and hands it this
// process's standard input, so CSV rows piped into the program are passed on
// to psql for loading into the tester table.
func main() {
	keys := []string{"customer"}
	sqlCmd := fmt.Sprintf("COPY tester(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ","))
	cmd := exec.Command("psql", "<dbname>", "-U", "<username>", "-h", "<host_ip>", "--no-password", "-c", sqlCmd)
	cmd.Stdin = os.Stdin
	// CombinedOutput returns stdout and stderr together, so on failure the
	// captured output carries psql's own error message alongside the status.
	output, err := cmd.CombinedOutput()
	if err != nil {
		log.Fatalf("psql: %v\n%s", err, output)
	}
	log.Println(string(output))
}

If the keys need to be dynamic you can harvest them from os.Args.

Please note that if you plan to use the psql command then you don't need to import database/sql or lib/pq. If you are interested in using lib/pq then have a look at Bulk Imports in the lib/pq documentation.

huangapple
  • 本文由 发表于 2016年2月14日 02:55:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/35383998.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定