Using BigQuery Write API in Golang

Question

I am trying to use the new BigQuery Storage API to do streaming inserts from Golang. Based on this page, I understand that this API replaces the old BigQuery streaming insert API.

However, none of the examples in the docs show how to actually insert rows. In order to create an AppendRowsRequest, I have arrived at the following:

&storagepb.AppendRowsRequest{
	WriteStream: resp.Name,
	Rows: &storagepb.AppendRowsRequest_ProtoRows{
		ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
			WriterSchema: nil, // protobuf schema??
			Rows: &storagepb.ProtoRows{
				SerializedRows: [][]byte{}, // serialized protocol buffer data??
			},
		},
	},
}

What data should I put into the SerializedRows field above?

The storagepb.ProtoRows struct above is documented here. Unfortunately, all that is given there is a link to the main overview page for protocol buffers.

Can anyone give me an example of using the new BigQuery Storage API to stream rows into BigQuery from Golang?

Answer 1

Score: 5

With much help from the answers above, I have arrived at a working example, which is available on GitHub: https://github.com/alexflint/bigquery-storage-api-example

The main code is:

package main

import (
	"context"
	"fmt"
	"log"

	// Note: these import paths are an assumption, matching the v1beta2
	// storage client this example was written against; newer releases of
	// the library move these types under cloud.google.com/go/bigquery/storage/apiv1.
	storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
	"google.golang.org/protobuf/proto"
)

const (
	project = "myproject"
	dataset = "mydataset"
	table   = "mytable"
	trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
	{Name: "John Doe", Age: 104},
	{Name: "Jane Doe", Age: 69},
	{Name: "Adam Smith", Age: 33},
}

func main() {
	ctx := context.Background()

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows
	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range rows {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		TraceId:     trace, // identifies this client
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked
	_, err = stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}

	log.Println("done")
}

And the protocol buffer definition for the "Row" struct above is:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
	string Name = 1;
	int32 Age = 2;
}
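
To compile this into the Go "Row" type used above, the standard protoc-gen-go plugin should do; assuming the definition is saved as row.proto (the filename here is illustrative), something like:

$ protoc --go_out=. row.proto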

You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer. See the readme in the repository linked above for how to do that.
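
As a rough sketch with the bq command-line tool (the dataset and table names are the placeholders from the code above, and the column types are my reading of the proto schema):

$ bq mk mydataset
$ bq mk --table mydataset.mytable name:STRING,age:INTEGER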

After running the code above, the data shows up in BigQuery like this:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

Thanks all for the help!

Answer 2

Score: 1

I found some documentation [1](https://cloud.google.com/go/docs/reference/cloud.google.com/go/bigquery/latest/storage/apiv1beta2#cloud_google_com_go_bigquery_storage_apiv1beta2_BigQueryWriteClient_CreateWriteStream) [2](https://pkg.go.dev/cloud.google.com/go/bigquery/storage/apiv1beta2#BigQueryWriteClient.CreateWriteStream) about writing streams to a table, but I'm not sure that this is what you're looking for. Keep in mind that storage/apiv1beta2 is currently in beta, so this may not yet be implemented or may lack documentation. If the documentation I attached doesn't help you, we could open a public issue tracker to get row streaming correctly documented or implemented.
