A First Look at the Spark Streaming Source Code (1)


Having previously studied the Spark Core source code, I will spend some time on Spark Streaming. Let's start from the simplest possible Streaming program as the entry point (Receiver mode). The code is as follows:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Daxin on 2017/8/4.
 */
object StreamMain {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("socketStream")
    // Receiver-based mode, so the number of threads must be greater than 1;
    // otherwise we can only receive data but never process it
    conf.setMaster("local[*]") // with conf.setMaster("local[1]") no thread would be left for computation
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(2))

    // TODO ultimately creates and returns a SocketInputDStream
    val line = ssc.socketTextStream("node", 9999)
    val result = line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    // TODO core entry point: StreamingContext.start starts the JobScheduler,
    // which begins receiving data and running the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The program above has three parts: the boilerplate that creates the ssc, the creation of the DStream and the operator chain, and finally starting the ssc. The rest of this post walks through these three parts in turn.

1: Creating the StreamingContext

Just as SparkContext is the entry point for Spark batch processing, StreamingContext is the entry point for Spark Streaming. It provides the functions for creating DStreams, and calling its start method kicks off the streaming computation.

The primary constructor:

```scala
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging
```

The auxiliary constructors:

```scala
/**
 * Create a StreamingContext using an existing SparkContext.
 * @param sparkContext existing SparkContext
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}

/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

/**
 * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
 * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 * @param appName a name for your job, to display on the cluster web UI
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) = {
  this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
       null, batchDuration)
}

/**
 * Recreate a StreamingContext from a checkpoint file.
 * @param path Path to the directory that was specified as the checkpoint directory
 * @param hadoopConf Optional, configuration object if necessary for reading from
 *                   HDFS compatible filesystems
 */
def this(path: String, hadoopConf: Configuration) =
  this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)

/**
 * Recreate a StreamingContext from a checkpoint file.
 * @param path Path to the directory that was specified as the checkpoint directory
 */
def this(path: String) = this(path, SparkHadoopUtil.get.conf)

/**
 * Recreate a StreamingContext from a checkpoint file using an existing SparkContext.
 * @param path Path to the directory that was specified as the checkpoint directory
 * @param sparkContext Existing SparkContext
 */
def this(path: String, sparkContext: SparkContext) = {
  this(
    sparkContext,
    CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
    null)
}
```

Every constructor ultimately requires a SparkContext, which makes it clear that Spark Streaming is a stream-processing engine built on top of Spark Core.
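To make the two common construction paths concrete, here is a minimal usage sketch. The checkpoint directory /tmp/streaming-checkpoint and the host node are placeholders; on a fresh start the (SparkContext, Duration) constructor is used, while StreamingContext.getOrCreate rebuilds the context from the checkpoint on restart (internally going through the same CheckpointReader.read path as the path-based constructors above).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CreateSscSketch {
  def createFreshContext(): StreamingContext = {
    val sc = new SparkContext(new SparkConf().setAppName("socketStream").setMaster("local[*]"))
    val ssc = new StreamingContext(sc, Seconds(2))       // delegates to this(sc, null, batchDuration)
    ssc.checkpoint("/tmp/streaming-checkpoint")          // enables checkpoint-based recovery
    ssc.socketTextStream("node", 9999).count().print()   // the DStream graph must be built here
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: createFreshContext() is called.
    // Restart: the context and its DStream graph are recovered from the checkpoint.
    val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", createFreshContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```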

2: Creating the DStream

Like SparkContext, StreamingContext provides functions for creating its abstract datasets, for example:

```scala
/**
 * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @see [[socketStream]]
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
```

This section analyzes how a Receiver-based DStream is created, using the socket DStream as the running example. First, the DStream inheritance hierarchy:

Note: the Flume-related DStreams in the hierarchy come from classes in Spark Streaming's Flume dependency library, which must be added to your project explicitly.

From the class hierarchy we can see that every Receiver-based DStream extends ReceiverInputDStream (by contrast, the direct-mode Kafka DStream extends InputDStream directly).
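To see what that hierarchy implies in practice, here is a minimal sketch of a custom receiver following the public Receiver API: subclass Receiver, start a thread in onStart(), and push records with store(). The ConstantReceiver class and its "demo" data source are invented purely for illustration; wiring it in through ssc.receiverStream yields a ReceiverInputDStream, just like socketTextStream does.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ConstantReceiver(value: String, intervalMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns immediately,
    // the same pattern SocketReceiver uses below.
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store(value)            // hand one record to Spark Streaming
          Thread.sleep(intervalMs)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // nothing to clean up; the thread exits once isStopped() is true
  }
}

// Usage (assuming the ssc from the example program):
// val lines = ssc.receiverStream(new ConstantReceiver("hello", 1000))
```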

Next, let's trace how the following call creates the DStream:

```scala
val line = ssc.socketTextStream("node", 9999)

/**
 * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 * @see [[socketStream]]
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

/**
 * Creates an input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interpreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
```

At the bottom, ssc.socketStream simply news up a SocketInputDStream and returns it; no computation is triggered at this point. Next, the SocketInputDStream source:

```scala
private[streaming]
class SocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    // getReceiver() is invoked later from
    // org.apache.spark.streaming.scheduler.ReceiverTracker#launchReceivers
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
```

Now the SocketReceiver source:

```scala
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  private var socket: Socket = _

  def onStart() {
    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() } // calls receive() to start pulling data
    }.start()
  }

  def onStop() {
    // in case restart thread close it twice
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
        logInfo(s"Closed socket to $host:$port")
      }
    }
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next()) // the key step
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}
```

That completes socketTextStream. At this point org.apache.spark.streaming.dstream.SocketInputDStream#getReceiver has not yet been called, so no SocketReceiver exists; ssc.socketTextStream returns only a description of the data, not the data itself. Actual data reception and processing begin only after ssc.start() is called.
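Before moving on, here is a hedged sketch of supplying a custom bytesToObjects converter to socketStream instead of the default SocketReceiver.bytesToLines used by socketTextStream. The Event case class, the eventConverter helper, and the tab-separated wire format are assumptions made purely for illustration.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

case class Event(user: String, action: String)

// Matches the converter signature (InputStream) => Iterator[T] expected by socketStream.
def eventConverter(in: InputStream): Iterator[Event] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine())
    .takeWhile(_ != null)                                  // stop when the socket closes
    .map(_.split("\t"))
    .collect { case Array(user, action) => Event(user, action) }
}

// Usage (assuming the ssc from the example program):
// val events = ssc.socketStream[Event]("node", 9999, eventConverter, StorageLevel.MEMORY_AND_DISK_SER_2)
```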

3: Starting the StreamingContext (StreamingContext.start())

Before walking through start(), here is a brief summary of the components involved:

a: JobScheduler schedules streaming jobs to run on Spark
b: JobGenerator, used by the JobScheduler, generates the Spark jobs, which are then run in a thread pool

The source of StreamingContext.start():

```scala
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            // TODO JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      logDebug("Adding shutdown hook") // force eager creation of logger
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
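As a quick behavioral summary of the state machine above (a sketch, assuming the ssc from the example program at the top):

```scala
ssc.start()                          // INITIALIZED -> ACTIVE: scheduler.start() runs in the "streaming-start" thread
ssc.start()                          // ACTIVE: only logs "StreamingContext has already been started"
ssc.stop(stopSparkContext = false)   // ACTIVE -> STOPPED
// ssc.start()                       // STOPPED: would throw IllegalStateException
```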

Here is JobScheduler.start():

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  // TODO ReceiverTracker initialization
  receiverTracker = new ReceiverTracker(ssc)
  // TODO mainly collects statistics about the input data, used for UI display and monitoring
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  // TODO key point: start the ReceiverTracker, which is responsible for receiving the data
  receiverTracker.start()
  // TODO key point: start the JobGenerator, which is responsible for producing Spark jobs
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
```

The core of this method is the creation, initialization, and starting of the ReceiverTracker and the JobGenerator.
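The eventLoop created above is Spark's internal EventLoop utility; conceptually it is just a blocking queue drained by a daemon thread that dispatches each event to onReceive. A simplified sketch of that pattern (not Spark's actual org.apache.spark.util.EventLoop):

```scala
import java.util.concurrent.LinkedBlockingQueue

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = queue.take() // block until an event is posted
        try onReceive(event) catch { case e: Throwable => onError(e) }
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```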

Now ReceiverTracker.start(); the following code runs on the driver:

```scala
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  // TODO only if there are Receiver-based input streams do we set up the endpoint
  // and launch the receivers; otherwise no Receiver is started
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    // TODO launch the receivers
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```

The implementation of launchReceivers():

```scala
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 * <br><br>
 * org.apache.spark.streaming.scheduler.ReceiverTracker#launchReceivers()
 */
private def launchReceivers(): Unit = {
  // TODO obtain the Receiver of each ReceiverInputDStream
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // TODO endpoint is the reference to ReceiverTracker's own endpoint,
  // TODO so the tracker is sending a message to itself
  endpoint.send(StartAllReceivers(receivers))
}
```

The StartAllReceivers message is handled in org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receive:

```scala
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  // TODO handle the message
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      // TODO start the receiver
      startReceiver(receiver, executors)
    }
  case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
        // Try global scheduling again
        oldScheduledExecutors
      } else {
        val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
        // Clear "scheduledLocations" to indicate we are going to do local scheduling
        val newReceiverInfo = oldReceiverInfo.copy(
          state = ReceiverState.INACTIVE, scheduledLocations = None)
        receiverTrackingInfos(receiver.streamId) = newReceiverInfo
        schedulingPolicy.rescheduleReceiver(
          receiver.streamId,
          receiver.preferredLocation,
          receiverTrackingInfos,
          getExecutors)
      }
    // Assume there is one receiver restarting at one time, so we don't need to update
    // receiverTrackingInfos
    startReceiver(receiver, scheduledLocations)
  case c: CleanupOldBlocks =>
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
  case UpdateReceiverRateLimit(streamUID, newRate) =>
    for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
      eP.send(UpdateRateLimit(newRate))
    }
  // Remote messages
  case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
}
```

Finally, startReceiver creates the ReceiverSupervisorImpl on the worker:

```scala
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // TODO create the ReceiverSupervisorImpl
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(ThreadUtils.sameThread)
  logInfo(s"Receiver ${receiver.streamId} started")
}
```
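To make the trick used above easier to see in isolation, here is a simplified sketch (not Spark's actual code) of the pattern startReceiver relies on: wrap the worker-side function in a one-partition RDD and submit it with submitJob, so the single "task" is effectively a long-lived daemon pinned to one executor. The LongRunningTaskSketch object, its payload string, and the infinite loop are illustrative stand-ins only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LongRunningTaskSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("receiver-like-job").setMaster("local[2]"))

    // One element, one partition => exactly one task, analogous to receiverRDD above.
    val oneElementRdd = sc.makeRDD(Seq("receiver-payload"), 1)

    // The task function never returns, so the job keeps running until it is killed,
    // mirroring supervisor.awaitTermination() in startReceiverFunc.
    val keepRunning: Iterator[String] => Unit = { iter =>
      val payload = iter.next()            // the real code pulls the Receiver object out of the iterator
      while (true) { Thread.sleep(1000) }  // stand-in for the receiver's work loop
    }

    val future = sc.submitJob[String, Unit, Unit](
      oneElementRdd, keepRunning, Seq(0), (_, _) => (), ())

    // ReceiverTracker hooks future.onComplete to restart the receiver job if it ever finishes;
    // here we simply block the driver to keep the demo alive.
    scala.concurrent.Await.ready(future, scala.concurrent.duration.Duration.Inf)
  }
}
```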

