Checkpointing with Spark file streaming in Java
Question
I want to add checkpointing to my Spark file streaming application so that, if the application stops or is terminated for any reason, it processes all unprocessed files from Hadoop when it restarts. I am following the Spark Streaming Programming Guide, but I cannot find JavaStreamingContextFactory. What should I do?
My code is:
public class StartAppWithCheckPoint {
    public static void main(String[] args) {
        try {
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();
            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                @Override public JavaStreamingContext create() {
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                }
            };
            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
}
Answer 1
Score: 1
You must use checkpointing.
For checkpointing, use a stateful transformation, either updateStateByKey or reduceByKeyAndWindow. There are plenty of examples in spark-examples, which ships with the prebuilt Spark distribution and is also in the Spark source on GitHub. For your specific case, see JavaStatefulNetworkWordCount.java.
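On the missing JavaStreamingContextFactory itself: as far as I know, Spark 2.x and later no longer ship that class, and JavaStreamingContext.getOrCreate takes a Function0<JavaStreamingContext> instead. Below is a minimal sketch of the question's setup rewritten that way, assuming Spark 2.x or later on the classpath. The paths and app name are copied from the question. The lines.print() call is only a placeholder output operation (my assumption, since a streaming context normally needs at least one output operation before start()), so replace it with the real processing.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StartAppWithCheckPoint {
    public static void main(String[] args) throws InterruptedException {
        // Paths taken from the question.
        final String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
        final String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";

        // Invoked only when no usable checkpoint is found in checkpointDirectory.
        // The whole DStream graph has to be defined inside this function so that
        // it can be rebuilt from the checkpoint after a restart.
        Function0<JavaStreamingContext> createContext = () -> {
            SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
            JavaStreamingContext jssc =
                    new JavaStreamingContext(sparkConf, Durations.seconds(300));
            jssc.checkpoint(checkpointDirectory);

            JavaDStream<String> lines = jssc.textFileStream(filePath);
            // Placeholder output operation (assumption): swap in the real processing.
            lines.print();
            return jssc;
        };

        // Restores the context from the checkpoint if one exists, otherwise calls createContext.
        JavaStreamingContext context =
                JavaStreamingContext.getOrCreate(checkpointDirectory, createContext);
        context.start();
        context.awaitTermination();
    }
}
```

The design point is that everything that defines the stream lives inside createContext. On a restart, getOrCreate rebuilds the context from the checkpoint data and does not call the function again.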