BigQuery nullable types in golang when using BigQuery storage write API

Question
I'm switching from the legacy streaming API to the storage write API following this example in golang:
https://github.com/alexflint/bigquery-storage-api-example
In the old code I used bigquery's null types to indicate a field can be null:
type Person struct {
	Name bigquery.NullString `bigquery:"name"`
	Age  bigquery.NullInt64  `bigquery:"age"`
}

var persons = []Person{
	{
		Name: ToBigqueryNullableString(""), // this will be null in bigquery
		Age:  ToBigqueryNullableInt64("20"),
	},
	{
		Name: ToBigqueryNullableString("David"),
		Age:  ToBigqueryNullableInt64("60"),
	},
}

func main() {
	ctx := context.Background()
	bigqueryClient, _ := bigquery.NewClient(ctx, "project-id")
	inserter := bigqueryClient.Dataset("dataset-id").Table("table-id").Inserter()
	err := inserter.Put(ctx, persons)
	if err != nil {
		log.Fatal(err)
	}
}

func ToBigqueryNullableString(x string) bigquery.NullString {
	if x == "" {
		return bigquery.NullString{Valid: false}
	}
	return bigquery.NullString{StringVal: x, Valid: true}
}

func ToBigqueryNullableInt64(x string) bigquery.NullInt64 {
	if x == "" {
		return bigquery.NullInt64{Valid: false}
	}
	if s, err := strconv.ParseInt(x, 10, 64); err == nil {
		return bigquery.NullInt64{Int64: s, Valid: true}
	}
	return bigquery.NullInt64{Valid: false}
}
After switching to the new API:
var persons = []*personpb.Row{
	{
		Name: "",
		Age:  20,
	},
	{
		Name: "David",
		Age:  60,
	},
}

func main() {
	ctx := context.Background()
	client, _ := storage.NewBigQueryWriteClient(ctx)
	defer client.Close()

	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	var row personpb.Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range persons {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: fmt.Sprintf("projects/%s/datasets/%s/tables/%s/streams/_default", "project-id", "dataset-id", "table-id"),
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				Rows: &storagepb.ProtoRows{
					SerializedRows: data,
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	_, err = stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}
}
With the new API I need to define the types in a .proto file, so I need some other way to define nullable fields. I tried optional fields:
syntax = "proto3";

package person;
option go_package = "/personpb";

message Row {
  optional string name = 1;
  int64 age = 2;
}
but it gives me an error when trying to stream (not at compile time):

BqMessage.proto: person_Row.Name: The [proto3_optional=true] option may only be set on proto3 fields, not person_Row.Name
Another option I tried is to use oneof, and write the proto file like this:
syntax = "proto3";

import "google/protobuf/struct.proto";

package person;
option go_package = "/personpb";

message Row {
  NullableString name = 1;
  int64 age = 2;
}

message NullableString {
  oneof kind {
    google.protobuf.NullValue null = 1;
    string data = 2;
  }
}
Then use it like this:
var persons = []*personpb.Row{
	{
		Name: &personpb.NullableString{Kind: &personpb.NullableString_Null{
			Null: structpb.NullValue_NULL_VALUE,
		}},
		Age: 20,
	},
	{
		Name: &personpb.NullableString{Kind: &personpb.NullableString_Data{
			Data: "David",
		}},
		Age: 60,
	},
}
...
But this gives me the following error:
Invalid proto schema: BqMessage.proto: person_Row.person_NullableString.null: FieldDescriptorProto.oneof_index 0 is out of range for type "person_NullableString".
I guess this is because the API doesn't know how to handle the oneof type, and I need to tell it about this somehow.
How can I use something like the bigquery.Nullable types when using the new storage API? Any help will be appreciated.
Answer 1

Score: 1
Take a look at this sample for an end-to-end example using a proto2 syntax file in Go.

proto3 is still a bit of a special beast when working with the Storage API, for a couple of reasons:
- The current behavior of the Storage API is to operate using proto2 semantics.
- Currently, the Storage API doesn't understand wrapper types, which was the original way in which proto3 was meant to communicate optional presence (e.g. NULL in BigQuery fields). Because of this, it tends to treat wrapper fields as a submessage with a value field (in BigQuery, a STRUCT with a single leaf field).
- Later in its evolution, proto3 reintroduced the optional keyword as a way of marking presence, but in the internal representation this meant adding another presence marker (the source of the proto3_optional warning you were observing in the backend error).
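Given those proto2 semantics, one way forward is to declare the message in proto2 syntax, where optional fields carry explicit presence and an unset field maps to NULL in BigQuery. A minimal sketch of what that might look like (field names and numbers carried over from the question's proto3 file):

```proto
syntax = "proto2";

package person;
option go_package = "/personpb";

message Row {
  optional string name = 1;
  optional int64 age = 2;
}
```

With this schema, leaving name unset in a serialized row appends a NULL rather than an empty string.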
It looks like you're using bits of the newer veneer, particularly adapt.NormalizeDescriptor(). I suspect if you're using this, you may be using an older version of the module, as the normalization code was updated in this PR and released as part of bigquery/v1.33.0.
There's ongoing work to make the overall experience with the Storage API smoother, but there's still work to be done.
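With a proto2-generated message, optional scalar fields come out as pointers in Go (*string, *int64), so the question's old null-helpers translate naturally: return nil for NULL, a pointer otherwise. A minimal, self-contained sketch (the helper names are hypothetical, not from any library; in real code the generated personpb fields would receive these pointers directly):

```go
package main

import (
	"fmt"
	"strconv"
)

// nullableString mirrors ToBigqueryNullableString for a proto2-generated
// message: nil means NULL in BigQuery, a non-nil pointer carries the value.
func nullableString(x string) *string {
	if x == "" {
		return nil
	}
	return &x
}

// nullableInt64 mirrors ToBigqueryNullableInt64: empty or unparsable
// input becomes nil (NULL), otherwise a pointer to the parsed value.
func nullableInt64(x string) *int64 {
	if x == "" {
		return nil
	}
	if v, err := strconv.ParseInt(x, 10, 64); err == nil {
		return &v
	}
	return nil
}

func main() {
	fmt.Println(nullableString("") == nil) // true: empty -> NULL
	fmt.Println(*nullableString("David"))  // David
	fmt.Println(*nullableInt64("60"))      // 60
}
```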