Iterate Keys with Values for Beam pipeline
Question
After applying .apply(GroupByKey.create()) I get a PCollection<KV<Integer, Iterable<String>>>. Can you suggest how to apply further transforms for each key?
For example, the collection contains elements such as:
KV<1, Iterable<String>>
KV<2, Iterable<String>>
The keys are dynamic values. I need to iterate over each key present in the PCollection.
Answer 1
Score: 3
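You should be able to use a ParDo with a DoFn to iterate over such an Iterable. The DoFn receives one KV<key, Iterable<value>> element per key (per window), so the keys do not need to be known in advance.
I drafted a quick example to show how this can be done: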
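```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class IterateGroups {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Create sample rows; withCoder tells Create how to encode TableRow
    PCollection<TableRow> rows =
        pipeline.apply(
            Create.of(
                    new TableRow().set("group", 1).set("name", "Dataflow"),
                    new TableRow().set("group", 1).set("name", "Pub/Sub"),
                    new TableRow().set("group", 2).set("name", "BigQuery"),
                    new TableRow().set("group", 2).set("name", "Vertex"))
                .withCoder(TableRowJsonCoder.of()));

    // Convert into a KV of <group, name>; reading "group" as a Number avoids
    // depending on the exact boxed type produced by the JSON coder
    PCollection<KV<Integer, String>> keyValues =
        rows.apply(
            "Key",
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(
                    (TableRow row) ->
                        KV.of(((Number) row.get("group")).intValue(), (String) row.get("name"))));

    // Group by key: one KV<group, Iterable<name>> per distinct group
    PCollection<KV<Integer, Iterable<String>>> groups =
        keyValues.apply("Group", GroupByKey.<Integer, String>create());

    // Iterate and print group + values
    groups.apply(
        ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, Void>() {
              @ProcessElement
              public void processElement(@Element KV<Integer, Iterable<String>> kv) {
                StringBuilder sb = new StringBuilder();
                for (String name : kv.getValue()) {
                  if (sb.length() > 0) {
                    sb.append(", ");
                  }
                  sb.append(name);
                }
                // Printed on the worker; on a distributed runner this ends up in worker logs
                System.out.println("Group " + kv.getKey() + " values: " + sb);
              }
            }));

    pipeline.run().waitUntilFinish();
  }
}
```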
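Prints the following. Elements are processed in parallel and GroupByKey makes no ordering guarantees, so neither the order of the groups nor the order of the names within a group is guaranteed:
```text
Group 2 values: BigQuery, Vertex
Group 1 values: Dataflow, Pub/Sub
```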
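The DoFn in this answer only prints. To apply further transforms per key, one option (an assumed design, not part of the original answer) is to have the DoFn emit a result and keep the per-group logic in a plain static method that can be unit-tested without a runner. The sketch below uses only the JDK (Java 9+ for List.of and Map.entry). GroupLogic, joinValues, and groupByKey are hypothetical names, and groupByKey is merely an in-memory stand-in for Beam's GroupByKey used to exercise the helper locally; it does not model how a runner distributes the grouping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class GroupLogic {
  // Hypothetical helper: the per-group logic from processElement as a pure
  // function that needs no pipeline or runner.
  static String joinValues(Iterable<String> values) {
    StringJoiner joiner = new StringJoiner(", ");
    for (String value : values) {
      joiner.add(value);
    }
    return joiner.toString();
  }

  // Hypothetical in-memory stand-in for GroupByKey, for local experiments only:
  // collects every value under its key, keeping first-seen key order.
  static Map<Integer, List<String>> groupByKey(List<Map.Entry<Integer, String>> pairs) {
    Map<Integer, List<String>> groups = new LinkedHashMap<>();
    for (Map.Entry<Integer, String> pair : pairs) {
      groups.computeIfAbsent(pair.getKey(), key -> new ArrayList<>()).add(pair.getValue());
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, String>> pairs =
        List.of(
            Map.entry(1, "Dataflow"),
            Map.entry(1, "Pub/Sub"),
            Map.entry(2, "BigQuery"),
            Map.entry(2, "Vertex"));
    // Keys come from the data; none are hard-coded.
    for (Map.Entry<Integer, List<String>> group : groupByKey(pairs).entrySet()) {
      System.out.println("Group " + group.getKey() + " values: " + joinValues(group.getValue()));
    }
  }
}
```

Running main prints "Group 1 values: Dataflow, Pub/Sub" followed by "Group 2 values: BigQuery, Vertex", in first-seen key order because of the LinkedHashMap. In the pipeline, the DoFn's output type would change from Void to KV<Integer, String>, processElement would take an extra OutputReceiver<KV<Integer, String>> out parameter and call out.output(KV.of(kv.getKey(), GroupLogic.joinValues(kv.getValue()))), and the resulting PCollection could then be passed to any further transform.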