Apache Spark + Java: "java.lang.AssertionError: assertion failed" in ExpressionEncoder
Question
I am trying to call groupByKey on a Dataset using the code below:
SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());

@Data
@NoArgsConstructor
@AllArgsConstructor
class Chunk implements Serializable {
    private Integer id;
    private String letters;
}

class JavaAggregator extends Aggregator<Chunk, String, String> {
    @Override
    public String zero() {
        return "";
    }

    @Override
    public String reduce(String b, Chunk a) {
        return b + a.getLetters();
    }

    @Override
    public String merge(String b1, String b2) {
        return b1 + b2;
    }

    @Override
    public String finish(String reduction) {
        return reduction;
    }

    @Override
    public Encoder<String> bufferEncoder() {
        return Encoders.bean(String.class);
    }

    @Override
    public Encoder<String> outputEncoder() {
        return Encoders.bean(String.class);
    }
}

List<Chunk> chunkList = List.of(
    new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
    new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
    new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));

Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));
KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>) v -> v.getId(), Encoders.bean(Integer.class));
But I get an exception that says:
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
I am not an expert in Scala internals, and it is hard for me to tell what is wrong with the code: the exception is thrown by compiled code, and the assertion message "assertion failed" is not very helpful. Is there something fundamentally wrong in my code that causes this exception?
Answer 1
Score: 4
I found out that the problem was with my usage of Encoders. I should have used Encoders.INT() instead of Encoders.bean(Integer.class).
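To illustrate why Encoders.bean is a poor fit for Integer, here is a minimal sketch that uses only the JDK's java.beans.Introspector. It is not Spark's own code path, just a demonstration of the JavaBean convention (getter/setter pairs) that a bean encoder is built around. The class name BeanPropertyCheck and the hand-written Chunk (standing in for the Lombok-generated one) are assumptions made for this example.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical stand-in for the question's Chunk class, with the
    // getters and setters that Lombok's @Data would generate.
    public static class Chunk {
        private Integer id;
        private String letters;

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getLetters() { return letters; }
        public void setLetters(String letters) { this.letters = letters; }
    }

    // Returns the names of properties that have both a getter and a setter.
    public static List<String> beanProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Chunk exposes readable and writable properties; Integer and String expose none.
        System.out.println("Chunk:   " + beanProperties(Chunk.class));
        System.out.println("Integer: " + beanProperties(Integer.class));
        System.out.println("String:  " + beanProperties(String.class));
    }
}
```

Chunk has bean properties to map to columns, while Integer and String have none, which is why they need the dedicated encoders Encoders.INT() and Encoders.STRING(). In the question's code that would mean passing Encoders.INT() as the second argument to groupByKey. The two Encoders.bean(String.class) calls in JavaAggregator would presumably fail the same way once the aggregator is used, so Encoders.STRING() looks like the right replacement there as well.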