spark的checkpoint

xiaoxiao2021-02-28  47

一.checkpoint 原理

http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#transformations-on-dstreams

(1)Metadatacheckpointing

将流式计算的信息保存到具备容错性的存储上如HDFS,MetadataCheckpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括:

Configuration(配置信息)-创建streaming应用程序的配置信息

DStreamoperations -在streaming应用程序中定义的DStreaming操作

Incompletebatches -在列队中没有处理完的作业

 

(2)Datacheckpointing将生成的RDD保存到外部可靠的存储当中

具体来说,metadatacheckpointing主要还是从drvier失败中恢复,而DataCheckpoing用于对有状态的transformation操作进行checkpointing

 

在调用spark命令的时候

改代码后可以执行新代码的逻辑

但是修改参数不能执行新的参数逻辑 因为checkpoint保存了  参数信息  新传入的参数无效

 

二:checkpoint  单次jar包容错和 再次启动时候容错

1.单次jar包容错

如果只使用checkpoint,那么checkpoint作用范围在一个jar包运行的过程中,对长期运行过程中可能出现的各种错误进行备份处理;

并不是作用在jar包挂了,再重新启动jar的时候的数据恢复

   val ssc = new StreamingContext(sc, Seconds(10))

   ssc.checkpoint(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE));

 

2.再次启动jar包容错:

要想使再次启动jar包的时候读取备份的数据,要使用下面的方法创建 ssc

如果使用此方法再次启动程序的时候,sparlstreaming的时间间隔等参数信息不能改变,只能是第一次运行时候的时间间隔(删除checkpoint目录才能重新设置)

 val ssc = StreamingContext.getOrCreate(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE), recently _);

recently _{

  ssc.checkpoint(ConfigurationUtil.getProperty(Constants.SPARK_CHECKPOINT_ACTIVE));

}

 

3.遇到的问题:sparkStreaming使用的时候 ,多次启用一个jar包,checkpoint目录无法正确读取,导致报一些乱七八糟的问题 

原因分析:在sparkStreaming  main方法 创建的 hiveContext 无法保存在checkpoint目录,需要在每个算子里根据rdd重新创建

 

错误解决方法一:在transform方法里,每次都新建新的hiveContext

  def fiferGameidfromhive(sqlContext: SQLContext, dsKey2imie: DStream[(String, String)]): DStream[(String, (String, Boolean))] = {

    dsKey2imie.transform(rdd => {

      val hiveContext2 = new HiveContext(rdd.sparkContext);

      val rddsql = hiveContext2.sql("select game_id from yyft.ods_publish_game").rdd.map(x => {

        x.getInt(0).toString;

      }).map((_, true)).distinct();

      rdd.join(rddsql);

    });

  }

Exception in thread "JobGenerator"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JobGenerator"

16/09/03 13:50:28 INFO JobScheduler: Finished job streaming job 1472910540000 ms.0 from job set of time 1472910540000 ms

Exception in thread "JobScheduler"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "JobScheduler"

Exception in thread "streaming-job-executor-0"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "streaming-job-executor-0"

16/09/03 13:50:51 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error handling message; restarting receiver

java.lang.OutOfMemoryError: PermGen space

原因分析:transform,map,reducebykry 等算子内不要进行耗时和消耗内存(多次创建hive对象)的操作

 

正确解决方法:写一个单列模式(object是单例对象,但是hiveContext 是一个类并不是单例对象)

  def fiferGameidfromhive(sqlContext: SQLContext, dsKey2imie: DStream[(String, String)]): DStream[(String, (String, Boolean))] = {

    dsKey2imie.transform(rdd => {

      val hiveContext = HiveContextSingleton.getInstance(rdd.sparkContext)

       val rddsql = hiveContext .sql("select game_id from yyft.ods_publish_game").rdd.map(x => {

        x.getInt(0).toString;

      }).map((_, true)).distinct();

      rdd.join(rddsql);

    });

  }

 

object HiveContextSingleton {

  @transient private var instance: HiveContext = _

  def getInstance(sparkContext: SparkContext): HiveContext = {

    if (instance == null) {

      instance = new HiveContext(sparkContext)

    }

    instance

  }

}

转载请注明原文地址: https://www.6miu.com/read-46802.html

最新回复(0)