Go + Apache Beam GCP Dataflow: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1


Question

I am using the Apache Beam Go SDK to build a simple Dataflow pipeline that reads data from a query and publishes it to Pub/Sub, using the following code:

package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	// Imports not used in this snippet (infra/env, sources) are omitted so it compiles.
	"gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	beam.Init()
	log.Info(ctx, "Creating new pipeline")
	pipeline, scope := beam.NewPipelineWithRoot()
	project := gcpopts.GetProject(ctx)

	ppData := pp.Query(scope, project)
	ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
	pubsubio.Write(scope, "project", "topic", ppMessages)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}

When the pipeline runs on Google Cloud Dataflow, it fails with the following error:

> Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.

I have read this thread but I am not sure how it was resolved.

Any ideas?

Answer 1

Score: 3

Is the job running in streaming mode or batch mode? I'd guess batch mode. It may be that the internal Dataflow runner used for batch mode doesn't link in the Pub/Sub sink.

Unfortunately, at this time the Go SDK doesn't provide a local "fallback" for writing to Pub/Sub that the batch runner can use instead.

That said, you should be unblocked pretty easily if you write your own DoFn that writes to Pub/Sub using the standard Go package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing

Roughly, what you should write would look like the following:

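import (
	"context"
	"reflect"
	"sync"

	"cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/go/pkg/beam"
)

const batchSize = 100 // flush threshold; use whatever criteria you want

var (
	// Assuming everything is in one project, one client is shared per worker process.
	clientOnce   sync.Once
	pubSubClient *pubsub.Client
	clientErr    error
)

func init() {
	// Register the DoFn type so remote workers can decode it.
	beam.RegisterType(reflect.TypeOf((*PubSubSinkFn)(nil)).Elem())
}

type PubSubSinkFn struct {
	Project, Topic string // Whatever configuration you need (exported fields are serialized)

	topic *pubsub.Topic // handle on the shared client; safe to use on multiple goroutines
	batch [][]byte      // per-bundle batch (assumes []byte elements, as in the question)
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) error {
	// Create the client with the sync.Once so it can be shared by all bundles.
	clientOnce.Do(func() {
		pubSubClient, clientErr = pubsub.NewClient(context.Background(), fn.Project)
	})
	if clientErr != nil {
		return clientErr
	}
	fn.topic = pubSubClient.Topic(fn.Topic)
	return nil
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v []byte) error {
	fn.batch = append(fn.batch, v)
	if len(fn.batch) >= batchSize { // or whatever criteria you want
		return fn.publishBatch(ctx)
	}
	return nil
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) error {
	return fn.publishBatch(ctx)
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) error {
	// Publish is asynchronous; collect the results, then wait for each one.
	results := make([]*pubsub.PublishResult, 0, len(fn.batch))
	for _, data := range fn.batch {
		results = append(results, fn.topic.Publish(ctx, &pubsub.Message{Data: data}))
	}
	fn.batch = nil
	for _, r := range results {
		if _, err := r.Get(ctx); err != nil {
			return err
		}
	}
	return nil
}

// When constructing your pipeline
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)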
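
To see the batching and client-sharing logic on its own, here is a minimal sketch that uses only the standard library and calls neither Beam nor Pub/Sub. The names `publisher`, `fakePublisher`, `sharedPublisher`, `batchingSink`, and `runBundle` are hypothetical and exist only for this illustration; the fake publisher simply records the batches it receives. It mirrors the lifecycle described above: one shared publisher created via `sync.Once`, a flush whenever the batch reaches its size limit, and a final flush at the end of the bundle.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the small part of a Pub/Sub
// client that the sink needs; it is not a real library interface.
type publisher interface {
	PublishAll(ctx context.Context, msgs [][]byte) error
}

// fakePublisher records every batch it receives so the flow can be inspected.
type fakePublisher struct {
	mu      sync.Mutex
	batches [][][]byte
}

func (p *fakePublisher) PublishAll(_ context.Context, msgs [][]byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.batches = append(p.batches, msgs)
	return nil
}

var (
	sharedOnce sync.Once
	shared     *fakePublisher
)

// sharedPublisher lazily creates one publisher per process, in the same
// way the sync.Once in Setup shares a single client across bundles.
func sharedPublisher() *fakePublisher {
	sharedOnce.Do(func() { shared = &fakePublisher{} })
	return shared
}

// batchingSink mirrors the DoFn lifecycle: Setup, ProcessElement, FinishBundle.
type batchingSink struct {
	BatchSize int

	pub   publisher
	batch [][]byte
}

func (s *batchingSink) Setup() { s.pub = sharedPublisher() }

func (s *batchingSink) ProcessElement(ctx context.Context, v []byte) error {
	s.batch = append(s.batch, v)
	if len(s.batch) >= s.BatchSize {
		return s.flush(ctx)
	}
	return nil
}

func (s *batchingSink) FinishBundle(ctx context.Context) error { return s.flush(ctx) }

// flush publishes the pending batch, if any, and resets it.
func (s *batchingSink) flush(ctx context.Context) error {
	if len(s.batch) == 0 {
		return nil
	}
	err := s.pub.PublishAll(ctx, s.batch)
	s.batch = nil
	return err
}

// runBundle drives the sink through one bundle, as a runner would.
func runBundle(s *batchingSink, msgs ...string) error {
	ctx := context.Background()
	s.Setup()
	for _, m := range msgs {
		if err := s.ProcessElement(ctx, []byte(m)); err != nil {
			return err
		}
	}
	return s.FinishBundle(ctx)
}

func main() {
	if err := runBundle(&batchingSink{BatchSize: 2}, "a", "b", "c"); err != nil {
		panic(err)
	}
	for i, b := range sharedPublisher().batches {
		fmt.Printf("batch %d: %d message(s)\n", i, len(b))
	}
}
```

With a batch size of 2 and three elements, the first two are flushed from ProcessElement and the remaining one from FinishBundle. In a real sink, the fake publisher would be replaced by calls to the Pub/Sub client, and errors from publishing should be returned so the runner can retry the bundle.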

Posted by huangapple on October 21, 2021, 03:00:23
Please keep this link when reposting: https://go.coder-hub.com/69651665.html