Protobuf type for BigQuery TIMESTAMP field

Question
I am streaming data to BigQuery from Golang using the new Storage API. The schema of my BigQuery table contains a TIMESTAMP field as follows:
bq mk -t mydataset.mytable name:string,lastseen:timestamp
Separately, I have defined a protocol buffer like this:

message Row {
  string Name = 1;
  google.protobuf.Timestamp LastSeen = 3;
}
However, when I submit this data to BigQuery, I get the following error:
rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP
It seems that the google.protobuf.Timestamp protobuf does not correspond to the TIMESTAMP type in BigQuery. This makes sense, because the BigQuery docs say that a TIMESTAMP contains a timezone, but google.protobuf.Timestamp does not. Which protocol buffer, then, should I use?
I'm using code derived from this repository, which looks like this:
package main

import (
    "context"
    "fmt"
    "log"

    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
    timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable2"
)

func main() {
    ctx := context.Background()

    // the data we will stream to BigQuery
    var rows = []*Row{
        {Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
        {Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
        {Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
    }

    // create the BigQuery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into BigQuery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to BigQuery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}
Answer 1
Score: 4
Yeah, the backend doesn't decode proto Timestamp messages properly. The quickest fix is to send the field as an int64, populated with epoch microseconds; see the data type conversions table:
https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
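For example, here is a minimal sketch of that fix, assuming the same field names and numbers as the message in the question. The conversion table linked above maps an int64 proto field to a BigQuery TIMESTAMP as microseconds since the Unix epoch:

message Row {
  string Name = 1;
  int64 LastSeen = 3; // microseconds since the Unix epoch, maps to TIMESTAMP
}

On the Go side, the rows would then be populated with epoch microseconds instead of timestamppb.Now(). Note that time.Now().UnixMicro() requires Go 1.17+; on earlier versions, time.Now().UnixNano() / 1000 is equivalent:

import "time"

// each LastSeen value is an int64 count of microseconds since the Unix epoch
var rows = []*Row{
    {Name: "John Doe", LastSeen: time.Now().UnixMicro()},
    {Name: "Jane Doe", LastSeen: time.Now().UnixMicro()},
}

The timestamppb import in the original program is then no longer needed.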