Large Batch Inserts Using Snowflake & Go
Question
I am retrieving payloads from a REST API, which I then want to insert into a Snowflake table.
My current process is to use the Snowflake DB connection and iterate over a slice of structs (which contain my data from the API). However, this doesn't seem efficient or optimal. Everything loads successfully, but I am trying to figure out how to optimize a large number of inserts for potentially thousands of records. Perhaps there needs to be a separate channel for insertions instead of inserting synchronously?
General code flow:
import (
    "database/sql"
    "fmt"
    "sync"
    "time"

    _ "github.com/snowflakedb/gosnowflake"
)
func ETL() {
    var wg sync.WaitGroup
    ch := make(chan []*Response)
    defer close(ch)

    // Create requests to API
    for _, request := range requests {
        // All of this flows fine without issue
        wg.Add(1)
        go func(request Request) {
            defer wg.Done()
            resp, _ := request.Get()
            ch <- resp
        }(request)
    }

    // Connect to Snowflake
    // This is not a problem
    connString := fmt.Sprintf(config...)
    db, _ := sql.Open("snowflake", connString)
    defer db.Close()

    // Collect responses from our channel
    results := make([][]*Response, len(requests))
    for i := range results {
        results[i] = <-ch
        for _, res := range results[i] {
            // transform is just a function to flatten my structs into entries
            // that I would like to insert into Snowflake. This is not a bottleneck.
            entries := transform(res)

            // Load the data into Snowflake, passing the flattened entries
            // as well as the db connection.
            if err := load(entries, db); err != nil {
                fmt.Println(err)
            }
        }
    }
}
type Entry struct {
    field1     string
    field2     string
    statusCode int
}
func load(entries []*Entry, db *sql.DB) error {
    start := time.Now()
    for i, entry := range entries {
        fmt.Printf("Loading entry %d\n", i)
        stmt := `INSERT INTO tbl (field1, field2, updated_date, status_code)
                 VALUES (?, ?, CURRENT_TIMESTAMP(), ?)`
        _, err := db.Exec(stmt, entry.field1, entry.field2, entry.statusCode)
        if err != nil {
            fmt.Println(err)
            return err
        }
    }
    fmt.Println("Load time:", time.Since(start))
    return nil
}
Answer 1
Score: 1
Instead of INSERTing individual rows, collect rows into files; each time you push one of these files to S3/GCS/Azure, it will be loaded immediately.
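For the "collect rows into files" half, a minimal Go sketch might look like the following. It reuses the Entry type from the question and writes one batch as a gzipped CSV that COPY INTO can read; the writeBatchFile name, the column order, and leaving updated_date to be filled at load time are illustrative assumptions, not part of the original post.
import (
    "compress/gzip"
    "encoding/csv"
    "os"
    "strconv"
)

// writeBatchFile (hypothetical helper) flushes one batch of entries to a
// gzipped CSV on disk. Columns mirror the INSERT from the question:
// field1, field2, status_code.
func writeBatchFile(entries []*Entry, path string) error {
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    defer f.Close()

    gz := gzip.NewWriter(f)
    w := csv.NewWriter(gz)
    for _, e := range entries {
        row := []string{e.field1, e.field2, strconv.Itoa(e.statusCode)}
        if err := w.Write(row); err != nil {
            return err
        }
    }
    w.Flush()
    if err := w.Error(); err != nil {
        return err
    }
    return gz.Close() // flush the gzip stream before the file handle closes
}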
I wrote a post detailing these steps:
With the appropriate storage integration, this would auto-ingest the files:
create pipe temp.public.meetup202011_pipe
auto_ingest = true
integration = temp_meetup202011_pubsub_int
as
copy into temp.public.meetup202011_rsvps
from @temp_fhoffa_gcs_meetup;
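On the Go side, dropping each finished batch file into the GCS bucket behind that stage is all it takes to trigger the pipe. Here is a minimal sketch using the cloud.google.com/go/storage client; the uploadToGCS name and the bucket/object parameters are placeholders for illustration:
import (
    "context"
    "io"
    "os"

    "cloud.google.com/go/storage"
)

// uploadToGCS (hypothetical helper) copies a local batch file into the bucket
// that the Snowflake stage points at; the pipe's auto_ingest notification
// picks the new object up from there.
func uploadToGCS(ctx context.Context, bucket, object, localPath string) error {
    client, err := storage.NewClient(ctx)
    if err != nil {
        return err
    }
    defer client.Close()

    f, err := os.Open(localPath)
    if err != nil {
        return err
    }
    defer f.Close()

    w := client.Bucket(bucket).Object(object).NewWriter(ctx)
    if _, err := io.Copy(w, f); err != nil {
        w.Close()
        return err
    }
    return w.Close() // the upload is only committed once Close returns nil
}
The same idea applies with the AWS or Azure SDK if the stage points at S3 or Blob Storage instead.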
Also check these considerations:
Soon: If you want to send individual rows and ingest them in real time into Snowflake - that's in development (https://www.snowflake.com/blog/snowflake-streaming-now-hiring-help-design-and-build-the-future-of-big-data-and-stream-processing/).