Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?
Question
I'm using the Apache Beam Go SDK and having a hard time getting a PCollection in the correct format for grouping/combining by key.
I have multiple records per key in a PCollection of strings that look like this:
Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse
I want to use GroupByKey and CombinePerKey so I can aggregate each person's pets like this:
Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]
How do I convert a PCollection<string> to PCollection<KV<string, string>>?
They mention something similar here, but the code to aggregate the string values is not included.
I can use a ParDo to get the string key and string value as shown below, but I can't figure out how to convert to the KV<string, string> or CoGBK<string, string> format required as input to GroupByKey.
    pcolOut := beam.ParDo(s, func(line string) (string, string) {
        cleanString := strings.TrimSpace(line)
        openingChar := ","
        iStart := strings.Index(cleanString, openingChar)
        key := cleanString[0:iStart]
        value := cleanString[iStart+1:]
        // How to convert to PCollection<KV<string, string>> before returning?
        return key, value
    }, pcolIn)
    groupedKV := beam.GroupByKey(s, pcolOut)
It fails with the following error. Any suggestions?
panic: inserting ParDo in scope root
creating new DoFn in scope root
binding fn main.main.func2
binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}
Answer 1
Score: 0
To map to KVs, you can apply MapElements, use into() to set the KV types, and create a new KV.of(myKey, myValue) in the via() logic. Note that this is a Java SDK example, not Go. For example, to get a KV<String, String>, use something like this:
    PCollection<KV<String, String>> kvPairs = linkpages.apply(
        MapElements.into(
                TypeDescriptors.kvs(
                    TypeDescriptors.strings(),
                    TypeDescriptors.strings()))
            .via(linkpage -> KV.of(dataFile, linkpage)));
Answer 2
Score: 0
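The problem is probably the parameter types of the ParDo that comes after GroupByKey. Your first ParDo already produces KVs. After grouping, the next DoFn receives the key plus an iterator function over the values (func(*string) bool), not a plain string value, which is what the "cannot bind to {Value string}" error is complaining about.
Try this code: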
    pcolIn := beam.CreateList(s, []string{
        "Bob, cat",
        "Bob, dog",
        "Carla, cat",
        "Carla, bunny",
        "Doug, horse",
    })
    pcolOut := beam.ParDo(s, func(line string) (string, string) {
        cleanString := strings.TrimSpace(line)
        openingChar := ","
        iStart := strings.Index(cleanString, openingChar)
        key := cleanString[0:iStart]
        value := cleanString[iStart+1:]
        // Returning two values makes the output a PCollection<KV<string, string>>.
        return key, value
    }, pcolIn)
    groupedKV := beam.GroupByKey(s, pcolOut)
    // After GroupByKey, the DoFn takes the key and an iterator over its values.
    beam.ParDo0(s, func(key string, iter func(*string) bool) {
        vals := []string{}
        val := ""
        for iter(&val) {
            vals = append(vals, strings.TrimSpace(val))
        }
        fmt.Println(key, vals)
    }, groupedKV)
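One caveat about the parsing step above: it slices at the result of strings.Index, which is -1 when a line contains no comma, so a malformed record would panic. Below is a minimal sketch of a safer split using only the standard library (strings.Cut needs Go 1.18 or later). The helper name splitRecord is hypothetical and is not part of Beam.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRecord is a hypothetical helper (not part of the Beam SDK).
// It splits a "name, pet" line at the first comma and trims whitespace
// from both halves. ok is false when the line has no comma or the key
// is empty, so the caller can skip the record instead of panicking.
func splitRecord(line string) (key, value string, ok bool) {
	k, v, found := strings.Cut(line, ",")
	if !found {
		return "", "", false
	}
	key, value = strings.TrimSpace(k), strings.TrimSpace(v)
	return key, value, key != ""
}

func main() {
	for _, line := range []string{"Bob, cat", "Carla, bunny", "no comma here"} {
		key, value, ok := splitRecord(line)
		fmt.Printf("%q -> key=%q value=%q ok=%v\n", line, key, value, ok)
	}
}
```

Inside the pipeline, one way to use this would be a DoFn that takes an emitter, such as func(line string, emit func(string, string)), and calls emit(key, value) only when ok is true. That still yields KV<string, string> elements and quietly drops malformed lines.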