This article walks through the Spark Streaming source code to understand the full execution flow of a streaming application, from job generation to job completion, together with the checkpoint operations that happen along the way.
Note: the snippets below include only the code relevant to this analysis; everything else is omitted.
1 Analysis Flow
Application entry point:
```scala
val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()
```
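Note that this skeleton defines no DStream or output operation, so start() would actually fail because no output operation is registered. A minimal complete program might look like the following sketch (the socket source, host/port and 5-second batch interval are illustrative assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount { // illustrative name
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) // 5s batch interval (assumed)

    // at least one DStream with an output operation must exist before start()
    val lines = ssc.socketTextStream("localhost", 9999) // assumed source
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()            // from here on, JobScheduler/JobGenerator drive everything
    ssc.awaitTermination()
  }
}
```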
Once ssc.start() is called, the program truly starts running.
Step 1:
StreamingContext.start() does two main things:
- calls JobScheduler.start()
- posts a StreamingListenerStreamingStarted message
StreamingContext.start():

```scala
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          ...
          scheduler.start()
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          ...
        }
        StreamingContext.setActiveContext(this)
      }
      ...
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
Step 2:
JobScheduler.start() performs the following main operations:
- creates an EventLoop to handle incoming JobSchedulerEvents; processEvent holds the actual handling logic
- calls jobGenerator.start()
JobScheduler.start():

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // create an event listener and start it
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  ...
  // start the JobGenerator
  jobGenerator.start()
  ...
}
```
Step 3:
JobGenerator.start() performs the following main operations:
- creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvent events
- kicks off the job generation work:
  - a timer periodically executes eventLoop.post(GenerateJobs(new Time(longTime)))
  - when the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs generateJobs(time)
  - generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
JobGenerator.start():

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // touch checkpointWriter, which persists checkpoint data, to initialize it
  checkpointWriter

  // create an event listener that receives and handles JobGeneratorEvents
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    // recover from a checkpoint
    restart()
  } else {
    // first start
    startFirstTime()
  }
}
```
On first startup, startFirstTime() is called. It mainly starts the already initialized RecurringTimer, which then periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it invokes processEvent() and thus generateJobs(time), which performs the actual job generation.
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
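getStartTime() aligns the timer's first tick to the next batch boundary by rounding the current clock time up to the next multiple of the period. A tiny self-contained check of that arithmetic (the clock value is invented):

```scala
// mirror of RecurringTimer.getStartTime(): round the current time up to
// the next multiple of `period`
def nextBatchBoundary(nowMillis: Long, period: Long): Long =
  (math.floor(nowMillis.toDouble / period) + 1).toLong * period

// with a 2000 ms batchDuration and an invented current time of 5300 ms,
// the first GenerateJobs event is posted at t = 6000 ms
assert(nextBatchBoundary(5300L, 2000L) == 6000L)
```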
generateJobs does two main things:
- builds a JobSet and submits it via jobScheduler.submitJobSet()
- posts a DoCheckpoint event to trigger a checkpoint
```scala
private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
The first operation: jobScheduler.submitJobSet() mainly iterates over the jobs in the JobSet, wraps each one in a JobHandler, and hands the JobHandler to the jobExecutor thread pool for execution. JobHandler implements Runnable, and its run() does three things:
- posts a JobStarted event
- executes Job.run()
- posts a JobCompleted event
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
        }
      } else {
        // JobScheduler has been stopped
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
```
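jobExecutor here is a fixed-size daemon thread pool whose size is taken from spark.streaming.concurrentJobs (default 1), which is why batches normally execute one at a time. A simplified standalone mirror of that wiring (the real code uses Spark's ThreadUtils; this sketch uses plain java.util.concurrent):

```scala
import java.util.concurrent.{ExecutorService, Executors}

// simplified stand-in for JobScheduler's jobExecutor: a fixed-size pool
// sized by spark.streaming.concurrentJobs (default 1)
val numConcurrentJobs = 1 // conf.getInt("spark.streaming.concurrentJobs", 1)
val jobExecutor: ExecutorService = Executors.newFixedThreadPool(numConcurrentJobs)

// each Job of a JobSet is wrapped in a Runnable (JobHandler) and handed to
// the pool; with the default size of 1, jobs therefore run serially
jobExecutor.execute(new Runnable {
  override def run(): Unit = println("job.run() would execute here")
})
```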
The second operation: posting the DoCheckpoint event.
The DoCheckpoint event is not handled by the EventLoop created in JobScheduler.start() (whose events come in three types: JobStarted, JobCompleted and ErrorReported) but by the JobGenerator's EventLoop, whose core is likewise its processEvent(event) method. When it receives a DoCheckpoint event, it executes doCheckpoint():
```scala
// JobGenerator.processEvent()
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    ...
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    ...
  }
}
```
doCheckpoint() calls graph.updateCheckpointData to update the checkpoint information, and calls checkpointWriter.write to persist it:
```scala
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    // update the checkpoint data held by the DStream graph
    ssc.graph.updateCheckpointData(time)
    // write the checkpoint to the file system
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  } else if (clearCheckpointDataLater) {
    markBatchFullyProcessed(time)
  }
}
```
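shouldCheckpoint is only true if the application enabled checkpointing by setting a checkpoint directory. On the user side this is done roughly as follows (a minimal sketch; the directory path and app name are made up):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // made-up path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp") // made-up name
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir) // enables checkpointing, so shouldCheckpoint becomes true
  // ... define DStreams and output operations here ...
  ssc
}

// on a fresh start this calls createContext(); after a failure it rebuilds the
// context from the checkpoint, in which case ssc.isCheckpointPresent is true
// and JobGenerator.start() takes the restart() branch instead of startFirstTime()
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```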
The checkpoint update boils down to DStreamGraph.updateCheckpointData:
```scala
def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}
```
outputStreams.foreach(_.updateCheckpointData(time)) in turn calls DStream.updateCheckpointData, which mainly calls checkpointData.update(currentTime) and then recurses into the DStreams this DStream depends on:
```scala
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
```
Next, let's look at checkpointData.update(currentTime). In DStream we find the following definition:
```scala
private[streaming] val checkpointData = new DStreamCheckpointData(this)
```
This leads us to DStreamCheckpointData.update; note that DStreamCheckpointData has subclasses that customize what gets saved and how:
```scala
// key: a batch time, value: the checkpoint file written for that time
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// key: a batch time, value: the time of the earliest checkpointed RDD in that batch
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
  // collect the RDDs to checkpoint from the dstream;
  // generatedRDDs is a HashMap[Time, RDD[T]]
  val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                             .map(x => (x._1, x._2.getCheckpointFile.get))
  logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

  // add the checkpoint files to the HashMap that will eventually be serialized
  if (!checkpointFiles.isEmpty) {
    currentCheckpointFiles.clear()
    currentCheckpointFiles ++= checkpointFiles
    // update timeToCheckpointFile
    timeToCheckpointFile ++= currentCheckpointFiles
    // key: the given time, value: the time of the earliest checkpointed RDD
    timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
  }
}
```
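The getCheckpointFile values collected by update() come from RDDs that were marked for checkpointing and then materialized. A plain RDD-level sketch of that mechanism (assumes an existing SparkContext `sc`; the directory is made up):

```scala
// this is where getCheckpointFile values come from
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints") // made-up path

val rdd = sc.parallelize(1 to 100)
rdd.checkpoint()  // mark the RDD for checkpointing
rdd.count()       // an action materializes the RDD and writes the checkpoint

// after materialization the file is recorded on the RDD:
println(rdd.getCheckpointFile) // Some(hdfs://.../rdd-checkpoints/<uuid>/rdd-<id>)
```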
Step 4: Job completion
As mentioned for jobScheduler.submitJobSet() called from generateJobs above, each Job is passed into a JobHandler, which is handed to the JobExecutor to run. The JobHandler posts a JobStarted event, runs the job, and posts a JobCompleted event when it finishes. When JobScheduler's EventLoop receives that event, it executes the handleJobCompletion method:

```scala
// JobScheduler.processEvent()
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}
```
handleJobCompletion calls jobSet.handleJobCompletion(job), and at the end checks whether the whole JobSet has completed; if so, it executes jobGenerator.onBatchCompletion(jobSet.time):
```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      // once every job in the set has completed, do the following
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
```
Back in JobGenerator, we find onBatchCompletion():
```scala
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
```
JobGenerator.processEvent() then executes clearMetadata(time):
```scala
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
clearMetadata() checkpoints the metadata, or deletes it if checkpointing is disabled:
```scala
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    // checkpointing is needed, so post a DoCheckpoint event
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // delete the metadata
    ...
  }
}
```
2 Summary
This completes our tour of Spark Streaming startup, job generation, job completion and checkpointing. To wrap up, a Spark Streaming application runs as follows:
- StreamingContext.start() starts the JobScheduler
- JobScheduler startup:
  - JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events
  - it starts the JobGenerator
- JobGenerator startup:
  - JobGenerator creates an EventLoop[JobGeneratorEvent] to handle GenerateJobs, ClearMetadata, DoCheckpoint and ClearCheckpointData events
  - it creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs
- Job generation in JobGenerator:
  - generateJobs() builds a JobSet and submits it via JobScheduler.submitJobSet()
  - submitJobSet passes each Job into a JobHandler and hands the JobHandler to the JobExecutor thread pool for execution
  - a DoCheckpoint event is posted to checkpoint the metadata
- Job completion:
  - when a job finishes, the JobHandler posts a JobCompleted event; JobScheduler's EventLoop handles it, and once the whole JobSet has completed it posts a ClearMetadata event, which checkpoints or deletes the metadata
Appendix: the source of RecurringTimer and EventLoop, with brief comments added.
The RecurringTimer code is as follows:
```scala
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // create a thread to run loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  // start() mainly starts the thread, which runs loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // execute the callback periodically until stopped
  // (for the JobGenerator, the callback posts a GenerateJobs event)
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
```
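RecurringTimer is private[streaming], so it cannot be used directly from application code, but the pattern is easy to reproduce. A stripped-down standalone version using System.currentTimeMillis() instead of Spark's Clock (illustrative only, not the real class):

```scala
// stripped-down version of the RecurringTimer pattern (illustrative only)
class SimpleRecurringTimer(period: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  @volatile private var nextTime = 0L

  private val thread = new Thread("SimpleRecurringTimer") {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          // sleep until the next boundary, then fire the callback
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime)
          nextTime += period
        }
      } catch { case _: InterruptedException => } // interrupted by stop()
    }
  }

  def start(): Unit = {
    // align the first tick to the next multiple of the period, like getStartTime()
    nextTime = (System.currentTimeMillis() / period + 1) * period
    thread.start()
  }

  def stop(): Unit = { stopped = true; thread.interrupt() }
}

// fires roughly once per second, analogous to posting GenerateJobs per batch
val timer = new SimpleRecurringTimer(1000L, t => println(s"tick at $t"))
timer.start()
```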
EventLoop source: its main job is to run a background thread that checks whether an event has arrived and, if so, dispatches it for handling:
```scala
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  // put the event into eventQueue; eventThread will process it accordingly
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // whether eventThread is alive
  def isActive: Boolean = eventThread.isAlive

  // called before eventThread starts
  protected def onStart(): Unit = {}

  // called after eventThread exits
  protected def onStop(): Unit = {}

  // subclasses implement the event-handling logic here
  protected def onReceive(event: E): Unit

  // error-handling logic
  protected def onError(e: Throwable): Unit
}
```
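EventLoop is likewise private[spark], but the contract, a daemon thread draining a BlockingQueue and dispatching to onReceive, is easy to demonstrate with a standalone copy of the pattern (illustrative only; the names are made up):

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// stripped-down version of the EventLoop pattern (illustrative only)
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try { while (!stopped.get) onReceive(queue.take()) }
      catch { case _: InterruptedException => } // interrupted by stop()
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}

// usage analogous to the JobScheduler/JobGenerator loops
val loop = new MiniEventLoop[String]("demo-loop") {
  override protected def onReceive(event: String): Unit = println(s"handled: $event")
}
loop.start()
loop.post("GenerateJobs") // processed asynchronously on the event thread
```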