用于 Beam 流水线的键值迭代

huangapple go评论58阅读模式
英文:

Iterate Keys with Values for Beam pipeline

问题

在应用 .apply(GroupByKey.create()) 后,我得到类似 PCollection<KV<Integer, Iterable>> 的值。你能建议如何对每个键应用进一步的转换吗?

例如:PCollection<KV<1, Iterable>>
PCollection<KV<2, Iterable>>
这些键是动态值。我需要对PCollection中存在的每个键进行迭代。

英文:

After applying .apply(GroupByKey.create()) I am getting values like PCollection<KV<Integer,Iterable<String>>. Can you suggest how to apply further transforms for each key.

Ex: PCollection<KV<1,Iterable<String>>
PCollection<KV<2,Iterable<String>>
The keys are dynamic values. I need to iterate for each Key Present in the PCollection.

答案1

得分: 3

你应该能够使用DoFn / ParDo来迭代这样的可迭代对象。

我草拟了一个快速的示例来展示如何实现这一点。

    // 创建样本行
    PCollection<TableRow> rows =
        pipeline
            .apply(
                Create.of(
                    new TableRow().set("group", 1).set("name", "Dataflow"),
                    new TableRow().set("group", 1).set("name", "Pub/Sub"),
                    new TableRow().set("group", 2).set("name", "BigQuery"),
                    new TableRow().set("group", 2).set("name", "Vertex")))
            .setCoder(TableRowJsonCoder.of());

    // 转换为 <group, name> 的 KV
    PCollection<KV<Integer, String>> keyValues =
        rows.apply(
            "Key",
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(row -> KV.of((Integer) row.get("group"), (String) row.get("name"))));

    // 按键分组
    PCollection<KV<Integer, Iterable<String>>> groups =
        keyValues.apply("Group", GroupByKey.create());

    // 迭代并打印组 + 值
    groups.apply(
        ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, Void>() {
              @ProcessElement
              public void processElement(@Element KV<Integer, Iterable<String>> kv) {
                StringBuilder sb = new StringBuilder();
                for (String name : kv.getValue()) {
                  if (sb.length() > 0) {
                    sb.append(", ");
                  }
                  sb.append(name);
                }
                System.out.println("Group " + kv.getKey() + " values: " + sb);
              }
            }));

    pipeline.run();

输出(请注意,由于并发性,输出结果不是有序/保证的)。

Group 2 values: BigQuery, Vertex
Group 1 values: Dataflow, Pub/Sub
英文:

You should be able to use a DoFn / ParDo to iterate over such iterable.

I drafted a quick example to show how this can be done.

    // Create sample rows
    PCollection&lt;TableRow&gt; rows =
        pipeline
            .apply(
                Create.of(
                    new TableRow().set(&quot;group&quot;, 1).set(&quot;name&quot;, &quot;Dataflow&quot;),
                    new TableRow().set(&quot;group&quot;, 1).set(&quot;name&quot;, &quot;Pub/Sub&quot;),
                    new TableRow().set(&quot;group&quot;, 2).set(&quot;name&quot;, &quot;BigQuery&quot;),
                    new TableRow().set(&quot;group&quot;, 2).set(&quot;name&quot;, &quot;Vertex&quot;)))
            .setCoder(TableRowJsonCoder.of());

    // Convert into a KV of &lt;group, name&gt;
    PCollection&lt;KV&lt;Integer, String&gt;&gt; keyValues =
        rows.apply(
            &quot;Key&quot;,
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(row -&gt; KV.of((Integer) row.get(&quot;group&quot;), (String) row.get(&quot;name&quot;))));

    // Group by key
    PCollection&lt;KV&lt;Integer, Iterable&lt;String&gt;&gt;&gt; groups =
        keyValues.apply(&quot;Group&quot;, GroupByKey.create());

    // Iterate and print group + values
    groups.apply(
        ParDo.of(
            new DoFn&lt;KV&lt;Integer, Iterable&lt;String&gt;&gt;, Void&gt;() {
              @ProcessElement
              public void processElement(@Element KV&lt;Integer, Iterable&lt;String&gt;&gt; kv) {
                StringBuilder sb = new StringBuilder();
                for (String name : kv.getValue()) {
                  if (sb.length() &gt; 0) {
                    sb.append(&quot;, &quot;);
                  }
                  sb.append(name);
                }
                System.out.println(&quot;Group &quot; + kv.getKey() + &quot; values: &quot; + sb);
              }
            }));

    pipeline.run();

Prints (note that the output is not ordered/guaranteed due to concurrency).

Group 2 values: BigQuery, Vertex
Group 1 values: Dataflow, Pub/Sub

huangapple
  • 本文由 发表于 2023年2月6日 17:23:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/75359414.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定