如何从标准输入导入行到PostgreSQL?

huangapple go评论122阅读模式
英文:

How do I import rows to Postgresql from STDIN?

问题

在Python中,我有以下代码可以将行批量加载到PostgreSQL中,而无需使用文件:

  import csv
  import subprocess

  mylist, keys = [{'name': 'fred'}, {'name': 'mary'}], ['name']

  # Start psql with a \COPY ... FROM STDIN command and a pipe attached to its stdin.
  p = subprocess.Popen(['psql', 'mydb', '-U', 'openupitsme', '-h', 'my.ip.address', '--no-password', '-c',
                        '\COPY tester(%s) FROM STDIN (FORMAT CSV)' % ','.join(keys),
                        '--set=ON_ERROR_STOP=false'
                        ], stdin=subprocess.PIPE
                       )

  # Write every dict as one CSV row directly into psql's stdin (no temporary file).
  for d in mylist:
      dict_writer = csv.DictWriter(p.stdin, keys, quoting=csv.QUOTE_MINIMAL)
      dict_writer.writerow(d)

  # Close the pipe once all rows have been written.
  p.stdin.close()

我正在尝试在Go中实现相同的功能。我目前是将行写入文件,然后导入它们,然后删除该文件。我希望像在Python中那样从STDIN导入行。我有以下代码:

  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "os"
  6. "os/exec"
  7. _ "github.com/lib/pq"
  8. )
  9. var (
  10. err error
  11. db *sql.DB
  12. )
  13. func main() {
  14. var err error
  15. fh := "/path/to/my/file.txt"
  16. f, err := os.Create(fh)
  17. if err != nil {
  18. panic(err)
  19. }
  20. defer f.Close()
  21. defer os.Remove(fh)
  22. rows := []string{"fred", "mary"}
  23. for _, n := range rows {
  24. _, err = f.WriteString(n + "\n")
  25. if err != nil {
  26. panic(err)
  27. }
  28. }
  29. // dump to postgresql
  30. c := exec.Command("psql", "mydb", "-U", "openupitsme", "-h", "my.ip.address", "--no-password",
  31. "-c", `\COPY tester(customer) FROM `+fh)
  32. if out, err := c.CombinedOutput(); err != nil {
  33. log.Println(string(out), err)
  34. }
  35. }

