Protobuf type for BigQuery TIMESTAMP field

Question

I am streaming data to BigQuery from Golang using the new Storage API. The schema of my BigQuery table contains a TIMESTAMP field, created as follows:

bq mk -t mydataset.mytable name:string,age:integer,lastseen:timestamp
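
As a sanity check, the resulting column type can be confirmed with the bq CLI:

bq show --schema --format=prettyjson mydataset.mytable

This should list the lastseen column with "type": "TIMESTAMP".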

Separately I have defined a protocol buffer like this:

message Row {
	string Name = 1;
	int64 Age = 2;
	google.protobuf.Timestamp LastSeen = 3;
}

However, when I submit this data to BigQuery, I get the following error:

rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP

It seems that the google.protobuf.Timestamp message does not correspond to the TIMESTAMP type in BigQuery. This makes sense because the BigQuery docs say that a TIMESTAMP contains a timezone, but google.protobuf.Timestamp does not. So which protocol buffer should I use?
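
For context, google.protobuf.Timestamp is itself a message type; its definition in the protobuf well-known types (timestamp.proto) is essentially:

message Timestamp {
	// Seconds of UTC time since the Unix epoch 1970-01-01T00:00:00Z.
	int64 seconds = 1;
	// Non-negative fractions of a second at nanosecond resolution.
	int32 nanos = 2;
}

So the writer sees a nested message on the wire, not a scalar it can map onto TIMESTAMP, which is why the error reports "the proto field type message".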

I'm using code derived from this repository, which looks like this:

import (
	"context"
	"fmt"
	"log"

	storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
	"google.golang.org/protobuf/proto"
	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
	project = "myproject"
	dataset = "mydataset"
	table   = "mytable2"
)

func main() {
	ctx := context.Background()

	// the data we will stream to bigquery
	var rows = []*Row{
		{Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
		{Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
		{Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
	}

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows
	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range rows {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked
	_, err = stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}

	log.Println("done")
}

Answer 1

Score: 4

Yeah, the backend doesn't decode proto Timestamp messages properly.

Quickest resolution: send an int64 field and populate it with epoch microseconds.

https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
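
To make that concrete, here is a minimal sketch of the workaround, assuming the Go bindings are regenerated from a revised .proto (field names and numbers carried over from the question):

message Row {
	string Name = 1;
	int64 Age = 2;
	// Microseconds since the Unix epoch; maps to the TIMESTAMP column.
	int64 LastSeen = 3;
}

The rows are then populated with plain integers:

// import "time"
lastSeen := time.Now().UnixMicro() // Go 1.17+; on older Go: time.Now().UnixNano() / 1000
rows := []*Row{
	{Name: "John Doe", Age: 104, LastSeen: lastSeen},
}

The rest of the AppendRows flow from the question is unchanged.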
