Apache Spark + Java: “java.lang.AssertionError: assertion failed” in ExpressionEncoder

Question

I am trying to call groupByKey on a Dataset using the following code:

SparkSession SPARK_SESSION = new SparkSession(new SparkContext("local", "app"));
JavaSparkContext JAVA_SPARK_CONTEXT = new JavaSparkContext(SPARK_SESSION.sparkContext());

@Data
@NoArgsConstructor
@AllArgsConstructor
class Chunk implements Serializable {
    private Integer id;
    private String letters;
}

class JavaAggregator extends Aggregator<Chunk, String, String> {

    @Override
    public String zero() {
        return "";
    }

    @Override
    public String reduce(String b, Chunk a) {
        return b + a.getLetters();
    }

    @Override
    public String merge(String b1, String b2) {
        return b1 + b2;
    }

    @Override
    public String finish(String reduction) {
        return reduction;
    }

    @Override
    public Encoder<String> bufferEncoder() {
        return Encoders.bean(String.class);
    }

    @Override
    public Encoder<String> outputEncoder() {
        return Encoders.bean(String.class);
    }
}

List<Chunk> chunkList = List.of(
        new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
        new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
        new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));

Dataset<Row> df = SPARK_SESSION.createDataFrame(JAVA_SPARK_CONTEXT.parallelize(chunkList), Chunk.class);
Dataset<Chunk> ds = df.as(Encoders.bean(Chunk.class));

KeyValueGroupedDataset<Integer, Chunk> grouped = ds.groupByKey((Function1<Chunk, Integer>) v -> v.getId(), Encoders.bean(Integer.class));

But I get an exception that says:

java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:87)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)

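I am not an expert in Scala internals, so it is hard for me to tell what is wrong with the code: the exception is thrown from compiled library code, and the message "assertion failed" is not very helpful. Is there something fundamentally wrong in my code that causes this exception?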
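Because the trace ends in Encoders.bean, the encoders can be examined on their own, without building any Dataset. The sketch below is a minimal illustration, assuming a spark-sql 2.4/3.x dependency on the classpath; the class names EncoderSchemas and ChunkBean are hypothetical (ChunkBean is a hand-written stand-in for the Lombok-based Chunk). It prints the schema each encoder factory produces: a JavaBean encoder yields a struct with one field per bean property, whereas the built-in Encoders.INT() and Encoders.STRING() factories yield a single column named value. In the Spark release shown in the trace, ExpressionEncoder.javaBean appears to assert that the inferred schema is a struct, which holds for a bean like Chunk but not for Integer or String, hence the bare "assertion failed".

```java
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.types.StructType;

import java.io.Serializable;

public class EncoderSchemas {

    // Hypothetical stand-in for the question's Chunk: a public JavaBean
    // with a no-arg constructor and getters/setters (no Lombok needed).
    public static class ChunkBean implements Serializable {
        private Integer id;
        private String letters;

        public ChunkBean() {}

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getLetters() { return letters; }
        public void setLetters(String letters) { this.letters = letters; }
    }

    // Bean encoder: a struct with one field per bean property.
    public static StructType beanSchema() {
        return Encoders.bean(ChunkBean.class).schema();
    }

    // Built-in encoder for boxed Integer: a single "value" column.
    public static StructType intSchema() {
        return Encoders.INT().schema();
    }

    // Built-in encoder for String: also a single "value" column.
    public static StructType stringSchema() {
        return Encoders.STRING().schema();
    }

    public static void main(String[] args) {
        System.out.println(beanSchema().treeString());
        System.out.println(intSchema().treeString());
        System.out.println(stringSchema().treeString());
    }
}
```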

Answer 1

Score: 4

I found out that the problem was with my usage of Encoders: I should have used Encoders.INT() instead of Encoders.bean(Integer.class).
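A corrected version of the question's pipeline might look like the sketch below. It assumes spark-sql 2.4/3.x (Scala 2.12) on the classpath, replaces Lombok with a hand-written bean, and uses SparkSession.builder() instead of constructing the session directly; GroupByKeyFix, ConcatAggregator and concatByKey are hypothetical names. Besides switching the key encoder to Encoders.INT(), it also swaps the aggregator's Encoders.bean(String.class) for Encoders.STRING(), since String is not a JavaBean either and would presumably hit the same assertion once the aggregator is used. The key function is cast to MapFunction so that the Java-friendly overload of groupByKey is selected.

```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByKeyFix {

    // Plain JavaBean standing in for the Lombok-generated Chunk.
    public static class Chunk implements Serializable {
        private Integer id;
        private String letters;

        public Chunk() {}
        public Chunk(Integer id, String letters) { this.id = id; this.letters = letters; }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getLetters() { return letters; }
        public void setLetters(String letters) { this.letters = letters; }
    }

    // Same concatenation as the question's JavaAggregator, but with built-in String encoders.
    public static class ConcatAggregator extends Aggregator<Chunk, String, String> {
        @Override public String zero() { return ""; }
        @Override public String reduce(String b, Chunk a) { return b + a.getLetters(); }
        @Override public String merge(String b1, String b2) { return b1 + b2; }
        @Override public String finish(String reduction) { return reduction; }
        @Override public Encoder<String> bufferEncoder() { return Encoders.STRING(); }
        @Override public Encoder<String> outputEncoder() { return Encoders.STRING(); }
    }

    // Groups the sample chunks by id and concatenates their letters per group.
    public static Map<Integer, String> concatByKey() {
        SparkSession spark = SparkSession.builder().master("local").appName("app").getOrCreate();
        try {
            List<Chunk> chunks = Arrays.asList(
                    new Chunk(1, "a"), new Chunk(2, "1"), new Chunk(3, "-*-"),
                    new Chunk(1, "b"), new Chunk(2, "2"), new Chunk(3, "-**-"),
                    new Chunk(1, "c"), new Chunk(2, "3"), new Chunk(3, "-***-"));
            Dataset<Chunk> ds = spark.createDataset(chunks, Encoders.bean(Chunk.class));

            // Key encoder: Encoders.INT(), not Encoders.bean(Integer.class).
            KeyValueGroupedDataset<Integer, Chunk> grouped =
                    ds.groupByKey((MapFunction<Chunk, Integer>) Chunk::getId, Encoders.INT());

            Dataset<Tuple2<Integer, String>> result = grouped.agg(new ConcatAggregator().toColumn());

            Map<Integer, String> out = new HashMap<>();
            for (Tuple2<Integer, String> t : result.collectAsList()) {
                out.put(t._1(), t._2());
            }
            return out;
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(concatByKey());
    }
}
```

Spark does not guarantee the order in which rows of a group reach the aggregator, so the concatenated string for each key may not always come out in input order.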

huangapple
  • This article was published on 2020-04-06 04:24:00
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61049006.html