Apache Beam Write PubSub messages using Go
Question
I'm new to Go and am trying to read a table from BigQuery and publish its rows as messages using Pub/Sub. I searched online and came up with the code below.
package main

import (
	"context"
	"flag"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// CommentRow models 1 row of HackerNews comments.
type CommentRow struct {
	Text string `bigquery:"text"`
}

const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01' and text IS NOT NULL
LIMIT 1000
`

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()
	project := gcpopts.GetProject(ctx)

	// Build a PCollection<CommentRow> by querying BigQuery.
	rows := bigqueryio.Query(s, project, query,
		reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())

	pc := beam.ParDo(s, func(row CommentRow, emit func(CommentRow)) {
		emit(row)
	}, rows)

	pubsubio.Write(s, project, "projects/PROJECTNAME/topics/test-topic", pc)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
But I get the following error:
panic: pubsubio.Write only accepts PCollections of *pubsub.PubsubMessage and []uint8, received main.CommentRow
How can I convert the PCollection to the PubsubMessage type? I couldn't find much information about this.
My use case is to read multiple columns from a BigQuery table and publish the contents to a PubSub topic.
Answer 1
Score: 3
A Pub/Sub message is essentially the following, as defined in the proto:
message PubsubMessage {
  bytes data = 1;
  map<string, string> attributes = 2;
  string message_id = 3;
  google.protobuf.Timestamp publish_time = 4;
}
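You can pass a PCollection of type []byte to pubsubio.Write() and it will wrap each element into a PubsubMessage, with the bytes as its data field (Doc). It uses this DoFn to do it. So to publish your rows, first convert each CommentRow into a []byte in a ParDo (for example by JSON-encoding it), and pass that PCollection to pubsubio.Write() instead.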