Writing data from BigQuery to CSV is slow


Question

I wrote some code that behaves strangely and runs slowly, and I can't figure out why.
What I'm trying to do is download data from BigQuery (using a query as input), write it to a CSV file, and then create a URL link to that CSV so users can download it as a report.
I'm trying to optimize the CSV-writing step, because it takes a while and shows some odd behavior.

The code iterates over the BigQuery results and passes each row to a channel, where workers parse and write it using Go's encoding/csv package.
Here are the relevant parts, with some debugging added:

func (s *Service) generateReportWorker(ctx context.Context, query, reportName string) error {
    it, err := s.bigqueryClient.Read(ctx, query)
    if err != nil {
        return err
    }
    filename := generateReportFilename(reportName)
    gcsObj := s.gcsClient.Bucket(s.config.GcsBucket).Object(filename)
    wc := gcsObj.NewWriter(ctx)
    wc.ContentType = "text/csv"
    wc.ContentDisposition = "attachment"

    csvWriter := csv.NewWriter(wc)

    var doneCount uint64

    go backgroundTimer(ctx, it.TotalRows, &doneCount)

    rowJobs := make(chan []bigquery.Value, it.TotalRows)
    workers := 10
    wg := sync.WaitGroup{}
    wg.Add(workers)

    // start the worker pool
    for i := 0; i < workers; i++ {
        go func(c context.Context, num int) {
            defer wg.Done()
            for row := range rowJobs {
                records := make([]string, len(row))
                for j, r := range row {
                    records[j] = fmt.Sprintf("%v", r)
                }
                s.mu.Lock()
                start := time.Now()
                if err := csvWriter.Write(records); err != nil {
                    log.Errorf("Error writing row: %v", err)
                }
                if time.Since(start) > time.Second {
                    fmt.Printf("worker %d took %v\n", num, time.Since(start))
                }
                s.mu.Unlock()
                atomic.AddUint64(&doneCount, 1)
            }
        }(ctx, i)
    }

    // read results from BigQuery and feed them to the worker pool
    for {
        var row []bigquery.Value
        if err := it.Next(&row); err != nil {
            if err == iterator.Done || err == context.DeadlineExceeded {
                break
            }
            log.Errorf("Error loading next row from BQ: %v", err)
        }
        rowJobs <- row
    }

    fmt.Println("***done loop!***")

    close(rowJobs)

    wg.Wait()

    csvWriter.Flush()
    wc.Close()

    url := fmt.Sprintf("%s/%s/%s", s.config.BaseURL, s.config.GcsBucket, filename)

    // ....
}

func backgroundTimer(ctx context.Context, total uint64, done *uint64) {
    ticker := time.NewTicker(10 * time.Second)
    go func() {
        for {
            select {
            case <-ctx.Done():
                ticker.Stop()
                return
            case <-ticker.C:
                fmt.Printf("progress (%d,%d)\n", atomic.LoadUint64(done), total)
            }
        }
    }()
}

The BigQuery Read function:

func (c *Client) Read(ctx context.Context, query string) (*bigquery.RowIterator, error)  {
    job, err := c.bigqueryClient.Query(query).Run(ctx)
    if err != nil {
        return nil, err
    }
    it, err := job.Read(ctx)
    if err != nil {
        return nil, err
    }
    return it, nil
}

I ran this code with a query that returns about 400,000 rows. The query itself takes around 10 seconds, but the whole process takes about 2 minutes.
The output:

progress (112346,392565)
progress (123631,392565)
***done loop!***
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
worker 3 took 1m16.728143875s
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
worker 3 took 1m13.525662666s
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
worker 4 took 1m17.576536375s
progress (392565,392565)

You can see that the first 112,346 rows were written quickly, but then for some reason worker 3 took over 1 minute 16 seconds (!!!) to write a single row, which made the other workers wait for the mutex to be released. This happened two more times, and as a result the whole process took more than 2 minutes to finish.

I'm not sure what is going on or how to debug this further. Why are there these stalls during execution?


Answer 1

Score: 1

As suggested by @serge-v, you can write all the records to a local file and then transfer the whole file to GCS in one go. To make this faster, you can split the file into multiple chunks and use the command gsutil -m cp -j, where:

gsutil is the tool for accessing Cloud Storage from the command line

-m performs a parallel, multi-threaded/multi-process copy

cp copies files

-j applies gzip transport encoding to the uploaded files, which saves network bandwidth while leaving the data uncompressed in Cloud Storage.

To apply this command from your Go program, you can refer to this GitHub link; a rough sketch of the approach is shown below.
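
For illustration, here is a minimal sketch of that idea in Go, assuming gsutil is installed on the machine running the program. writeLocalCSV, uploadWithGsutil, and the bucket/object names are hypothetical helpers invented for this sketch, not part of the original code:

package main

import (
    "encoding/csv"
    "fmt"
    "os"
    "os/exec"
)

// writeLocalCSV writes all rows to a temporary file on local disk, so the
// fast local writes are fully decoupled from the slow network upload.
func writeLocalCSV(rows [][]string) (string, error) {
    f, err := os.CreateTemp("", "report-*.csv")
    if err != nil {
        return "", err
    }
    defer f.Close()

    w := csv.NewWriter(f)
    for _, r := range rows {
        if err := w.Write(r); err != nil {
            return "", err
        }
    }
    w.Flush()
    return f.Name(), w.Error()
}

// uploadWithGsutil shells out to gsutil: the global -m flag copies in
// parallel, and "cp -j csv" applies gzip transport encoding to .csv files
// during the upload while the data stays uncompressed in the bucket.
func uploadWithGsutil(localPath, bucket, object string) error {
    dst := fmt.Sprintf("gs://%s/%s", bucket, object)
    cmd := exec.Command("gsutil", "-m", "cp", "-j", "csv", localPath, dst)
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    return cmd.Run()
}

func main() {
    // Placeholder data, bucket, and object names.
    rows := [][]string{{"id", "name"}, {"1", "alice"}, {"2", "bob"}}

    path, err := writeLocalCSV(rows)
    if err != nil {
        panic(err)
    }
    if err := uploadWithGsutil(path, "my-report-bucket", "reports/example.csv"); err != nil {
        panic(err)
    }
}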

You could also try adding profiling to your Go program. Profiling helps you analyze the complexity, and it will show you where the time is being spent.
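
For example, a minimal way to expose Go's built-in profiler is the standard net/http/pprof setup; the port and the surrounding main are just for this sketch:

package main

import (
    "log"
    "net/http"
    _ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
    // Serve the profiler on a side port. While the report is being generated
    // you can capture a 30-second CPU profile with:
    //   go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
    // or dump goroutine stacks (useful for seeing what the workers are
    // blocked on) from http://localhost:6060/debug/pprof/goroutine?debug=2
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    // ... start the report generation here ...
    select {} // keep this standalone sketch alive
}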

Since you are reading millions of rows from BigQuery, you can try the BigQuery Storage API. It provides faster access to BigQuery-managed storage than bulk data export, and using it instead of the iterator you are currently using in your Go program can speed up the processing.
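
If you are on a reasonably recent version of cloud.google.com/go/bigquery, the client can be told to fetch query results over the Storage Read API while keeping the same iterator-based code. A minimal sketch follows; the project ID and query are placeholders, and EnableStorageReadClient requires a recent library version plus the BigQuery read-session permissions:

package main

import (
    "context"
    "fmt"
    "log"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
)

func main() {
    ctx := context.Background()

    client, err := bigquery.NewClient(ctx, "my-project") // placeholder project ID
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Download query results via the Storage Read API instead of the slower
    // paginated REST iterator.
    if err := client.EnableStorageReadClient(ctx); err != nil {
        log.Fatal(err)
    }

    it, err := client.Query("SELECT * FROM `my-project.dataset.table`").Read(ctx)
    if err != nil {
        log.Fatal(err)
    }
    for {
        var row []bigquery.Value
        err := it.Next(&row)
        if err == iterator.Done {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(row)
    }
}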

You can also look at the query optimization techniques provided by BigQuery.

