Apache Beam Go SDK: how to convert PCollection<string> to PCollection<KV<string, string>>?

Question

I'm using the Apache Beam Go SDK and having a hard time getting a PCollection in the correct format for grouping/combining by key.

I have multiple records per key in a PCollection of strings that look like this:

Bob, cat
Bob, dog
Carla, cat
Carla, bunny
Doug, horse

I want to use GroupByKey and CombinePerKey so I can aggregate each person's pets like this:

Bob, [cat, dog]
Carla, [cat, bunny]
Doug, [horse]

How do I convert a PCollection<string> to PCollection<KV<string, string>>?

They mention something similar here, but the code to aggregate the string values is not included.

I can use a ParDo to get the string key and string value as shown below, but I can't figure out how to convert to the KV<string, string> or CoGBK<string, string> format required as input to GroupByKey.

pcolOut := beam.ParDo(s, func(line string) (string, string) {
  cleanString := strings.TrimSpace(line)
  openingChar := ","
  iStart := strings.Index(cleanString, openingChar)
  key := cleanString[0:iStart]
  value := cleanString[iStart+1:]

  // How to convert to PCollection<KV<string, string>> before returning?
  return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut) 

It fails with the following error. Any suggestions?

panic:  inserting ParDo in scope root
        creating new DoFn in scope root
        binding fn main.main.func2
        binding params [{Value string} {Value string}] to input CoGBK<string,string>
values of CoGBK<string,string> cannot bind to {Value string}

Answer 1

Score: 0

To map elements to KVs in the Java SDK, apply MapElements: use into() to set the KV types and, in the via() logic, create a new KV.of(myKey, myValue). For example, to get a KV<String, String>, use something like this:

    PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
        TypeDescriptors.kvs(
            TypeDescriptors.strings(),
            TypeDescriptors.strings()))
        .via(
            linkpage -> KV.of(dataFile, linkpage)));

Answer 2

Score: 0

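You may be using the wrong parameter type in the ParDo that follows GroupByKey. After grouping, the values for each key arrive as an iterator function (func(*string) bool), not as a single string.

Try this code: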

pcolIn := beam.CreateList(s, []string{"Bob, cat",
	"Bob, dog",
	"Carla, cat",
	"Carla, bunny",
	"Doug, horse",
})

pcolOut := beam.ParDo(s, func(line string) (string, string) {
	cleanString := strings.TrimSpace(line)
	openingChar := ","
	iStart := strings.Index(cleanString, openingChar)
	key := cleanString[0:iStart]
	value := cleanString[iStart+1:]

	// How to convert to PCollection<KV<string, string>> before returning?
	return key, value
}, pcolIn)

groupedKV := beam.GroupByKey(s, pcolOut)

beam.ParDo0(s, func(key string, iter func(*string) bool) {
	vals := []string{}
	val := ""
	for iter(&val) {
		vals = append(vals, strings.TrimSpace(val))
	}
	fmt.Println(key, vals)
}, groupedKV)
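
For anyone who wants to try the two moving parts without a Beam runner, here is a minimal sketch in plain Go (standard library only, Go 1.18+ assumed for strings.Cut). It is not Beam code: splitKV, sliceIter and drain are hypothetical names invented for this illustration, and the map only stands in for what GroupByKey does. It shows the same parsing step, with a guard for lines that have no comma (the original slicing would panic there because strings.Index returns -1), and the iterator shape func(*string) bool that the ParDo after GroupByKey has to accept.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKV is a hypothetical helper: it splits "key, value" at the first comma
// and reports ok=false when the line has no comma, instead of panicking on a
// negative slice index.
func splitKV(line string) (key, value string, ok bool) {
	key, value, ok = strings.Cut(strings.TrimSpace(line), ",")
	if !ok {
		return "", "", false
	}
	return strings.TrimSpace(key), strings.TrimSpace(value), true
}

// sliceIter is a hypothetical stand-in for the iterator a grouped DoFn
// receives: each call copies the next value into *out and returns false once
// the values are exhausted.
func sliceIter(vals []string) func(*string) bool {
	i := 0
	return func(out *string) bool {
		if i >= len(vals) {
			return false
		}
		*out = vals[i]
		i++
		return true
	}
}

// drain collects every value from an iterator of that shape.
func drain(iter func(*string) bool) []string {
	var vals []string
	var v string
	for iter(&v) {
		vals = append(vals, v)
	}
	return vals
}

func main() {
	lines := []string{"Bob, cat", "Bob, dog", "Carla, cat", "Carla, bunny", "Doug, horse", "no comma here"}

	// In-memory grouping; this map only imitates GroupByKey for the demo.
	grouped := map[string][]string{}
	var order []string // remember first-seen key order for stable printing
	for _, line := range lines {
		k, v, ok := splitKV(line)
		if !ok {
			continue // skip malformed lines
		}
		if _, seen := grouped[k]; !seen {
			order = append(order, k)
		}
		grouped[k] = append(grouped[k], v)
	}

	for _, k := range order {
		fmt.Println(k, drain(sliceIter(grouped[k])))
	}
}
```

In the real pipeline, the function passed to beam.ParDo can keep returning (string, string); the part that changes is the signature of the function consuming groupedKV, which should take the key plus an iterator as drain does here.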
