Spark Streaming Execution Flow Analysis and Checkpoint Operations
Published: 2019-06-11


This post walks through the source code to understand the full execution flow of a Spark Streaming program, from job generation to job completion, along with the checkpoint operations that accompany it.

Note: only the source code relevant to this analysis is shown below; the rest is omitted.

1 Execution Flow Analysis

Application entry point:

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()

Once ssc.start() is called, the program actually begins running.
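Since the rest of this post revolves around checkpointing, here is a minimal, hedged sketch of a complete application with checkpointing enabled. The checkpoint directory, host, and port are placeholders, not from the original code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder dir; enables the checkpoint path discussed below
    val lines = ssc.socketTextStream("localhost", 9999) // illustrative source
    lines.count().print() // an output operation is required, or no jobs are generated
    ssc.start()           // everything discussed below happens after this call
    ssc.awaitTermination()
  }
}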

Step 1:

StreamingContext.start() performs the following main work:

  • Calls JobScheduler.start()
  • Posts the StreamingListenerStreamingStarted message
StreamingContext.start():

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          ...
          scheduler.start()
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          ...
        }
        StreamingContext.setActiveContext(this)
      }
      ...
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

Step 2:

JobScheduler.start() performs the following main operations:

  • Creates an EventLoop to handle incoming JobSchedulerEvents; processEvent contains the actual handling logic
  • Calls jobGenerator.start()
JobScheduler.start():

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  // create an event loop and start it
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  ...
  // start the JobGenerator
  jobGenerator.start()
  ...
}

Step 3:

JobGenerator.start() performs the following main operations:

  • Creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvents
  • Begins the job-generation work:
    • A timer periodically executes eventLoop.post(GenerateJobs(new Time(longTime)))
    • When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it executes generateJobs(time)
    • generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
JobGenerator.start():

def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started
  // reference checkpointWriter here to initialize the writer that persists checkpoint data
  checkpointWriter
  // create an event loop that listens for and handles JobGeneratorEvents
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()
  if (ssc.isCheckpointPresent) {
    // recover from checkpoint
    restart()
  } else {
    // first start
    startFirstTime()
  }
}
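Which branch runs depends on how the StreamingContext was constructed. Here is a hedged sketch of the usual StreamingContext.getOrCreate pattern, which is what makes ssc.isCheckpointPresent true after a restart; the directory and batch interval are illustrative, not from the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SparkStreaming")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder checkpoint directory
  // define sources and transformations here, before the context is returned
  ssc
}

// first run: createContext() is invoked and startFirstTime() follows;
// after a crash: the context is rebuilt from the checkpoint and restart() follows
val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()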

On the first start, startFirstTime() is called. Its main job is to start the already-initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which calls generateJobs(time) to carry out the job generation.

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

The main work of generateJobs:

  • Build a JobSet and submit it via jobScheduler.submitJobSet()
  • Post a DoCheckpoint event to trigger a checkpoint
private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

The first operation: jobScheduler.submitJobSet() iterates over the jobs in the JobSet, wraps each one in a JobHandler, and hands the JobHandler to jobExecutor for execution. JobHandler implements Runnable, and its run() mainly does three things:

  • Post the JobStarted event
  • Execute Job.run()
  • Post the JobCompleted event
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
        }
      } else {
        // JobScheduler has been stopped
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
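To my knowledge, jobExecutor is a fixed-size daemon thread pool whose size comes from the spark.streaming.concurrentJobs configuration (default 1), so by default the jobs of successive batches execute serially. A hedged sketch of raising it:

import org.apache.spark.SparkConf

// assumption: the jobExecutor pool size is read from spark.streaming.concurrentJobs
val sparkConf = new SparkConf()
  .setAppName("SparkStreaming")
  .set("spark.streaming.concurrentJobs", "2") // let jobs from two batches run concurrently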

The second operation: posting the DoCheckpoint event

The core of each EventLoop is its processEvent(event) method. The EventLoop created in JobScheduler.start() handles three event types: JobStarted, JobCompleted, and ErrorReported. DoCheckpoint, however, is a JobGeneratorEvent: when the JobGenerator's EventLoop receives it, it executes doCheckpoint():

// JobGenerator.processEvent()
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    ...
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    ...
  }
}

doCheckpoint() calls graph.updateCheckpointData to update the checkpoint data, then calls checkpointWriter.write to persist it:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    // update the checkpoint data for this batch
    ssc.graph.updateCheckpointData(time)
    // persist the checkpoint to the file system
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  } else if (clearCheckpointDataLater) {
    markBatchFullyProcessed(time)
  }
}
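Note the isMultipleOf guard: a checkpoint is written only when the batch time lands on a multiple of ssc.checkpointDuration. For stateful streams the interval can be tuned per DStream via DStream.checkpoint. A hedged sketch, with illustrative values and a placeholder directory (the Spark docs suggest an interval of roughly 5-10 batch intervals):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("SparkStreaming"), Seconds(5))
ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder directory
val words = ssc.socketTextStream("localhost", 9999) // illustrative source
val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val counts = words.map(w => (w, 1)).updateStateByKey(updateFunc)
counts.checkpoint(Seconds(50)) // checkpoint this stateful stream every 50s (10x the batch interval)
counts.print()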

The checkpoint update boils down to a call to DStreamGraph.updateCheckpointData:

def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}

outputStreams.foreach(_.updateCheckpointData(time)) invokes the updateCheckpointData method on each DStream, which mainly calls checkpointData.update(currentTime) and then recursively updates the DStreams it depends on.

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}

Next, let's look at checkpointData.update(currentTime). In DStream we can see the following definition:

private[streaming] val checkpointData = new DStreamCheckpointData(this)

This leads us to DStreamCheckpointData.update; DStreamCheckpointData also has subclasses that customize what gets saved and how:

// key: a given time; value: the checkpoint file contents
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// key: batch time; value: the time of the earliest checkpointed RDD in that batch
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
  // get the RDDs to checkpoint from the DStream; generatedRDDs is a HashMap[Time, RDD[T]]
  val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                     .map(x => (x._1, x._2.getCheckpointFile.get))
  logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
  // add the checkpoint files to the HashMap that will eventually be serialized
  if (!checkpointFiles.isEmpty) {
    currentCheckpointFiles.clear()
    currentCheckpointFiles ++= checkpointFiles
    // update timeToCheckpointFile
    timeToCheckpointFile ++= currentCheckpointFiles
    // key: the given time; value: the time of the earliest checkpointed RDD
    timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
  }
}
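The filter on getCheckpointFile.isDefined keeps only RDDs that have actually been checkpointed. The same mechanism can be observed with a plain RDD; a minimal sketch, with placeholder paths:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/rdd-checkpoint") // placeholder directory
val rdd = sc.parallelize(1 to 100)
rdd.checkpoint()               // mark the RDD for checkpointing
rdd.count()                    // an action triggers the actual write
println(rdd.getCheckpointFile) // Some(<file path>) once written; None if never checkpointed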

Step 4: Job completion

As mentioned above, jobScheduler.submitJobSet() (called from generateJobs) wraps each Job in a JobHandler and hands it to the JobExecutor. The JobHandler posts a JobStarted event, runs the job, and posts a JobCompleted event when it finishes. When the JobScheduler's EventLoop receives the JobCompleted event, it executes the handleJobCompletion method:

// JobScheduler.processEvent()
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

handleJobCompletion calls jobSet.handleJobCompletion(job) and, at the end, checks whether the entire JobSet has completed; if so, it executes jobGenerator.onBatchCompletion(jobSet.time):

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      // executed once all jobs in the set have completed
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}

Back in JobGenerator, we find onBatchCompletion():

def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

JobGenerator.processEvent() then executes clearMetadata(time):

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

clearMetadata() either checkpoints the metadata or deletes it:

private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)
  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    // checkpointing is needed: post a DoCheckpoint event
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // delete the metadata
  }
}

2 Summary

This completes our tour of Spark Streaming's startup, job generation, job completion, and checkpoint operations. To summarize, a Spark Streaming program runs as follows:

  • StreamingContext.start() starts the JobScheduler
  • JobScheduler startup
    • JobScheduler creates an EventLoop[JobSchedulerEvent] to handle the JobStarted and JobCompleted events
    • It starts the JobGenerator
  • JobGenerator startup
    • JobGenerator creates an EventLoop[JobGeneratorEvent] to handle the GenerateJobs, ClearMetadata, DoCheckpoint, and ClearCheckpointData events
    • It creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs
  • Job generation in the JobGenerator
    • generateJobs() builds a JobSet and submits it via JobScheduler.submitJobSet
      • submitJobSet wraps each Job in a JobHandler and hands the JobHandler to the JobExecutor thread pool
    • A DoCheckpoint event is posted to checkpoint the metadata
  • Job completion
    • When a job finishes, the JobHandler posts a JobCompleted event; JobScheduler's EventLoop handles it and, once the whole JobSet has completed, posts a ClearMetadata event, which checkpoints or deletes the metadata

Appendix: the RecurringTimer and EventLoop source code, with brief comments.

The RecurringTimer code:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // the thread that runs loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  // start() mainly launches the thread, which executes loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // periodically run the callback, i.e. post the GenerateJobs event
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
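Note how getStartTime() aligns the first tick to the next multiple of the period, so batches fire on interval boundaries rather than relative to when start() was called. A standalone sketch of the same arithmetic, with fixed numbers for illustration:

// same formula as RecurringTimer.getStartTime(), times in milliseconds
val period = 5000L    // a 5s batch interval
val now = 12345L      // a pretend clock.getTimeMillis() value
val firstTick = (math.floor(now.toDouble / period) + 1).toLong * period
println(firstTick)    // 15000: the next multiple of 5000 after 12345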

The EventLoop source: its main job is to run a background thread that watches for incoming events and handles them accordingly:

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  // put an event on eventQueue; eventThread will handle it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // whether eventThread is alive
  def isActive: Boolean = eventThread.isAlive

  // called before eventThread starts
  protected def onStart(): Unit = {}

  // called after eventThread exits
  protected def onStop(): Unit = {}

  // subclasses implement the event-handling logic here
  protected def onReceive(event: E): Unit

  // error-handling logic
  protected def onError(e: Throwable): Unit
}
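As a hypothetical usage sketch of the contract (ignoring the private[spark] visibility, which would prevent this outside Spark's own packages), a subclass only has to implement onReceive and onError:

// hypothetical event type and loop, purely illustrative
sealed trait DemoEvent
case class Ping(id: Int) extends DemoEvent

val loop = new EventLoop[DemoEvent]("demo-loop") {
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case Ping(id) => println(s"handled ping $id") // runs on the daemon event thread
  }
  override protected def onError(e: Throwable): Unit = e.printStackTrace()
}

loop.start()       // onStart() first, then the event thread begins draining the queue
loop.post(Ping(1)) // enqueue; onReceive handles it asynchronously
loop.stop()        // interrupt + join the thread, then onStop()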

Reposted from: https://www.cnblogs.com/simple-focus/p/8463259.html
