Transmitting global data, or run-specific data, to a Beam DoFn in Go

Question


I'm writing a Beam ParDo transform in Go, as a DoFn, for a streaming Dataflow pipeline. I'm trying to find a way to give every DoFn a map that is computed at runtime, but before the pipeline is built. Using the state API seems wrong, since this data is constant for the whole run of the pipeline. But I can't seem to pass in a pre-initialized DoFn to do this. I tried:

type EngineMap struct {
	Map map[string]string
}

type ResultProcessor struct {
	engineMap EngineMap
}

... (ProcessElement defined, initialization)


processor := ResultProcessor{}
processor.engineMap.Map = make(map[string]string)
for k, v := range engines.Map {
	processor.engineMap.Map[k] = v
}
register.DoFn2x1[context.Context, []byte, []string](&processor)

... (pipeline initialized, input "lines" defined)

result := beam.ParDo(s, &processor, lines)

But when I run this, the map in engineMap is empty by the time the ProcessElement() method runs, even though it is populated right after the for loop. I could pass this data as a side input, but that seems unnecessarily complicated for a fairly small map that is constant for the whole run, especially in a streaming pipeline.

Is there another way to pass the data along?

Answer 1

Score: 2


The root cause is that the engineMap field is unexported, so its data cannot be serialized. Only exported fields (for example, EngineMap) can be serialized. This is a property of "general" coders that rely on reflection, such as JSON or Beam Schema Row encodings.
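The field-visibility rule can be reproduced with encoding/json from the Go standard library, one of the reflection-based encoders mentioned above. This is a minimal sketch of that rule only, not of how Beam itself encodes a DoFn; unexportedProcessor and exportedProcessor are hypothetical stand-ins for the question's ResultProcessor.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unexportedProcessor mirrors the question's DoFn: the map lives in an unexported field.
type unexportedProcessor struct {
	engineMap map[string]string
}

// exportedProcessor is the fix: the field name starts with an upper-case letter.
type exportedProcessor struct {
	EngineMap map[string]string
}

// encode marshals v with encoding/json, which only sees exported fields via reflection.
func encode(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	engines := map[string]string{"a": "engine-1"}

	// The unexported field is silently skipped, so the map is lost.
	fmt.Println(encode(unexportedProcessor{engineMap: engines})) // prints {}
	// The exported field is encoded, so the map survives serialization.
	fmt.Println(encode(exportedProcessor{EngineMap: engines})) // prints {"EngineMap":{"a":"engine-1"}}
}
```

No error is reported for the unexported field; the data simply disappears, which matches the symptom in the question.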

https://beam.apache.org/documentation/programming-guide/#user-code-serializability

It is neither necessary nor recommended to register the same DoFn instance that you use in the pipeline. (If that worked, we wouldn't need registration at all.) DoFn registration should happen in an init() function, or at least before beam.Init() is called in main.

Answer 2

Score: 0


If I understand correctly, the map you are using is just a DoFn member variable that stays constant after the DoFn is initialized? In that case, I would suggest making the engineMap member variable exported (public) so that it is serialized and then deserialized into the DoFn instances created for each bundle.

The StartBundle method has the same elements as the ProcessElement method: https://github.com/apache/beam/blob/b68d38e32c2aac51170da16c4d9c479420754009/sdks/go/pkg/beam/pardo.go#L240

Here is an example that uses StartBundle (it is a fairly large example, so I suggest focusing on a single DoFn): https://github.com/apache/beam/blob/67e6726ffeb47d2ada0122369fa230833ce0f026/sdks/go/examples/large_wordcount/large_wordcount.go#L207

huangapple
  • Posted on 2022-10-20 04:03:53
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74131249.html