A Source Code Walkthrough of the Spark Streaming Processing Flow

Overview of the Spark Streaming Processing Flow

Spark Streaming is a stream processing framework built on top of Spark that breaks a streaming computation into a series of small batch jobs. It continuously receives and stores external data (from sources such as Kafka, MQTT, or sockets), and at a fixed interval (called the batch interval, usually on the order of seconds) launches a Spark job to process the data received during that interval.

In short, the Spark Streaming processing flow is (a minimal driver program is sketched after this list):

  • keep receiving and storing external data
  • periodically launch a Spark job to process the data of one batch interval
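
For orientation, here is a minimal driver program showing those two halves end to end (a sketch; the host/port, batch duration, and word-count logic are made up for illustration): a receiver-backed input stream that keeps pulling data, and output operations that are turned into a Spark job once per batch.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MinimalStreamingApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MinimalStreamingApp")
        // batch duration: a Spark job is generated for every 5-second slice of received data
        val ssc = new StreamingContext(conf, Seconds(5))

        // socketTextStream returns a ReceiverInputDStream, i.e. it is backed by a Receiver
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()            // JobScheduler.start() -> ReceiverTracker + JobGenerator
        ssc.awaitTermination()
      }
    }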

Storing External Data

  • Call flow

    StreamingContext.start() -> JobScheduler.start() -> ReceiverTracker.start() && JobGenerator.start()

    • ReceiverTracker.start() keeps receiving and storing external data
    • JobGenerator.start() is used to process the data
  • ReceiverTracker

    • Lives on the driver and manages all of the receivers

      • Note: every DStream of type ReceiverInputDStream corresponds to a Receiver (which receives the external data)

        ReceiverInputDStream.getReceiver() returns that Receiver

    • Contains a member of type ReceivedBlockTracker

      • The 2 important methods of ReceivedBlockTracker (a toy sketch of this bookkeeping follows this list)

        • def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean

          Records the BlockInfo sent over by the workers, stored as streamId -> mutable.Queue[ReceivedBlockInfo]

        • def allocateBlocksToBatch(batchTime: Time): Unit

          Builds Time -> Map[Int, Seq[ReceivedBlockInfo]], i.e. the mapping from each batch to its streams and their blocks
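
          The bookkeeping behind these two methods is easier to picture with a toy model (the class and types below are simplified stand-ins for illustration, not the actual Spark implementation):

          import scala.collection.mutable

          // Toy stand-in for ReceivedBlockInfo
          case class BlockInfo(blockId: String, numRecords: Long)

          class ToyBlockTracker {
            // addBlock: streamId -> blocks that have arrived but are not yet tied to a batch
            private val unallocated = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]
            // allocateBlocksToBatch: batchTime -> (streamId -> blocks belonging to that batch)
            private val allocated = new mutable.HashMap[Long, Map[Int, Seq[BlockInfo]]]

            def addBlock(streamId: Int, info: BlockInfo): Unit = synchronized {
              unallocated.getOrElseUpdate(streamId, new mutable.Queue[BlockInfo]) += info
            }

            def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
              // drain every stream's queue and pin the drained blocks to this batch
              val perStream = unallocated.map { case (streamId, queue) =>
                streamId -> queue.dequeueAll(_ => true).toSeq
              }.toMap
              allocated(batchTime) = perStream
            }

            def blocksOfBatch(batchTime: Long): Map[Int, Seq[BlockInfo]] = synchronized {
              allocated.getOrElse(batchTime, Map.empty)
            }
          }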

    • Builds an endpoint that handles local messages of type ReceiverTrackerLocalMessage

      • case StartAllReceivers(receivers)
      • case RestartReceiver(receiver)
      • case c: CleanupOldBlocks
      • case UpdateReceiverRateLimit(streamUID, newRate)
      • case ReportError(streamId, message, error)
    • start()

      • Starts the endpoint and sends the StartAllReceivers message

      • Note: before the StartAllReceivers message is sent, runDummySparkJob is executed to avoid all receivers being scheduled onto the same executor. (Using 50 partitions makes it very unlikely that all of the launched tasks land on the same host; and because SparkContext.stop is not called after this Spark job finishes, the executor information that each worker registered with the BlockManagerMaster is not cleared and can be reused later.)

        /**
         * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
         * receivers to be scheduled on the same node.
         *
         * TODO Should poll the executor number and wait for executors according to
         * "spark.scheduler.minRegisteredResourcesRatio" and
         * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
         */
        private def runDummySparkJob(): Unit = {
          if (!ssc.sparkContext.isLocal) {
            ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
          }
          assert(getExecutors.nonEmpty)
        }
      • startReceiver(). After receiving the StartAllReceivers message it sent to itself, the ReceiverTracker calls startReceiver() for each receiver

        • Build the RDD

          val receiverRDD: RDD[Receiver[_]] = ssc.sc.makeRDD(Seq(receiver), 1)
        • Define the function the RDD will execute

          val startReceiverFunc: Iterator[Receiver[_]] => Unit =
            (iterator: Iterator[Receiver[_]]) => {
              if (!iterator.hasNext) {
                throw new SparkException(
                  "Could not start receiver as object not found.")
              }
              if (TaskContext.get().attemptNumber() == 0) {
                val receiver = iterator.next()
                assert(iterator.hasNext == false)
                val supervisor = new ReceiverSupervisorImpl(
                  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
                supervisor.start()
                supervisor.awaitTermination()
              } else {
                // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
              }
            }

          Note: the task actually runs ReceiverSupervisorImpl.start(); when the task is retried after a failure (attemptNumber > 0), the function simply exits so that the receiver gets rescheduled instead

        • Submit the Spark job

          val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
            receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

          At this point, Spark Streaming has launched a Spark job that keeps receiving external data

        • ReceiverSupervisorImpl, the key class that directly receives external data; worth a closer look

          • BlockGenerator, an important data member of ReceiverSupervisorImpl

            • def addData(data: Any): Unit

              Appends data to currentBuffer (an ArrayBuffer)

            • updateCurrentBuffer()

              Wraps the contents of currentBuffer into a Block (using the time as the block's unique id), allocates a fresh currentBuffer, and pushes the Block onto the blocksForPushing queue (a separate thread keeps draining this queue)

              private def updateCurrentBuffer(time: Long): Unit = {
                try {
                  var newBlock: Block = null
                  synchronized {
                    if (currentBuffer.nonEmpty) {
                      val newBlockBuffer = currentBuffer
                      currentBuffer = new ArrayBuffer[Any]
                      val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
                      listener.onGenerateBlock(blockId)
                      newBlock = new Block(blockId, newBlockBuffer)
                    }
                  }

                  if (newBlock != null) {
                    blocksForPushing.put(newBlock) // put is blocking when queue is full
                  }
                } catch {
                  case ie: InterruptedException =>
                    logInfo("Block updating timer thread was interrupted")
                  case e: Exception =>
                    reportError("Error in block updating thread", e)
                }
              }
            • keepPushingBlocks()

              Keeps taking blocks off the blocksForPushing queue and processes them.

              Processing a block has 2 steps (a self-contained sketch of the loop follows this list):

              1. add the block to the worker-local BlockManager
              2. send the block info to the ReceiverTracker on the driver; from this point the driver is aware of the external data
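
              The loop itself is a simple bounded producer/consumer handoff; the self-contained sketch below mirrors its shape (the Block type, the queue capacity, the poll timeout, and the two stub helpers are assumptions for illustration, not the real ReceiverSupervisorImpl code):

              import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

              object PushLoopSketch {
                case class Block(id: String, data: Seq[Any])

                // filled by updateCurrentBuffer, drained by the push thread
                val blocksForPushing = new ArrayBlockingQueue[Block](10)
                @volatile var stopped = false

                // Step 1: hand the block to the worker-local BlockManager (stubbed out here)
                def pushToBlockManager(block: Block): Unit = println(s"stored ${block.id} locally")
                // Step 2: report the block info to the driver's ReceiverTracker (stubbed out here)
                def reportBlockToDriver(block: Block): Unit = println(s"reported ${block.id} to driver")

                def keepPushingBlocks(): Unit = {
                  while (!stopped || !blocksForPushing.isEmpty) {
                    // poll with a timeout so the loop can notice the stop flag
                    Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)).foreach { block =>
                      pushToBlockManager(block)
                      reportBlockToDriver(block)
                    }
                  }
                }
              }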
            • Thread 1 of 2: blockIntervalTimer

              private val blockIntervalTimer =
                new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

              This timer starts a thread that periodically (every 200 ms by default) calls updateCurrentBuffer; a configuration sketch follows
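
              The 200 ms default comes from the configuration key spark.streaming.blockInterval. If fewer, larger blocks per batch are desired, it can be raised when building the SparkConf (a sketch; the 500ms value is only an example):

              import org.apache.spark.SparkConf

              // fewer but larger blocks => fewer partitions in each batch's BlockRDD
              val conf = new SparkConf()
                .setAppName("BlockIntervalTuning")
                .set("spark.streaming.blockInterval", "500ms")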

            • Thread 2 of 2: blockPushingThread, which calls keepPushingBlocks

              private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
          • ReceiverSupervisorImpl.start() // back to this method to continue the analysis

            • onStart()

              BlockGenerator.start(), which gets everything ready to receive external data; incoming data will be stored in BlockGenerator's currentBuffer

            • onReceiverStart()

              Registers the Receiver with the ReceiverTracker on the driver

            • receiver.onStart()

              i.e. ReceiverInputDStream.getReceiver().onStart(), which actually receives the external data and passes it on to BlockGenerator.addData

              For example, MQTTReceiver.onStart() creates an MQTT connection, subscribes to the topic, and stores each received message into BlockGenerator.currentBuffer (a hypothetical custom receiver is sketched below)
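
              A hypothetical custom receiver (a dummy counter source, not the real MQTTReceiver) shows the same contract in a self-contained form: onStart() sets up the source on its own thread, and every received record is handed to store(), which reaches BlockGenerator.addData through the supervisor.

              import org.apache.spark.storage.StorageLevel
              import org.apache.spark.streaming.receiver.Receiver

              // Hypothetical receiver that just emits a counter; it only illustrates the onStart/store contract.
              class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

                override def onStart(): Unit = {
                  // onStart must return quickly, so receive on a separate thread
                  new Thread("counter-receiver") {
                    override def run(): Unit = {
                      var i = 0L
                      while (!isStopped()) {
                        store(s"record-$i") // -> ReceiverSupervisorImpl -> BlockGenerator.addData
                        i += 1
                        Thread.sleep(10)
                      }
                    }
                  }.start()
                }

                override def onStop(): Unit = { /* the isStopped() check ends the receiving thread */ }
              }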

          • This completes the analysis of how the ReceiverTracker receives external data; in summary:

            1. The ReceiverTracker on the driver launches a Spark job
            2. On the worker, MQTTReceiver.onStart() receives external MQTT data and stores it into BlockGenerator.currentBuffer
            3. BlockGenerator periodically wraps currentBuffer into a Block and reports the block info to the ReceiverTracker on the driver
            4. The ReceiverTracker is now aware of the blocks

Periodically Launching Spark Jobs

  • GenerateJobs

    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

    After JobGenerator.start(), a GenerateJobs event is posted periodically (the period is the batch duration passed in when the StreamingContext was constructed).

    generateJobs performs the following steps:

    1. ReceiverTracker.allocateBlocksToBatch()

      Collects the BlockInfos produced by each input stream for the current batch

    2. createBlockRDD(validTime, blockInfos)

      Creates a BlockRDD from the BlockInfos collected above

    3. Build the jobFunc and the Job

      getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            val emptyFunc = { (iterator: Iterator[T]) => {} }
            context.sparkContext.runJob(rdd, emptyFunc)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }

      Job.run() simply invokes jobFunc(), which runs a new Spark job to process the RDD

    4. Wrap the Jobs into a JobSet and hand them to a thread pool (a single thread by default) that actually executes job.run (a toy sketch of this pool follows)

      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
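
      The pool that runs these JobHandlers is small by default; its size is controlled by spark.streaming.concurrentJobs (default 1), so the output operations of a batch run one at a time unless that value is raised. A toy stand-in (the Job case class and the jobs themselves are simplifications for illustration):

      import java.util.concurrent.Executors

      object JobExecutorSketch {
        // Simplified stand-in for the streaming Job: a batch time plus the work to run
        case class Job(time: Long, run: () => Unit)

        def main(args: Array[String]): Unit = {
          val concurrentJobs = 1 // mirrors spark.streaming.concurrentJobs (default 1)
          val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

          val jobSet = Seq(
            Job(1000L, () => println("output op 1 of batch 1000")),
            Job(1000L, () => println("output op 2 of batch 1000")))

          // Same shape as: jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
          jobSet.foreach(job => jobExecutor.execute(new Runnable { def run(): Unit = job.run() }))
          jobExecutor.shutdown()
        }
      }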
  • Summary

    1. A dedicated Spark job is launched to receive external data. The worker receives the data, periodically (independent of the batch duration, every 200 ms by default) wraps it into Blocks, stores them in Spark's memory, and reports the BlockInfo to the driver.

      The class that actually does this: BlockGenerator

    2. The driver periodically (once per batch duration) builds a BlockRDD from the BlockInfos, constructs a Spark job from that RDD, and executes it.