Trying to use System.out as a task in an RDD

Question

I am just starting to learn Apache Spark, and I have some code that fails when I run it for reasons I can't figure out. Spark says the task I am passing to myRDD.foreach is not serializable, yet a tutorial I am watching did something similar. Any ideas or clues would be greatly appreciated.

public class Main {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> inputData = new ArrayList<>();

        inputData.add(25);

        SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> myRDD = sc.parallelize(inputData);
        Integer result = myRDD.reduce((x, y) -> x + y);

        myRDD.foreach(System.out::println);
        System.out.println(result);

        sc.close();
    }
}

Stack Trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable...
	at com.virtualpairprogrammers.Main.main(Main.java:26)
Caused by: java.io.NotSerializableException: java.io.PrintStream
Serialization stack:
	- object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@11a82d0f)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)...

Answer 1

Score: 3

Don't use a method reference here. System.out::println is bound to the PrintStream object that System.out holds at the moment the reference is created, so Spark has to serialize that object in order to ship the function to the executors. Everything you pass into a Spark closure (inside map/filter/reduce etc.) must be serializable, and PrintStream is not.

Pass a lambda instead:

myRDD.foreach(integer -> System.out.println(integer));

The lambda captures nothing; System.out is only looked up when the lambda runs on the executor.
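The difference between the two forms can be reproduced without Spark. Below is a minimal sketch, assuming plain Java serialization behaves like the serializer Spark uses for closures by default. The names ClosureSerializationDemo, SerializableAction and isSerializable are hypothetical and only stand in for Spark's own serializable function interface; they are not part of any Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for a Spark function interface:
    // a functional interface that is also Serializable.
    public interface SerializableAction<T> extends Serializable {
        void call(T value) throws Exception;
    }

    // Tries to serialize the function object the way a closure would have to
    // be serialized before being shipped elsewhere. Returns false if Java
    // serialization rejects it.
    public static boolean isSerializable(SerializableAction<Integer> action) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(action);
            return true;
        } catch (NotSerializableException e) {
            // The message names the class that could not be serialized.
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Bound method reference: captures the PrintStream instance itself.
        SerializableAction<Integer> methodRef = System.out::println;

        // Lambda: captures nothing, reads the static field System.out when called.
        SerializableAction<Integer> lambda = value -> System.out.println(value);

        System.out.println(isSerializable(methodRef)); // false
        System.out.println(isSerializable(lambda));    // true
    }
}
```

The captured PrintStream ends up as element 0 of the function object's captured-argument array, which lines up with the "element of array (index: 0)" entry in the stack trace from the question.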

Full Example


import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

public class Test63321956 {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> inputData = new ArrayList<>();

        inputData.add(25);

        SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> myRDD = sc.parallelize(inputData);
        Integer result = myRDD.reduce(Integer::sum);

        // collect() returns a java.util.List on the driver, so this forEach is
        // plain Java and the method reference never has to be serialized.
        myRDD.collect().forEach(System.out::println);
        // foreach runs as a Spark task, so a lambda is used here.
        myRDD.foreach(integer -> System.out.println(integer));
        System.out.println(result);
        /*
         * Output:
         * 25
         * 25
         * 25
         */

        sc.close();
    }
}

Answer 2

Score: 0

If your purpose is just to print the result, you can convert your RDD into a Java list with the collect() method. That lets you call Java's own forEach() on the list. Note that collect() brings the whole RDD to the driver, so this is best kept to small data sets.

Example:

myRDD.collect().forEach( System.out::println );

Posted by huangapple on 2020-08-09 10:28:00
When reposting, please keep the link to this article: https://go.coder-hub.com/63321956.html