一.checkpoint 原理
http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#transformations-on-dstreams
(1)Metadatacheckpointing
将流式计算的信息保存到具备容错性的存储上如HDFS,MetadataCheckpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括:
Configuration(配置信息)-创建streaming应用程序的配置信息
DStreamoperations -在streaming应用程序中定义的DStreaming操作
Incompletebatches -在列队中没有处理完的作业
(2)Datacheckpointing将生成的RDD保存到外部可靠的存储当中
具体来说,metadatacheckpointing主要还是从drvier失败中恢复,而DataCheckpoing用于对有状态的transformation操作进行checkpointing
在调用spark命令的时候
改代码后可以执行新代码的逻辑
但是修改参数不能执行新的参数逻辑 因为checkpoint保存了 参数信息 新传入的参数无效
二:checkpoint 单次jar包容错和 再次启动时候容错
1.单次jar包容错
如果只使用checkpoint,那么checkpoint作用范围在一个jar包运行的过程中,对长期运行过程中可能出现的各种错误进行备份处理;
并不是作用在jar包挂了,再重新启动jar的时候的数据恢复
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE));
2.再次启动jar包容错:
要想使再次启动jar包的时候读取备份的数据,要使用下面的方法创建 ssc
如果使用此方法再次启动程序的时候,sparlstreaming的时间间隔等参数信息不能改变,只能是第一次运行时候的时间间隔(删除checkpoint目录才能重新设置)
val ssc = StreamingContext.getOrCreate(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE), recently _);
recently _{
ssc.checkpoint(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE));
}
原因分析:在sparkStreaming main方法 创建的 hiveContext 无法保存在checkpoint目录,需要在每个算子里根据rdd重新创建
错误解决方法一:在transform方法里,每次都新建新的hiveContext
def fiferGameidfromhive(sqlContext: SQLContext, dsKey2imie: DStream[(String, String)]): DStream[(String, (String, Boolean))] = {
dsKey2imie.transform(rdd => {
val hiveContext2 = new HiveContext(rdd.sparkContext);
val rddsql = hiveContext2.sql("select game_id from yyft.ods_publish_game").rdd.map(x => {
x.getInt(0).toString;
}).map((_, true)).distinct();
rdd.join(rddsql);
});
}
Exception in thread "JobGenerator"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JobGenerator"
16/09/03 13:50:28 INFO JobScheduler: Finished job streaming job 1472910540000 ms.0 from job set of time 1472910540000 ms
Exception in thread "JobScheduler"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JobScheduler"
Exception in thread "streaming-job-executor-0"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "streaming-job-executor-0"
16/09/03 13:50:51 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error handling message; restarting receiver
java.lang.OutOfMemoryError: PermGen space
原因分析:transform,map,reducebykry 等算子内不要进行耗时和消耗内存(多次创建hive对象)的操作
正确解决方法:写一个单列模式(object是单例对象,但是hiveContext 是一个类并不是单例对象)
def fiferGameidfromhive(sqlContext: SQLContext, dsKey2imie: DStream[(String, String)]): DStream[(String, (String, Boolean))] = {
dsKey2imie.transform(rdd => {
val hiveContext = HiveContextSingleton.getInstance(rdd.sparkContext)
val rddsql = hiveContext .sql("select game_id from yyft.ods_publish_game").rdd.map(x => {
x.getInt(0).toString;
}).map((_, true)).distinct();
rdd.join(rddsql);
});
}
object HiveContextSingleton {
@transient private var instance: HiveContext = _
def getInstance(sparkContext: SparkContext): HiveContext = {
if (instance == null) {
instance = new HiveContext(sparkContext)
}
instance
}
}