Trying to use System.out as a task in an RDD
Question
I am currently just starting out learning Apache Spark, and I have some code that I can't quite figure out why it fails. It says the task I am sending into the myRDD foreach is not serializable; however, a tutorial I am watching did a similar thing as well. Any ideas or clues would be greatly appreciated.
    public class Main {
        public static void main(String[] args) {
            Logger.getLogger("org.apache").setLevel(Level.WARN);

            List<Integer> inputData = new ArrayList<>();
            inputData.add(25);

            SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<Integer> myRDD = sc.parallelize(inputData);
            Integer result = myRDD.reduce((x, y) -> x + y);
            myRDD.foreach(System.out::println);
            System.out.println(result);

            sc.close();
        }
    }
Stack Trace:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable...
        at com.virtualpairprogrammers.Main.main(Main.java:26)
    Caused by: java.io.NotSerializableException: java.io.PrintStream
    Serialization stack:
        - object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@11a82d0f)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)...
Answer 1

Score: 3
Don't use a method reference (System.out::println) here. It will try to pass the println(..) function of PrintStream to the executors. Remember that everything you pass or put into a Spark closure (inside map/filter/reduce etc.) must be serializable. Since println(..) is part of PrintStream, the PrintStream instance would have to be serialized, and java.io.PrintStream is not serializable.

Pass an anonymous function (a lambda) as below:

    myRDD.foreach(integer -> System.out.println(integer));
Full Example
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.ArrayList;
    import java.util.List;

    public class Test63321956 {
        public static void main(String[] args) {
            Logger.getLogger("org.apache").setLevel(Level.WARN);

            List<Integer> inputData = new ArrayList<>();
            inputData.add(25);

            SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<Integer> myRDD = sc.parallelize(inputData);
            Integer result = myRDD.reduce(Integer::sum);

            myRDD.collect().forEach(System.out::println);
            myRDD.foreach(integer -> System.out.println(integer));
            System.out.println(result);
            /*
             * Output:
             * 25
             * 25
             * 25
             */

            sc.close();
        }
    }
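Why the lambda works where the method reference does not: a bound method reference such as System.out::println captures the System.out PrintStream instance inside the function object at creation time, so serializing the closure (plain Java serialization, as the java.io.NotSerializableException in the stack trace shows) has to serialize that PrintStream and fails. The lambda integer -> System.out.println(integer) captures nothing; System.out is only looked up when the body runs on the executor. Below is a minimal, Spark-free sketch of that difference; the class name CaptureDemo and the SerializableConsumer helper are made up for illustration and are not part of Spark's API.

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.function.Consumer;

    public class CaptureDemo {

        // A Serializable flavour of Consumer, analogous in spirit to Spark's VoidFunction.
        interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

        public static void main(String[] args) {
            // Bound method reference: the System.out PrintStream is captured inside
            // the function object, and PrintStream is not serializable.
            SerializableConsumer<Integer> methodRef = System.out::println;

            // Lambda: captures no variables; System.out is resolved only when the
            // body actually runs, so there is nothing non-serializable to ship.
            SerializableConsumer<Integer> lambda = i -> System.out.println(i);

            System.out.println("method reference: " + trySerialize(methodRef));
            System.out.println("lambda:           " + trySerialize(lambda));
        }

        // Serialize the function object with plain Java serialization and report the result.
        private static String trySerialize(Object f) {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(f);
                return "serializable";
            } catch (Exception e) {
                return "NOT serializable (" + e.getClass().getSimpleName() + ")";
            }
        }
    }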
Answer 2

Score: 0
If your purpose is just to print out the result, you can convert your RDD into a Java list using the collect() method. This will allow you to apply the plain Java forEach() method.

Example:

    myRDD.collect().forEach(System.out::println);
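One caveat, not from the original answer: collect() copies every element of the RDD back to the driver, which is fine for a tiny RDD like this one but can overwhelm driver memory on a large dataset. A small sketch of printing only a sample instead, using the standard JavaRDD.take(n) method; the variable names are illustrative and assume the myRDD from the question.

    // Bring at most 10 elements back to the driver instead of the whole RDD.
    List<Integer> sample = myRDD.take(10);
    // Plain Java forEach on the driver, so no Spark closure is serialized.
    sample.forEach(System.out::println);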