思考过程:对模型进行实时训练问题,由于spark的算子与egads不兼容,每个批次预测之后,再DStream.foreachRDD方法中,把RDD的数据collect到driver端,然后,再进行模型的训练,训练之后 再广播到各个executor,这样每个批次都要进行广播,当广播模型比较多时,网络开销特别大;通过调研采用mapWithState来保证增量更新的状态,优势,不需要每批次模型被增量更新后都要存储到redis,下一个批次再从redis读取数据,这样网络开销也比较大。
优化点:1)目前这些配置文件和训练好的模型都在服务器本地文件系统中,后续把这些文件放到hdfs上面以保证spark程序在预测时间能够driver模式;
2)模型训练当模型比较多时间,由于采用单线程 性能是一个瓶颈。
遇到的问题:1)模型太多,要同时训练多个模型;2)动态训练模型时,要能保证模型更新的状态(调研后,采用mapWithState算子)
package com.tingyun.mlpredict.done import com.networkbench.avro.cache.ZookeeperAvroSchemaPersister import com.networkbench.avro.serialize.AvroMessageDecoder import com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own.MonitorWrappedMessage import kafka.serializer.{DefaultDecoder, StringDecoder} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming._ import com.yahoo.egads.control.ModelAdapter import com.yahoo.egads.data.TimeSeries object ModelUpdateOnline{ def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("StreamingAnomalyDetector") val ssc = new StreamingContext(sparkConf, Minutes(1)) val sc = ssc.sparkContext ssc.checkpoint("E:\\tmp") //val mAdapter = Egads.loadModel("E:\\andy_ty\\work\\ml_egads\\anomolydetection\\src\\main\\resources\\mem\\2017-08-30_127082_2897_TripleExponentialSmoothingModel") //val initialRDD = ssc.sparkContext.parallelize(List[(String, ModelAdapter)](("127287_-1",TestModel()),("127287_3272",mAdapter),("127116_-1",mAdapter),("126887_2552",mAdapter),("127082_2897",mAdapter))) val initialRDD = List[(String,TestModel)](("127287_-1",TestModel(Seq[MonitorWrappedMessage]())),("127287_3272",TestModel(Seq[MonitorWrappedMessage]())),("127116_-1",TestModel(Seq[MonitorWrappedMessage]())),("126887_2552",TestModel(Seq[MonitorWrappedMessage]()))) var initialRddBC = sc.broadcast(initialRDD) val numThreads = "2" val topics = "alarm-detect-streaming" val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val kafkaParams = Map[String, String]("zookeeper.connect" -> "10.194.1.2:2181,10.194.1.12:2181,10.194.1.13:2181", "group.id" -> "group01","zookeeper.connection.timeout.ms" -> "10000") val monitorWrappedMessage1 = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder]( ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).mapPartitions( partitions => { val zookeeperAvroSchemaPersister = new ZookeeperAvroSchemaPersister zookeeperAvroSchemaPersister.setServers("10.194.1.2:2181") zookeeperAvroSchemaPersister.setConnectionTimeout(10000) zookeeperAvroSchemaPersister.init() val avroMessageDecoder = new AvroMessageDecoder avroMessageDecoder.setAvroMessageEntityPackageToScan("com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own") avroMessageDecoder.setAvroSchemaPersister(zookeeperAvroSchemaPersister) val mWMessage = partitions.map(line => avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList zookeeperAvroSchemaPersister.destroy() // 关闭zk链接 mWMessage.toIterator }) monitorWrappedMessage1.print(100) val monitorWrappedMessage = monitorWrappedMessage1.map(mmm => (mmm.getApplicationId + "_" + mmm.getApplicationInstanceId,mmm)) /* val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group") val topic = Set("test") val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]()) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic) val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }*/ //自定义mappingFunction,累加单词出现的次数并更新状态 def mappingFuncDemo(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[ModelAdapter]):Option[(String,ModelAdapter)] = { /*state.get.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getTimestamp,1f)) state.update(state.get())*/ /* val preMA = state.getOption().getOrElse(new ModelAdapter()) preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f)) state.update(preMA) val output = (word, preMA) Some(output)*/ val preMA = state.getOption() var ma = new ModelAdapter() preMA match{ case Some(modelAdapter) =>{ println(modelAdapter.firstTimeStamp + "==111=" + monitorWrappedMessage.get.getApplicationId); ma = preMA.get } case _ =>{ println( "=222==" + monitorWrappedMessage.get.getApplicationId); } } /*preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f)) state.update(preMA)*/ val output = (word, ma) ma.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f)) Some(output) } //word来自于DStream中的key,monitorWrappedMessage来自于DStream中的value,state参数来自于initialState初始化的RDD,当不初始化则来自于 创建的默认空置(即val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages.getOrElse(Seq[MonitorWrappedMessage]())) //initialState初始化的RDD为prevRDD,当前批次为currentRDD;当没有通过initialState初始化的RDD时,则prevRDD为新创建的 空对象。 def mappingFunc(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[TestModel]):Option[(String,TestModel)] = { val preMA = state.getOption() //var ma = new TestModel(Seq[MonitorWrappedMessage]()) preMA match{ case Some(testModel) =>{ //println(monitorWrappedMessage.get.getApplicationId +"==111=="+ testModel.monitorWrappedMessages ); val testModelnew = TestModel(monitorWrappedMessage.get +: testModel.monitorWrappedMessages) state.update(testModelnew) Some((word,testModelnew)) } case _ =>{ //println( "=222==" + monitorWrappedMessage.get.getApplicationId); None; } } /* val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages) .getOrElse(Seq[MonitorWrappedMessage]()) 当没有初始化RDD时则创建默认值*/ /* val testModel = TestModel(monitorWrappedMessage.get +: existingEvents) state.update(testModel)*/ //Some((word,ma)) } // 当initialState 初始化,第一个批次会从 这个实例好的rdd 对应的 map中根据key(就是word,来自于 上一个DSTream中的key)取值,并执行 mappingFunc 中的业务逻辑; // 当没有通过initialState 初始化,在创建时间要添加 //调用mapWithState进行管理流数据的状态 val stateDstream = monitorWrappedMessage.mapWithState(StateSpec.function(mappingFunc _).initialState(sc.parallelize(initialRDD)).timeout(Minutes(5))).map( ll => {ll match { case Some(test) =>{test._1 +"===33333=="+ test._2.monitorWrappedMessages} case _ => {"======NODATA======="} } } ).print() ssc.start() ssc.awaitTermination() } }