编辑:有了一些进展,但是下面的代码仍然没有插入记录:

  1. keys := []string{"link", "domain"}
  2. records := [][]string{
  3. {"first_name", "last_name"},
  4. {"Rob", "Pike"},
  5. {"Ken", "Thompson"},
  6. {"Robert", "Griesemer"},
  7. }
  8. cmd := exec.Command("psql")
  9. stdin, err := cmd.StdinPipe()
  10. if err != nil {
  11. log.Println(err)
  12. }
  13. stdout, err := cmd.StdoutPipe()
  14. if err != nil {
  15. log.Println(err)
  16. }
  17. if err := cmd.Start(); err != nil {
  18. log.Println(err)
  19. }
  20. go func() {
  21. _, err = io.WriteString(stdin, "search -U meyo -h 1.2.3.4 -p 1111 --no-password -c ")
  22. if err != nil {
  23. log.Println(err)
  24. }
  25. _, err := io.WriteString(stdin, fmt.Sprintf("COPY links(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ",")))
  26. if err != nil {
  27. log.Println(err)
  28. }
  29. w := csv.NewWriter(stdin)
  30. if err := w.WriteAll(records); err != nil {
  31. log.Fatalln("error writing record to csv:", err)
  32. }
  33. w.Flush()
  34. if err := w.Error(); err != nil {
  35. log.Fatal(err)
  36. }
  37. if err != nil {
  38. log.Println(err)
  39. }
  40. stdin.Close()
  41. }()
  42. done := make(chan bool)
  43. go func() {
  44. _, err := io.Copy(os.Stdout, stdout)
  45. if err != nil {
  46. log.Fatal(err)
  47. }
  48. stdout.Close()
  49. done <- true
  50. }()
  51. <-done
  52. if err := cmd.Wait(); err != nil {
  53. log.Println(err, cmd.Args, stdout)
  54. }

没有插入记录,并且我得到了一个不太有用的错误:

  exit status 2
英文:

In Python I have the following that will bulk-load rows to Postgresql without using a file:

  import csv
  import subprocess

  mylist, keys = [{'name': 'fred'}, {'name': 'mary'}], ['name']

  # Start psql with a \COPY ... FROM STDIN command and a pipe attached to its stdin.
  p = subprocess.Popen(['psql', 'mydb', '-U', 'openupitsme', '-h', 'my.ip.address', '--no-password', '-c',
                        '\COPY tester(%s) FROM STDIN (FORMAT CSV)' % ', '.join(keys),
                        '--set=ON_ERROR_STOP=false'
                        ], stdin=subprocess.PIPE
                       )

  # Write every dict as one CSV row directly into psql's stdin (no temporary file).
  for d in mylist:
      dict_writer = csv.DictWriter(p.stdin, keys, quoting=csv.QUOTE_MINIMAL)
      dict_writer.writerow(d)

  # Close the pipe once all rows have been written.
  p.stdin.close()

I am trying to accomplish the same in Go. I am currently writing the rows to a file then importing them and then deleting that file. I'd like to import the rows from STDIN like I do in Python. I have:

  1. package main
  2. import (
  3. &quot;database/sql&quot;
  4. &quot;log&quot;
  5. &quot;os&quot;
  6. &quot;os/exec&quot;
  7. _ &quot;github.com/lib/pq&quot;
  8. )
  9. var (
  10. err error
  11. db *sql.DB
  12. )
  13. func main() {
  14. var err error
  15. fh := &quot;/path/to/my/file.txt&quot;
  16. f, err := os.Create(fh)
  17. if err != nil {
  18. panic(err)
  19. }
  20. defer f.Close()
  21. defer os.Remove(fh)
  22. rows := []string{&quot;fred&quot;, &quot;mary&quot;}
  23. for _, n := range rows {
  24. _, err = f.WriteString(n + &quot;\n&quot;)
  25. if err != nil {
  26. panic(err)
  27. }
  28. }
  29. // dump to postgresql
  30. c := exec.Command(&quot;psql&quot;, &quot;mydb&quot;, &quot;-U&quot;, &quot;openupitsme&quot;, &quot;-h&quot;, &quot;my.ip.address&quot;, &quot;--no-password&quot;,
  31. &quot;-c&quot;, `\COPY tester(customer) FROM `+fh)
  32. if out, err := c.CombinedOutput(); err != nil {
  33. log.Println(string(out), err)
  34. }
  35. }

EDIT:
A bit further along but this is not inserting records:

  1. keys := []string{&quot;link&quot;, &quot;domain&quot;}
  2. records := [][]string{
  3. {&quot;first_name&quot;, &quot;last_name&quot;},
  4. {&quot;Rob&quot;, &quot;Pike&quot;},
  5. {&quot;Ken&quot;, &quot;Thompson&quot;},
  6. {&quot;Robert&quot;, &quot;Griesemer&quot;},
  7. }
  8. cmd := exec.Command(&quot;psql&quot;)
  9. stdin, err := cmd.StdinPipe()
  10. if err != nil {
  11. log.Println(err)
  12. }
  13. stdout, err := cmd.StdoutPipe()
  14. if err != nil {
  15. log.Println(err)
  16. }
  17. if err := cmd.Start(); err != nil {
  18. log.Println(err)
  19. }
  20. go func() {
  21. _, err = io.WriteString(stdin, &quot;search -U meyo -h 1.2.3.4 -p 1111 --no-password -c &quot;)
  22. if err != nil {
  23. log.Println(err)
  24. }
  25. _, err := io.WriteString(stdin, fmt.Sprintf(&quot;COPY links(%s) FROM STDIN (FORMAT CSV)&quot;, strings.Join(keys, &quot;,&quot;)))
  26. if err != nil {
  27. log.Println(err)
  28. }
  29. w := csv.NewWriter(stdin)
  30. if err := w.WriteAll(records); err != nil {
  31. log.Fatalln(&quot;error writing record to csv:&quot;, err)
  32. }
  33. w.Flush()
  34. if err := w.Error(); err != nil {
  35. log.Fatal(err)
  36. }
  37. if err != nil {
  38. log.Println(err)
  39. }
  40. stdin.Close()
  41. }()
  42. done := make(chan bool)
  43. go func() {
  44. _, err := io.Copy(os.Stdout, stdout)
  45. if err != nil {
  46. log.Fatal(err)
  47. }
  48. stdout.Close()
  49. done &lt;- true
  50. }()
  51. &lt;-done
  52. if err := cmd.Wait(); err != nil {
  53. log.Println(err, cmd.Args, stdout)
  54. }

No records are inserted and I get a non-helpful error:

  exit status 2

答案1

得分: 3

github.com/lib/pq包的文档中实际上有一个示例,展示了如何实现你想要的功能。以下是改编后的完整程序:

  1. package main
  2. import (
  3. "database/sql"
  4. "log"
  5. "github.com/lib/pq"
  6. )
  7. func main() {
  8. records := [][]string{
  9. {"Rob", "Pike"},
  10. {"Ken", "Thompson"},
  11. {"Robert", "Griesemer"},
  12. }
  13. db, err := sql.Open("postgres", "dbname=postgres user=postgres password=postgres")
  14. if err != nil {
  15. log.Fatalf("open: %v", err)
  16. }
  17. if err = db.Ping(); err != nil {
  18. log.Fatalf("open ping: %v", err)
  19. }
  20. defer db.Close()
  21. txn, err := db.Begin()
  22. if err != nil {
  23. log.Fatalf("begin: %v", err)
  24. }
  25. stmt, err := txn.Prepare(pq.CopyIn("test", "first_name", "last_name"))
  26. if err != nil {
  27. log.Fatalf("prepare: %v", err)
  28. }
  29. for _, r := range records {
  30. _, err = stmt.Exec(r[0], r[1])
  31. if err != nil {
  32. log.Fatalf("exec: %v", err)
  33. }
  34. }
  35. _, err = stmt.Exec()
  36. if err != nil {
  37. log.Fatalf("exec: %v", err)
  38. }
  39. err = stmt.Close()
  40. if err != nil {
  41. log.Fatalf("stmt close: %v", err)
  42. }
  43. err = txn.Commit()
  44. if err != nil {
  45. log.Fatalf("commit: %v", err)
  46. }
  47. }

在我的机器上,这个程序大约在2秒内导入了1000000条记录。

英文:

The github.com/lib/pq package docs actually have an example of how to do what you want. Here is the adapted text of the whole program:

  1. package main
  2. import (
  3. &quot;database/sql&quot;
  4. &quot;log&quot;
  5. &quot;github.com/lib/pq&quot;
  6. )
  7. func main() {
  8. records := [][]string{
  9. {&quot;Rob&quot;, &quot;Pike&quot;},
  10. {&quot;Ken&quot;, &quot;Thompson&quot;},
  11. {&quot;Robert&quot;, &quot;Griesemer&quot;},
  12. }
  13. db, err := sql.Open(&quot;postgres&quot;, &quot;dbname=postgres user=postgres password=postgres&quot;)
  14. if err != nil {
  15. log.Fatalf(&quot;open: %v&quot;, err)
  16. }
  17. if err = db.Ping(); err != nil {
  18. log.Fatalf(&quot;open ping: %v&quot;, err)
  19. }
  20. defer db.Close()
  21. txn, err := db.Begin()
  22. if err != nil {
  23. log.Fatalf(&quot;begin: %v&quot;, err)
  24. }
  25. stmt, err := txn.Prepare(pq.CopyIn(&quot;test&quot;, &quot;first_name&quot;, &quot;last_name&quot;))
  26. if err != nil {
  27. log.Fatalf(&quot;prepare: %v&quot;, err)
  28. }
  29. for _, r := range records {
  30. _, err = stmt.Exec(r[0], r[1])
  31. if err != nil {
  32. log.Fatalf(&quot;exec: %v&quot;, err)
  33. }
  34. }
  35. _, err = stmt.Exec()
  36. if err != nil {
  37. log.Fatalf(&quot;exec: %v&quot;, err)
  38. }
  39. err = stmt.Close()
  40. if err != nil {
  41. log.Fatalf(&quot;stmt close: %v&quot;, err)
  42. }
  43. err = txn.Commit()
  44. if err != nil {
  45. log.Fatalf(&quot;commit: %v&quot;, err)
  46. }
  47. }

On my machine this imports 1 000 000 records in about 2 seconds.

答案2

得分: 2

以下代码应该能为您指明方向:

  package main

  import (
  	"fmt"
  	"log"
  	"os"
  	"os/exec"
  	"strings"
  )

  // main runs psql with a COPY ... FROM STDIN command and connects psql's
  // standard input to this process's standard input, so rows piped into this
  // program are what psql reads. Replace the <...> placeholders with real
  // connection values.
  func main() {
  	keys := []string{"customer"}
  	sqlCmd := fmt.Sprintf("COPY tester(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ","))
  	cmd := exec.Command("psql", "<dbname>", "-U", "<username>", "-h", "<host_ip>", "--no-password", "-c", sqlCmd)
  	cmd.Stdin = os.Stdin

  	output, err := cmd.CombinedOutput()
  	if err != nil {
  		// Include psql's own output: the bare exit status alone is not informative.
  		log.Fatalf("psql: %v\n%s", err, output)
  	}
  	log.Println(string(output))
  }

如果键需要是动态的,您可以从 os.Args 中获取它们。

请注意,如果您计划使用 psql 命令,那么您不需要导入 database/sql 或 lib/pq。如果您有兴趣使用 lib/pq,请查看 lib/pq 文档中的批量导入(Bulk Imports)部分。

英文:

The following code should point you in the direction you want to go:

  package main

  import (
  	"fmt"
  	"log"
  	"os"
  	"os/exec"
  	"strings"
  )

  // main runs psql with a COPY ... FROM STDIN command and connects psql's
  // standard input to this process's standard input, so rows piped into this
  // program are what psql reads. Replace the <...> placeholders with real
  // connection values.
  func main() {
  	keys := []string{"customer"}
  	sqlCmd := fmt.Sprintf("COPY tester(%s) FROM STDIN (FORMAT CSV)", strings.Join(keys, ","))
  	cmd := exec.Command("psql", "<dbname>", "-U", "<username>", "-h", "<host_ip>", "--no-password", "-c", sqlCmd)
  	cmd.Stdin = os.Stdin

  	output, err := cmd.CombinedOutput()
  	if err != nil {
  		// Include psql's own output: the bare exit status alone is not informative.
  		log.Fatalf("psql: %v\n%s", err, output)
  	}
  	log.Println(string(output))
  }

If the keys need to be dynamic you can harvest them from os.Args.

Please note that if you plan to use the psql command then you don't need to import database/sql or lib/pq. If you are interested in using lib/pq then have a look at Bulk Imports in the lib/pq documentation.

huangapple
  • 本文由 发表于 2016年2月14日 02:55:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/35383998.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定