How to Convert PCollection to List Collection in Apache Beam using Java?
Question
I am using Apache Beam to process batch data. For that purpose I am creating a PCollection object from a List; however, once I finish executing the pipeline processing, I need to convert the results in the PCollection back into a List collection. Can anybody please help me with this?
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
List<Map<String, String>> inputDataList = ... // API call results.
PCollection<Map<String, String>> pcollection = pipeline.apply(Create.of(inputDataList).withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
pcollection = pcollection.apply(...); // applying a few transformations.
List<Map<String, String>> outputDataList = ? // How to get a list from the pcollection object?
I did not find much help on the internet; the solutions mentioned use methods which are undefined for PCollection, e.g. get(), GetAllOut(), etc.
Answer 1
Score: 1
It is not possible to convert a PCollection into a Collection, since Apache Beam is just a logical plan of the transformations and actions; the execution is actually done by other engines, such as Spark.
What you can do is have the pipeline write the results of the PCollection to a sink (a file, GCS, BigQuery, etc.) and then read the results back out of that sink, as sketched below.
Check out the execution model of Apache Beam.
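A minimal sketch of that approach, assuming the elements are first serialized to strings and written with TextIO to a hypothetical local prefix output/results; the read-back code must run after waitUntilFinish(), and the enclosing method is assumed to declare throws IOException:

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Inside the pipeline: serialize each element and write it to a text-file sink.
pcollection
    .apply(MapElements.into(TypeDescriptors.strings())
        .via((Map<String, String> m) -> m.toString())) // toString() stands in for real serialization
    .apply(TextIO.write().to("output/results").withSuffix(".txt"));

pipeline.run().waitUntilFinish();

// Outside the pipeline: gather the shard files back into an ordinary Java List.
List<String> outputDataList = new ArrayList<>();
try (DirectoryStream<Path> shards =
        Files.newDirectoryStream(Paths.get("output"), "results-*")) {
    for (Path shard : shards) {
        outputDataList.addAll(Files.readAllLines(shard));
    }
}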
Answer 2
Score: 0
Actually, I just encountered this issue. Contrary to the answer and comments above, it is possible to write a PCollection to a Java List, although there are several constraints on doing so, as I discuss below. As far as why you would want to, there are several situations in which it is not adequate to work with PCollections:
- You want to work with a legacy API, especially one in which the classes are not Java Serializable. Serializability is a requirement for creating transformations on Beam collections.
- You want to write the contents of a PCollection to some destination other than files, databases, or one of the services that have built-in IO connectors. In my case, I want to write to Java OutputStreams, the common denominator of files, URLs, and Swing components such as JTextArea; OutputStream and its subclasses are not Serializable (a minimal sketch of this failure mode follows this list). While Beam can do sophisticated things like handling streaming data, it is surprisingly hard to do simple things like writing text to a screen window.
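To make the serializability constraint concrete, here is a hypothetical sketch (WriteToStreamFn is an invented name, not part of the original answer): a DoFn that captures an OutputStream field cannot be used in a transform, because Beam must serialize the DoFn and OutputStream is not Serializable.

import java.io.IOException;
import java.io.OutputStream;
import org.apache.beam.sdk.transforms.DoFn;

// Beam fails to serialize this DoFn (typically surfacing a NotSerializableException),
// because the captured OutputStream field is not Serializable.
class WriteToStreamFn extends DoFn<String, Void> {
    private final OutputStream out; // not Serializable

    WriteToStreamFn(OutputStream out) {
        this.out = out;
    }

    @ProcessElement
    public void processElement(@Element String line) throws IOException {
        out.write((line + System.lineSeparator()).getBytes());
    }
}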
In order to write a PCollection to a Java List (or another Java Collection), you can use a Beam Combiner. In my case, I use the ArrayAggArray combiner. Writing the data is a two-stage process: first you apply the Combiner and run the Pipeline, then you process the resulting Java object outside the Pipeline. That's the first constraint of this method. Once you've run the Pipeline, you no longer have access to any of the PCollections it has created, so essentially any post-processing must not call back into any of the Beam objects.
Note that the Java object you create can be very large and can't be processed in parallel (and parallel processing is why you want to use Beam in the first place!). That is the second constraint of my method.
The trick is that, in extending the Beam Combiner, you have to save the final accumulator to a Java object outside the Pipeline. I do this by passing a reference to the object in the constructor of the Combiner. In the extractOutput method of the Combiner, you have to add the elements of the final accumulator to the Java object (you can't simply replace it, because Java won't pass a new instance back to the caller).
Here is an example:
First, extend the Beam Combiner:
public static class JsonWriter extends ArrayAgg.ArrayAggArray<String> {

    private static final long serialVersionUID = -5573525980983338238L;

    /**
     * Holds the JSON serialization of the records in the applicable PCollection.
     * This field serves as the output of the Combine operation on this PCollection.
     * It should be passed as an empty list to the constructor, and is populated by the
     * extractOutput method.
     */
    private List<String> records;

    /**
     * @param records the output of the Combine operation on this PCollection;
     * should be passed as an empty list to the constructor
     */
    public JsonWriter(List<String> records) {
        super();
        if (records == null || records.size() != 0)
            throw new IllegalArgumentException("Records should be empty");
        this.records = records;
    }

    @Override
    public List<String> addInput(List<String> accum, String input) {
        // Delegates to the superclass; shown only to make the accumulation step explicit.
        return super.addInput(accum, input);
    }

    @Override
    public List<String> extractOutput(List<String> accumulator) {
        if (accumulator.isEmpty()) {
            return null;
        }
        // Copy the final accumulator into the externally supplied list.
        this.records.addAll(accumulator);
        return accumulator;
    }

    /**
     * @return the records
     */
    public List<String> getRecords() {
        return records;
    }
}//class JsonWriter
The caller looks like this:
instance.recordsToJson(); // instance has a PCollection
pipeline.run().waitUntilFinish();
where
private List<String> recordsAsJson = new LinkedList<String>();
/**
 * Maps the records in dataframe to their JSON representation
 * and collects the resulting strings into the recordsAsJson list.
 * Called as part of the Beam pipeline.
 */
public void recordsToJson() {
    JsonWriter writer = new JsonWriter(recordsAsJson);
    this.dataframe // a PCollection<Row>
        // transform each record into a JSON string
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((Row record) -> MsdxInstance.recordToJson(record)))
        // collect the JSON strings into a list
        .apply(
            this.dataframe.getName() + "AsJson",
            Combine.globally(writer));
}//recordsToJson
Then you use the list outside the pipeline like this:
/**
 * Writes the JSON serialization of this dataframe to the destination specified by the generator.
 * Called after the recordsToJson method and after the Beam pipeline runs.
 */
@Override
public MsdxWriter.Generator toJson(MsdxWriter.Generator generator) {
    if (this.dataframe.isBounded().equals(PCollection.IsBounded.UNBOUNDED))
        throw new IllegalArgumentException("Unbounded collection of records");
    AtomicBoolean firstRecord = new AtomicBoolean(true);
    try {
        generator.writeStartArray(false);
        recordsAsJson.forEach(record -> {
            try {
                if (!firstRecord.get())
                    generator.writeArrayValueSeparator(false);
                else
                    firstRecord.set(false);
                generator.writeRaw(record);
            } catch (IOException e) {
                System.err.println(e.getMessage());
                e.printStackTrace();
            }
        });
        generator.writeEndArray(false);
    } catch (IOException e) {
        System.err.println(e.getMessage());
        e.printStackTrace();
    }
    return generator;
}//toJson
And
window.println("cities:");
instance.toJson(MsdxOutputDestination.toStream(window.printStream()));
I hope this helps.