Loading CSV file into BigQuery after os.Create() doesn't load data

Question

I'm trying to run the following flow:

  1. Get data from somewhere
  2. Create new local CSV file, write the data into that file
  3. Upload the CSV to BigQuery
  4. Delete the local file

But it seems to load empty data.
This is the code:

func (c *Client) Do(ctx context.Context) error {
	bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
	if err != nil {
		return err
	}

	data, err := c.GetSomeData(ctx)
	if err != nil {
		return err
	}

	file, err := os.Create("example.csv")
	if err != nil {
		return err
	}
	defer file.Close()
	// the file also needs to be deleted

	writer := csv.NewWriter(file)
	defer writer.Flush()

	timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
	for _, d := range data {
		csvRow := []string{
			d.ID,
			d.Name,
			timestamp,
		}
		err = writer.Write(csvRow)
		if err != nil {
			log.Printf("error writing data to CSV: %v\n", err)
		}
	}

	source := bigquery.NewReaderSource(file)
	source.Schema = bigquery.Schema{
		{Name: "id", Type: bigquery.StringFieldType},
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "createdAt", Type: bigquery.TimestampFieldType},
	}
	if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
		return err
	}

	return nil
}

LoadCsv() looks like this:

func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
	loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
	loader.WriteDisposition = bigquery.WriteTruncate
	job, err := loader.Run(ctx)
	if err != nil {
		return "", err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return job.ID(), err
	}

	if status.Err() != nil {
		return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
	}

	return job.ID(), nil
}

After running this, BigQuery does create the table with the schema, but it contains no data.
If I change os.Create() to os.Open() and the file already exists, everything works. It's as if the file data has not yet been written when the CSV is loaded (?)
What's the reason?

Answer 1

Score: 1

The problem I see here is that you don't rewind the file handle's cursor to the beginning of the file. After the writes, the cursor sits at the end of the file, so the next read starts there and returns 0 bytes. That explains why it seems like there's no content in the file.

https://pkg.go.dev/os#File.Seek can handle this for you.

A note on Flush: bytes written directly to an *os.File are visible to later reads on the same file handle without any flush. That would not be the case if the file was opened by a different process or was reopened. csv.Writer, however, buffers rows in memory, so nothing reaches the file until writer.Flush() is called. The deferred Flush in the question runs only when Do returns, which is after the load, so call Flush explicitly before rewinding.

Edit: the OP confirmed that the flush was necessary in their case.
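To make the role of Flush concrete, here is a minimal sketch, not taken from the original post. It assumes the standard library behaviour that csv.Writer buffers its output until Flush is called. The helper name sizesAroundFlush and the sample row are made up for illustration. It writes one row through a csv.Writer and reports the on-disk file size before and after Flush.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
)

// sizesAroundFlush (hypothetical helper) writes one CSV row to a temporary
// file through a csv.Writer and reports the file size before and after Flush.
func sizesAroundFlush() (before, after int64, err error) {
	f, err := os.CreateTemp("", "flush-demo-*.csv")
	if err != nil {
		return 0, 0, err
	}
	defer os.Remove(f.Name()) // runs last: delete the temporary file
	defer f.Close()           // runs first: close the handle

	w := csv.NewWriter(f)
	if err := w.Write([]string{"1", "alice"}); err != nil {
		return 0, 0, err
	}

	info, err := f.Stat()
	if err != nil {
		return 0, 0, err
	}
	before = info.Size() // the row is still held in the writer's buffer

	w.Flush()
	if err := w.Error(); err != nil { // Flush reports failures through Error
		return 0, 0, err
	}

	info, err = f.Stat()
	if err != nil {
		return 0, 0, err
	}
	after = info.Size() // the row has now been written to the file
	return before, after, nil
}

func main() {
	before, after, err := sizesAroundFlush()
	if err != nil {
		panic(err)
	}
	fmt.Printf("size before Flush: %d, after Flush: %d\n", before, after)
}
```

If the size before Flush is zero, a reader started at that point would have nothing to read even after a rewind, which is why the sketch treats Flush and Seek as two separate steps.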

Demonstration:

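package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "data.csv")
	if err != nil {
		panic(err)
	}
	// Deferred calls run in reverse order: close first, then remove.
	defer os.Remove(f.Name())
	defer f.Close()

	// Writing advances the file offset to the end of the file.
	fmt.Fprintf(f, "hello, world")

	// Reading now starts at the end of the file, so nothing is copied.
	fmt.Fprintln(os.Stderr, "Before rewind:")
	if _, err := io.Copy(os.Stderr, f); err != nil {
		panic(err)
	}

	// Move the offset back to the start of the file.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Fprintln(os.Stderr, "\nAfter rewind:")
	if _, err := io.Copy(os.Stderr, f); err != nil {
		panic(err)
	}
	fmt.Fprintln(os.Stderr)
}

% go run t.go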
Before rewind:

After rewind:
hello, world

Posted by huangapple on 2021-10-22 23:48:47
When reposting, please keep the link to this article: https://go.coder-hub.com/69679747.html