Large batch Inserts Using Snowflake & Go

Question

I am retrieving payloads from a REST API, which I then want to insert into a Snowflake table.

My current process is to use the Snowflake DB connection and iterate over a slice of structs (containing my data from the API). However, this doesn't seem efficient or optimal. Everything loads successfully, but I am trying to figure out how to optimize a large number of inserts, potentially thousands of records. Perhaps there should be a separate channel for insertions instead of inserting synchronously?

General code flow:

import (
	"database/sql"
	"fmt"
	"sync"
	"time"

	_ "github.com/snowflakedb/gosnowflake"
)

func ETL() {
	var wg sync.WaitGroup
	ch := make(chan []*Response)
	defer close(ch)

	// Create requests to the API
	for _, request := range requests {
		// All of this flows fine without issue
		wg.Add(1)
		go func(request Request) {
			defer wg.Done()
			resp, _ := request.Get()
			ch <- resp
		}(request)
	}

	// Connect to Snowflake
	// This part is not a problem
	connString := fmt.Sprintf(config...)
	db, _ := sql.Open("snowflake", connString)
	defer db.Close()

	// Collect responses from the channel
	results := make([][]*Response, len(requests))
	for i := range results {
		results[i] = <-ch // receive one batch of responses from the channel
		for _, res := range results[i] {
			// transform is just a function that flattens my structs into the
			// entries I want to insert into Snowflake. It is not a bottleneck.
			entries := transform(res)

			// Load the data into Snowflake, passing the flattened entries
			// along with the DB connection
			if err := load(entries, db); err != nil {
				fmt.Println(err)
			}
		}
	}
}

type Entry struct {
	field1     string
	field2     string
	statusCode int
}

func load(entries []*Entry, db *sql.DB) error {
	start := time.Now()
	for i, entry := range entries {
		fmt.Printf("Loading entry %d\n", i)

		stmt := `INSERT INTO tbl (field1, field2, updated_date, status_code)
			 VALUES (?, ?, CURRENT_TIMESTAMP(), ?)`

		_, err := db.Exec(stmt, entry.field1, entry.field2, entry.statusCode)
		if err != nil {
			fmt.Println(err)
			return err
		}
	}
	fmt.Println("Load time:", time.Since(start))
	return nil
}

Answer 1

Score: 1

Instead of INSERTing individual rows, collect rows into files; each time you push one of these files to S3/GCS/Azure, it will be loaded immediately.
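
A minimal Go sketch of that pattern (not from the original answer): it batches the Entry rows from the question into a gzipped CSV object in a GCS bucket, assuming the bucket's notifications feed the pipe shown below so each upload is ingested automatically. The bucket and object names here are hypothetical.

import (
	"compress/gzip"
	"context"
	"encoding/csv"
	"fmt"
	"time"

	"cloud.google.com/go/storage"
)

// stageBatch writes one batch of entries as a gzipped CSV object to GCS.
// Snowpipe (auto_ingest plus a notification integration) then loads the
// file, so no INSERT statements are issued from the Go side at all.
func stageBatch(ctx context.Context, bucket string, entries []*Entry) error {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()

	// Hypothetical object name; one object per batch.
	object := fmt.Sprintf("rsvps/batch_%d.csv.gz", time.Now().UnixNano())
	w := client.Bucket(bucket).Object(object).NewWriter(ctx)

	gz := gzip.NewWriter(w)
	cw := csv.NewWriter(gz)
	for _, e := range entries {
		if err := cw.Write([]string{e.field1, e.field2, fmt.Sprint(e.statusCode)}); err != nil {
			return err
		}
	}
	cw.Flush()
	if err := cw.Error(); err != nil {
		return err
	}
	if err := gz.Close(); err != nil {
		return err
	}
	// Closing the object writer commits the upload; the pipe picks it up from there.
	return w.Close()
}

In the ETL loop from the question, load(entries, db) would be swapped for one stageBatch call per batch, turning thousands of single-row INSERTs into one file upload and one bulk load.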

I wrote a post detailing these steps:

With the appropriate storage integration, this will auto-ingest the files:

create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;

Also check these considerations:

Coming soon: if you want to send individual rows and ingest them into Snowflake in real time - that's in development (https://www.snowflake.com/blog/snowflake-streaming-now-hiring-help-design-and-build-the-future-of-big-data-and-stream-processing/).
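
If setting up an external bucket and a pipe is more than you need, a related bulk-loading variant (my own sketch, not part of the original answer) is to PUT a local file to the table's stage over the existing connection and run a single COPY INTO. This assumes a gosnowflake version that supports PUT statements, and it reuses the Entry type and the tbl table from the question.

import (
	"compress/gzip"
	"database/sql"
	"encoding/csv"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// loadViaStage writes the batch to a gzipped CSV, uploads it to the table
// stage with PUT, and loads it with one COPY INTO instead of N INSERTs.
func loadViaStage(db *sql.DB, entries []*Entry) error {
	path := filepath.Join(os.TempDir(), fmt.Sprintf("batch_%d.csv.gz", time.Now().UnixNano()))
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	gz := gzip.NewWriter(f)
	cw := csv.NewWriter(gz)
	for _, e := range entries {
		if err := cw.Write([]string{e.field1, e.field2, fmt.Sprint(e.statusCode)}); err != nil {
			return err
		}
	}
	cw.Flush()
	if err := cw.Error(); err != nil {
		return err
	}
	if err := gz.Close(); err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	// Upload the already-compressed file to the table stage.
	if _, err := db.Exec(fmt.Sprintf("PUT file://%s @%%tbl AUTO_COMPRESS=FALSE", path)); err != nil {
		return err
	}
	// updated_date can be filled by a column default such as CURRENT_TIMESTAMP().
	_, err = db.Exec(`COPY INTO tbl (field1, field2, status_code)
		FROM @%tbl FILE_FORMAT = (TYPE = CSV)`)
	return err
}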
