英文:
Loading CSV file into bigquery after os.Create() doesn't load data
问题
我正在尝试运行以下流程:
- 从某个地方获取数据
- 创建新的本地 CSV 文件,将数据写入该文件
- 将 CSV 文件上传到 BigQuery
- 删除本地文件
但是似乎加载了空数据。
这是代码:
func (c *Client) Do(ctx context.Context) error {
bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
if err != nil {
return err
}
data, err := c.GetSomeData(ctx)
if err != nil {
return err
}
file, err := os.Create("example.csv")
if err != nil {
return err
}
defer file.Close()
// 还需要删除文件
writer := csv.NewWriter(file)
defer writer.Flush()
timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
for _, d := range data {
csvRow := []string{
d.ID,
d.Name,
timestamp,
}
err = writer.Write(csvRow)
if err != nil {
log.Printf("error writing data to CSV: %v\n", err)
}
}
source := bigquery.NewReaderSource(file)
source.Schema = bigquery.Schema{
{Name: "id", Type: bigquery.StringFieldType},
{Name: "name", Type: bigquery.StringFieldType},
{Name: "createdAt", Type: bigquery.TimestampFieldType},
}
if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
return err
}
return nil
}
LoadCSV()
的代码如下:
func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return "", err
}
status, err := job.Wait(ctx)
if err != nil {
return job.ID(), err
}
if status.Err() != nil {
return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
}
return job.ID(), nil
}
运行后,BigQuery 确实创建了模式,但没有数据。
如果我将 os.Create()
更改为 os.Open()
,并且文件已经存在,一切正常。就好像在加载 CSV 时,文件数据尚未被写入一样(?)
原因是什么?
英文:
I'm trying to run the following flow:
- Get data from somewhere
- Create new local CSV file, write the data into that file
- Upload the CSV to Bigquery
- Delete the local file
But it seems to load empty data.
This is the code:
func (c *Client) Do(ctx context.Context) error {
bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
if err != nil {
return err
}
data, err := c.GetSomeData(ctx)
if err != nil {
return err
}
file, err := os.Create("example.csv")
if err != nil {
return err
}
defer file.Close()
// also file need to be delete
writer := csv.NewWriter(file)
defer writer.Flush()
timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
for _, d := range data {
csvRow := []string{
d.ID,
d.Name,
timestamp,
}
err = writer.Write(csvRow)
if err != nil {
log.Printf("error writing data to CSV: %v\n", err)
}
}
source := bigquery.NewReaderSource(file)
source.Schema = bigquery.Schema{
{Name: "id", Type: bigquery.StringFieldType},
{Name: "name", Type: bigquery.StringFieldType},
{Name: "createdAt", Type: bigquery.TimestampFieldType},
}
if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
return err
}
return nil
}
LoadCSV()
looks like this:
func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return "", err
}
status, err := job.Wait(ctx)
if err != nil {
return job.ID(), err
}
if status.Err() != nil {
return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
}
return job.ID(), nil
}
After running this, bigquery does create the schema but with no data.
If I'm changing os.Create()
to os.Open()
and the file already exist, everything work. It's like when loading the CSV the file data is not yet written (?)
What's the reason?
答案1
得分: 1
我看到的问题是你没有将文件句柄的光标倒回到文件的开头。因此,下一次读取将在文件的末尾进行,并且将读取0字节。这就解释了为什么文件中似乎没有内容。
https://pkg.go.dev/os#File.Seek 可以帮助你处理这个问题。
实际上,Flush
不相关,因为你使用的是同一个文件句柄来读取文件,就像你用来写入文件一样,所以你会看到自己写入的字节,即使没有刷新。如果文件是由不同的进程打开或重新打开,情况就不同了。
演示代码如下:
package main
import (
"fmt"
"io"
"os"
)
func main() {
f, err := os.CreateTemp("", "data.csv")
if err != nil {
panic(err)
} else {
defer f.Close()
defer os.Remove(f.Name())
}
fmt.Fprintf(f, "hello, world")
fmt.Fprintln(os.Stderr, "Before rewind:")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
f.Seek(0, io.SeekStart)
fmt.Fprintln(os.Stderr, "\nAfter rewind:")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
fmt.Fprintln(os.Stderr, "\n")
}
运行结果如下:
% go run t.go
Before rewind:
After rewind:
hello, world
英文:
The problem I see here is that you don't rewind the file handle's cursor to the beginning of the file. Thus, the next read will be at the end of the file, and will be a 0 byte read. That explains why it seems like there's no content in the file.
https://pkg.go.dev/os#File.Seek can handle this for you.
Actually, the Flush
is not relevant, because you're using the same file handle to read the file than you did to write it, so you'll see your own written bytes even without a flush. This would not be the case if the file was opened by a different process or was reopened.
Edit: OP Claims this flush was necessary in their case and I cannot provide evidence to disagree. Flush will not hurt things either.
Demonstration:
package main
import (
"fmt"
"io"
"os"
)
func main() {
f, err := os.CreateTemp("", "data.csv")
if err != nil {
panic(err)
} else {
defer f.Close()
defer os.Remove(f.Name())
}
fmt.Fprintf(f, "hello, world")
fmt.Fprintln(os.Stderr, "Before rewind: ")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
f.Seek(0, io.SeekStart)
fmt.Fprintln(os.Stderr, "\nAfter rewind: ")
if _, err := io.Copy(os.Stderr, f); err != nil {
panic(err)
}
fmt.Fprintln(os.Stderr, "\n")
}
% go run t.go
Before rewind:
After rewind:
hello, world
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论