Transmitting global data, or run-specific data, to a Beam DoFn in Go
Question
I'm writing a Beam ParDo transform in Go, as a DoFn, for a streaming Dataflow pipeline. I'm trying to find a way to give every DoFn a map that is computed at runtime but before the pipeline is constructed. Using the state API doesn't seem quite right, since the data is constant for the duration of the pipeline. But I can't seem to pass in a pre-initialized DoFn to do this. I tried:
    type EngineMap struct {
        Map map[string]string
    }

    type ResultProcessor struct {
        engineMap EngineMap
    }

    ... (ProcessElement defined, initialization)

    processor := ResultProcessor{}
    processor.engineMap.Map = make(map[string]string)
    for k, v := range engines.Map {
        processor.engineMap.Map[k] = v
    }
    register.DoFn2x1[context.Context, []byte, []string](&processor)

    ... (pipeline initialized, input "lines" defined)

    result := beam.ParDo(s, &processor, lines)
but when I run this, the map in engineMap is still empty when the ProcessElement() method runs, even though it is populated right after the for loop. I could pass this data as a side input, but that seems unnecessarily complicated for a fairly small map that is constant at pipeline run time, especially for a streaming pipeline.
Is there another way to pass the data along?
Answer 1
Score: 2
The root cause is that the engineMap field is unexported, so its data cannot be serialized. Only exported fields (say, EngineMap) can be serialized. This is a property of "general" coders that rely on reflection, such as JSON or Beam Schema Row encodings.
https://beam.apache.org/documentation/programming-guide/#user-code-serializability
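The unexported-field behavior described in this answer can be reproduced without Beam. The following is a minimal sketch that uses the standard library's encoding/json as a stand-in for a reflection-based coder; it is not Beam's actual serialization path, and the type names unexportedFn and exportedFn, the helper roundTrip, and the sample map contents are hypothetical, chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unexportedFn mirrors the struct from the question: the map lives in an
// unexported field, which reflection-based encoders cannot see.
// (Hypothetical type, for illustration only.)
type unexportedFn struct {
	engineMap map[string]string
}

// exportedFn holds the same data in an exported field.
// (Hypothetical type, for illustration only.)
type exportedFn struct {
	EngineMap map[string]string
}

// roundTrip encodes src as JSON and decodes the bytes into dst, loosely
// imitating a value being serialized at construction time and rebuilt later.
func roundTrip(src, dst any) error {
	data, err := json.Marshal(src)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, dst)
}

func main() {
	engines := map[string]string{"engine-a": "value-a"} // sample data, assumed

	var lost unexportedFn
	if err := roundTrip(unexportedFn{engineMap: engines}, &lost); err != nil {
		panic(err)
	}
	fmt.Println(len(lost.engineMap)) // prints 0: the unexported field was skipped

	var kept exportedFn
	if err := roundTrip(exportedFn{EngineMap: engines}, &kept); err != nil {
		panic(err)
	}
	fmt.Println(len(kept.EngineMap)) // prints 1: the exported field survived
}
```

Applied to the question's code, the same idea would mean renaming the engineMap field of ResultProcessor to an exported name such as EngineMap, so that the populated map travels with the DoFn.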
It's neither necessary nor recommended to register the same DoFn instance that is used in the pipeline. (If that could work, we wouldn't need to register at all.) DoFn registration should happen in an init block, or at least before beam.Init() is called in main.
Answer 2
Score: 0
If I understand correctly, the map you are using is just a DoFn member variable that remains constant after the DoFn is initialized? In that case, I would suggest exporting the member variable engineMap (making it public) so that it is serialized and deserialized into the DoFn instances created for bundles.
The StartBundle method has the same elements as the ProcessElement method: https://github.com/apache/beam/blob/b68d38e32c2aac51170da16c4d9c479420754009/sdks/go/pkg/beam/pardo.go#L240
An example using StartBundle (it is a fairly large example, so I would suggest focusing on a single DoFn): https://github.com/apache/beam/blob/67e6726ffeb47d2ada0122369fa230833ce0f026/sdks/go/examples/large_wordcount/large_wordcount.go#L207