Apache Beam Write PubSub messages using Go

Question

I'm new to Go and am trying to read a table from BigQuery and publish its rows as PubSub messages. I searched online and came up with the code below.

package main

import (
	"context"
	"flag"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// CommentRow models 1 row of HackerNews comments.
type CommentRow struct {
	Text string `bigquery:"text"`
}

const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01' and text IS NOT NULL
LIMIT 1000
`

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()
	project := gcpopts.GetProject(ctx)

	// Build a PCollection<CommentRow> by querying BigQuery.
	rows := bigqueryio.Query(s, project, query,
		reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())

	pc := beam.ParDo(s, func(row CommentRow, emit func(CommentRow)) {
		emit(row)
	}, rows)

	pubsubio.Write(s, project, "projects/PROJECTNAME/topics/test-topic", pc)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

But I get the following error message:

panic: pubsubio.Write only accepts PCollections of *pubsub.PubsubMessage and []uint8, received main.CommentRow

How can I convert the PCollection to the PubsubMessage type? I couldn't find much information about this.

My use case is to read multiple columns from a BigQuery table and publish the contents to a PubSub topic.

Answer 1

Score: 3

A PubSub message is basically the following, as defined in the proto:

message PubsubMessage {
  bytes data = 1;
  map<string, string> attributes = 2;
  string message_id = 3;
  google.protobuf.Timestamp publish_time = 4;
}

You can pass a PCollection of []byte to pubsubio.Write(), and it will wrap each element in a PubsubMessage (see the pubsubio docs). Internally it uses a DoFn to do the wrapping.
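To make the []byte route concrete, here is a minimal sketch of one way to turn each row into bytes, assuming JSON is an acceptable payload format for the subscribers. The `encodeRow` helper, the `json` struct tags, and the `Author` column are hypothetical additions for illustration and are not part of the original question or answer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CommentRow mirrors the struct from the question. The json tags and the
// Author field are hypothetical; Author only illustrates the multi-column case
// and would also have to be selected in the BigQuery query.
type CommentRow struct {
	Text   string `bigquery:"text" json:"text"`
	Author string `bigquery:"author" json:"author"`
}

// encodeRow serializes one row to JSON bytes, which is an element type that
// pubsubio.Write accepts according to the panic message in the question.
//
// In the question's pipeline it could replace the pass-through ParDo
// (a sketch, not verified against a specific Beam release):
//
//	payloads := beam.ParDo(s, encodeRow, rows)
//	pubsubio.Write(s, project, "test-topic", payloads)
func encodeRow(row CommentRow) ([]byte, error) {
	return json.Marshal(row)
}

func main() {
	payload, err := encodeRow(CommentRow{Text: "hello", Author: "alice"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload)) // prints {"text":"hello","author":"alice"}
}
```

JSON is only one choice here; any encoding that yields []byte should work the same way. If message attributes are needed, the alternative is to build *pubsub.PubsubMessage values instead, as the panic message suggests. Whether the topic argument should be the short topic name or the fully qualified path is worth checking against the pubsubio documentation for the SDK version in use.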

huangapple
  • Posted on 2022-12-11 20:18:07
  • Please keep this link when reposting: https://go.coder-hub.com/74760743.html