读取 Google Cloud Pubsub 消息并使用 Golang 将其写入 BigQuery。

huangapple go评论95阅读模式
英文:

Read Google Cloud Pubsub message and write to BigQuery using Golang

问题

我正在使用以下代码从Google Cloud Pubsub读取数据:

pubsubmessage := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})

以及以下代码将数据写入我的BigQuery数据集:

bigqueryio.Write(s, project, *output, pubsubmessage)

我得到了以下错误:

panic: schema type must be struct: []uint8
unable to convert []uint8/byte to schema type must be struct`

请帮助我。

我正在遵循这些示例:

https://github.com/apache/beam/blob/cea122724c5cd87a403684004452305ca64b3a68/sdks/go/examples/cookbook/max/max.go

https://github.com/apache/beam/blob/master/sdks/go/examples/streaming_wordcap/wordcap.go

英文:

I am using this code to read data from Google Cloud Pubsub:

pubsubmessage := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})

and this code to write to my bigquery data set :

bigqueryio.Write(s, project, *output, pubsubmessage)

I get the following error:

panic: schema type must be struct: []uint8
unable to convert []uint8/byte to schema type must be struct`

Please help me.

I am following these examples:

https://github.com/apache/beam/blob/cea122724c5cd87a403684004452305ca64b3a68/sdks/go/examples/cookbook/max/max.go

https://github.com/apache/beam/blob/master/sdks/go/examples/streaming_wordcap/wordcap.go

答案1

得分: 1

pubsubio.Read的返回值是一个Pubsub消息的PCollection。要将其转换为BigQuery行,您需要应用一个DoFn,它接受一个Pubsub消息并将其转换为BigQuery行。这将返回一个包含BigQuery行的PCollection,您可以将其传递给bigqueryio.Write。类似以下代码:

p := beam.NewPipeline()
s := p.Root()

pubsubmessages := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})

bigqueryrows := beam.ParDo(s, func(message []byte) string {
		return ...
}, pubsubmessages)

bigqueryio.Write(s, project, *output, bigqueryrows)

您需要将...替换为将Pubsub消息的原始字节转换为BigQuery行的代码。

英文:

The return value of pubsubio.Read is a PCollection of Pubsub messages. To convert these to a BigQuery row, you will need to apply a DoFn that takes a Pubsub message and converts it to a BigQuery row. This will return a PCollection of BigQuery rows that you can pass to bigqueryio.Write. Something like this:

p := beam.NewPipeline()
s := p.Root()

pubsubmessages := pubsubio.Read(s, project, *input, &pubsubio.ReadOptions{Subscription: sub.ID()})

bigqueryrows := beam.ParDo(s, func(message []byte) string {
		return ...
}, pubsubmessages)

bigqueryio.Write(s, project, *output, bigqueryrows)

You replace the ... with your code that converts the raw bytes of the Pubsub message to a BigQuery row.

huangapple
  • 本文由 发表于 2022年9月14日 21:33:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/73717783.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定