Checkpoint with Spark file streaming in Java


Question

I want to implement checkpointing in my Spark file-streaming application so that, if the application stops or is terminated for any reason, all unprocessed files from Hadoop can still be processed. I am following the streaming programming guide, but I cannot find JavaStreamingContextFactory. What should I do?

My code is:

public class StartAppWithCheckPoint {

    public static void main(String[] args) {
        
        try {
            
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";    
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                  @Override public JavaStreamingContext create() {
                      
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);  
                    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                  }
                };
                
            JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
            
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
            
        } catch(Exception e) {
            e.printStackTrace();
        }    
    }
}

Answer 1

Score: 1

You must use checkpointing.

For checkpointing, use the stateful transformations updateStateByKey or reduceByKeyAndWindow. There are plenty of examples in the spark-examples module shipped with the prebuilt Spark distribution, as well as in the Spark source on GitHub. For your specific case, see JavaStatefulNetworkWordCount.java; two sketches follow below.
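
As to why JavaStreamingContextFactory cannot be found: it was deprecated and later removed (Spark 2.x no longer has it), and JavaStreamingContext.getOrCreate now takes a Function0&lt;JavaStreamingContext&gt; instead. Below is a minimal sketch of the question's setup ported to that API, assuming Spark 2.x. The paths and app name come from the question; the class name and the foreachRDD output are placeholders I added, since a streaming context needs at least one registered output operation before start().

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointedFileStream {

    public static void main(String[] args) throws Exception {
        String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/";
        String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";

        // Called only on a cold start; on restart, the context (including the
        // DStream graph) is rebuilt from the checkpoint data instead.
        Function0<JavaStreamingContext> createContext = () -> {
            SparkConf conf = new SparkConf().setAppName("ProbeAnalysis");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(300));
            jssc.checkpoint(checkpointDirectory);

            JavaDStream<String> lines = jssc.textFileStream(filePath);
            // Placeholder output operation; without one, start() throws.
            lines.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));
            return jssc;
        };

        JavaStreamingContext context =
                JavaStreamingContext.getOrCreate(checkpointDirectory, createContext);
        context.start();
        context.awaitTermination();
    }
}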

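For reference, here is a condensed sketch of the updateStateByKey pattern used in JavaStatefulNetworkWordCount, again assuming Spark 2.x APIs. The socket source, port, and word-count logic are illustrative stand-ins, not from the question; the point is that updateStateByKey keeps running per-key state and therefore requires a checkpoint directory.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class StatefulWordCountSketch {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("StatefulWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint("hdfs://Mongo1:9000/probeAnalysis/checkpoint"); // required by updateStateByKey

        // Merge the counts of the current batch into the running total per key.
        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFn =
                (batchValues, state) -> {
                    int sum = state.orElse(0);
                    for (Integer v : batchValues) {
                        sum += v;
                    }
                    return Optional.of(sum);
                };

        JavaPairDStream<String, Integer> runningCounts = jssc
                .socketTextStream("localhost", 9999)
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum)
                .updateStateByKey(updateFn);

        runningCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

For driver-failure recovery, this creation logic would still be wrapped in the Function0 passed to getOrCreate, as in the previous sketch; the checkpoint directory then holds both the state snapshots and the metadata needed to rebuild the context.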
