Apache Beam / Dataflow Go SDK pipeline doesn't process any Pub/Sub messages
Question
I hope everyone's Friday is going well. I am desperately looking for some help with the Apache Beam Go SDK (https://github.com/apache/beam/tree/master/sdks). I've written a pipeline with it to process Pub/Sub events and got to a stage where my workers start nicely, but no messages are consumed from Pub/Sub. I've tried running the example provided in the SDK (streaming_wordcap), which uses the same pubsubio, and the result is the same: no messages in the newly created topics are consumed. I wonder if there is an extra option I should be enabling, or some deployment-specific flag? I am a little bit lost now.
There are messages in the subscription (a few million). When I performed an experiment and changed the subscription name to something that doesn't exist, I saw errors in the Dataflow logs. Otherwise there are no errors and no information apart from generic Dataflow debug output:
2022-07-08T11:21:31.793474125Z Starting 3 workers in europe-west4-a...
2022-07-08T11:21:31.820662575Z Starting worker pool setup.
2022-07-08T11:22:00.789962383Z Autoscaling: Raised the number of workers to 3 so that the pipeline can catch up with its backlog and keep up with its input rate.
2022-07-08T11:22:50.806937837Z Workers have started successfully.
Here is a part of my pipeline code:
var (
	inputTopic        = flag.String("topic", "", "PubSub input topic (required).")
	inputSubscription = flag.String("inputSubscription", "", "PubSub input subscription (required).")
	outputTableSpec   = flag.String("outputTableSpec", "", "Output BQ table (required).")
)

func init() {
	beam.RegisterType(reflect.TypeOf((*event.Envelope)(nil)).Elem())
	beam.RegisterType(reflect.TypeOf((*decodeEnvelopeJSONFunc)(nil)).Elem())
	[...]
}

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	if err := validateFlags(); err != nil {
		log.Exit(ctx, err.Error())
	}

	project := gcpopts.GetProject(ctx)
	p, s := beam.NewPipelineWithRoot()

	pubSubMessages := pubsubio.Read(s, project, *inputTopic, &pubsubio.ReadOptions{
		Subscription: *inputSubscription, WithAttributes: false, IDAttribute: "", TimestampAttribute: "",
	})
	eventMapper := DecodeAndMap(s, pubSubMessages)
	bigqueryio.Write(s, project, *outputTableSpec, eventMapper)

	if err := beamx.Run(ctx, p); err != nil {
		log.Fatalf(ctx, "failed to execute job: %v", err)
	}
}

func DecodeAndMap(s beam.Scope, messages beam.PCollection) beam.PCollection {
	s = s.Scope("DecodeAndMap")
	events := beam.ParDo(s, &decodeEnvelopeJSONFunc{}, messages)
	return beam.ParDo(s, &mapPayloadFunc{}, events)
}

type decodeEnvelopeJSONFunc struct{}

func (f *decodeEnvelopeJSONFunc) ProcessElement(ctx context.Context, msg []byte, emit func(*event.Envelope)) error {
	var e event.Envelope

	log.Infoln(ctx, "decoding envelope")
	if err := json.NewDecoder(bytes.NewReader(msg)).Decode(&e); err != nil {
		return fmt.Errorf("failed to decode envelope: %w", err)
	}

	log.Infoln(ctx, "emitting envelope")
	emit(&e)
	return nil
}
[...]
Here is how I am deploying my pipeline:
go run ./pkg/my-mapper/. \
--runner dataflow \
--job_name my-mapper \
--project mb-gcp-project \
--region europe-west4 --zone europe-west4-a \
--temp_location gs://my-beam-tmp-data-bucket/tmp/ \
--staging_location gs://my-beam-tmp-data-bucket/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest \
--subnetwork regions/europe-west4/subnetworks/my-subnetwork \
--num_workers 3 \
--max_num_workers 10 \
--async --update \
--topic my-topic-name --inputSubscription my-sub-name --outputTableSpec my-gcp-project:my_dataset.mapped_data
2022/07/08 12:16:33 Cross-compiling ... as /tmp/worker-1-1657278993706049280
[...]
"type": "JOB_TYPE_STREAMING"
}
2022/07/08 12:20:11 Submitted job: 2022-07-08_04_20_11-11918574995509384496
Answer 1
Score: 1
This looks like a windowing problem. It's a streaming pipeline that isn't windowing its data before writing out, which means it's trying to aggregate in the default global window and never terminates or acks the messages.
Add a beam.WindowInto step at some granularity after the PubSub step, either with windowing or with triggers. See the Programming Guide sections on windowing and triggers for more info: https://beam.apache.org/documentation/programming-guide/#windowing
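For example, here is a minimal sketch of that change, reusing the variable names from the question's main() and assuming fixed one-minute windows are a reasonable granularity for this data:

// In main(), after pubsubio.Read(...). Requires these additional imports:
//
//	"time"
//	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
//
// Window the unbounded Pub/Sub stream into fixed one-minute windows so that
// downstream aggregations (such as those inside bigqueryio.Write) can fire
// and the messages can eventually be acked.
windowed := beam.WindowInto(s, window.NewFixedWindows(1*time.Minute), pubSubMessages)
eventMapper := DecodeAndMap(s, windowed)
bigqueryio.Write(s, project, *outputTableSpec, eventMapper)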
This will allow any aggregation operation in the bigqueryio.Write step to actually complete, regardless of whether it's the "xlang" version or not.
At this time, I'd recommend using the xlang version, as it has been validated for streaming use. The Go native one hasn't been tested and vetted to work properly for streaming writes, and there could be issues as a result.
E.g., if you're using the native bigqueryio, your pipeline is getting held up here, as the pipeline is waiting for the end of the global window:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/bigqueryio/bigquery.go#L207
Otherwise, here are some things to look into:
- Ensure your import paths use the "sdks/v2" module path of the SDK. Imports without the v2 are stuck at 2.32 and aren't expected to work; no other code changes are needed. A good import path resolves to the latest release (2.40 at the time of writing), while a bad one resolves to 2.32+incompatible. Edit: Looking at the screenshot, you're already doing this. Excellent!
- Ensure that the fields of event.Envelope are all exported. Otherwise they won't be serialized by either JSON or the Beam native schema encoding. I can't see this being the root problem, however. (A short sketch of this and the previous point follows the list.)
- If you're using a released version of the SDK (v2.40.0, v2.39.0, etc.), you don't need to include the worker_harness_container_image flag, and in fact it could cause worker startup issues. While historically the Beam Go container hasn't changed much, that's no longer the case as additional features are being added, which require coordination between the container bootloader and the worker harness code.
Cheers!