4.5 打通Spark系统运行内幕机制循环流程

Spark通过DAGScheduler面向整个Job划分出了不同的Stage,划分Stage之后,Stage从后往前划分,执行的时候从前往后执行,每个Stage内部有一系列的任务,Stage里面的任务是并行计算,并行任务的逻辑是完全相同的,但处理的数据不同。DAGScheduler以TaskSet的方式,把一个DAG构建的Stage中的所有任务提交给底层的调度器TaskScheduler。TaskScheduler是一个接口,与具体的任务解耦合,可以运行在不同的调度模式下,如可运行在Standalone模式,也可运行在Yarn上。

Spark基础调度(图4-6)包括RDD Objects、DAGScheduler、TaskScheduler、Worker等内容。

图4-6 Spark基础调度图

DAGScheduler在提交TaskSet给底层调度器的时候是面向接口TaskScheduler的,这符合面向对象中依赖抽象而不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上,如Standalone、Yarn、Mesos、Local、EC2、其他自定义的资源调度器;在Standalone的模式下我们聚焦于TaskSchedulerImpl。

TaskScheduler是一个接口Trait,底层任务调度接口,由[org.apache.spark.scheduler. TaskSchedulerImpl]实现。这个接口允许插入不同的任务调度程序。每个任务调度器在单独的SparkContext中调度任务。任务调度程序从每个Stage的DAGScheduler获得提交的任务集,负责发送任务到集群运行,如果任务运行失败,将重试,返回DAGScheduler事件。

Spark 2.1.1版本的TaskScheduler.scala的源码如下。

1.    private[spark] trait TaskScheduler {
2.
3.   private val appId = "spark-application-" + System.currentTimeMillis
4.
5.   def rootPool: Pool
6.
7.   def schedulingMode: SchedulingMode
8.
9.   def start(): Unit
10.
11.  //成功初始化后调用(通常在Spark上下文中)。Yarn使用这个来引导基于优先位置的资源
     //分配,等待从节点登记等
12.  def postStartHook() { }
13.
14.  //从群集断开连接
15.  def stop(): Unit
16.
17.  //提交要运行的任务序列
18.  def submitTasks(taskSet: TaskSet): Unit
19.
20.  //取消Stage
21.  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
22.
23.  //系统为upcalls设置DAG调度,这是保证在submitTasks被调用前被设置
24.  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
25.
26.  //获取集群中使用的默认并行级别,作为对作业的提示
27.  def defaultParallelism(): Int
28.
29.  /**
       * 更新正运行任务,让master 知道BlockManager 仍活着。如果driver 知道给定的
       * 块管理器,则返回true;否则,返回false,指示块管理器应重新注册
30.    */
31.
32.  def executorHeartbeatReceived(
33.      execId: String,
34.      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
35.      blockManagerId: BlockManagerId): Boolean
36.
37.  /**
38.    *获取与作业相关联的应用程序ID
39.    * @return An application ID
40.    */
41.   def applicationId(): String = appId
42.
43.   /**
        *处理丢失的 executor
44.     */
45.   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
46.
47.   /**
48.     *获取与作业相关联的应用程序的尝试ID
49.     *
50.     * @return应用程序的尝试ID
51.     */
52.   def applicationAttemptId(): Option[String]
53.
54. }

Spark 2.2.0版本的TaskScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第21行之后新增加了killTaskAttempt方法。

1.  ......
2.  /**
3.    *杀死任务尝试
4.    *
5.    * @return任务是否成功被杀死
6.    */
7.    def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason:
      String): Boolean
8.  .......

DAGScheduler把TaskSet交给底层的接口TaskScheduler,具体实现时有不同的方法。TaskScheduler主要由TaskSchedulerImpl实现。

TaskSchedulerImpl也有自己的子类YarnScheduler。

1.     private[spark] class YarnScheduler(sc: SparkContext) extends
       TaskSchedulerImpl(sc) {
2.
3.    //RackResolver 记录INFO日志信息时,解析rack的信息
4.    if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
5.      Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
6.    }
7.
8.    //默认情况下,rack是未知的
9.    override def getRackForHost(hostPort: String): Option[String] = {
10.     val host = Utils.parseHostPort(hostPort)._1
11.     Option(RackResolver.resolve(sc.hadoopConfiguration, host).
        getNetworkLocation)
12.   }
13. }

YarnScheduler的子类YarnClusterScheduler实现如下。

1.  private[spark] class YarnClusterScheduler(sc: SparkContext) extends
    YarnScheduler(sc) {
2.  logInfo("Created YarnClusterScheduler")
3.
4.  override def postStartHook() {
5.      ApplicationMaster.sparkContextInitialized(sc)
6.      super.postStartHook()
7.      logInfo("YarnClusterScheduler.postStartHook done")
8.    }
9.
10. }

默认情况下,我们研究Standalone的模式,所以主要研究TaskSchedulerImpl。DAGScheduler把TaskSet交给TaskScheduler,TaskScheduler中通过TastSetManager管理具体的任务。TaskScheduler的核心任务是提交TaskSet到集群运算,并汇报结果。

 为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。

 遇到延后的Straggle任务,会放到其他节点重试。

 向DAGScheduler汇报执行情况,包括在Shuffle输出lost的时候报告fetch failed错误等信息。

TaskSet是一个普通的类,第一个成员是tasks,tasks是一个数组。TaskSet的源码如下。

1.     private[spark] class TaskSet(
2.      val tasks: Array[Task[_]],
3.      val stageId: Int,
4.      val stageAttemptId: Int,
5.      val priority: Int,
6.      val properties: Properties) {
7.    val id: String = stageId + "." + stageAttemptId
8.
9.    override def toString: String = "TaskSet " + id
10. }

TaskScheduler内部有SchedulerBackend,SchedulerBackend管理Executor资源。从Standalone的模式来讲,具体实现是StandaloneSchedulerBackend(Spark 2.0版本将之前的AppClient名字更新为StandaloneAppClient)。

SchedulerBackend本身是一个接口,是一个trait。SchedulerBackend的源码如下。

1.   private[spark] trait SchedulerBackend {
2.    private val appId = "spark-application-" + System.currentTimeMillis
3.
4.    def start(): Unit
5.    def stop(): Unit
6.    def reviveOffers(): Unit
7.    def defaultParallelism(): Int
8.
9.    def killTask(taskId: Long, executorId: String, interruptThread:
      Boolean): Unit =
10.     throw new UnsupportedOperationException
11.   def isReady(): Boolean = true
12.
13.   /**
14.     *获取与作业关联的应用ID
15.     *
16.     * @return 应用程序 ID
17.     */
18.   def applicationId(): String = appId
19.
20.   /**
21.     *如果集群管理器支持多个尝试,则获取此运行的尝试ID,应用程序运行在客户端模式将没
        *有尝试ID
22.     *
23.     * @return如果可用,返回应用程序尝试ID
24.     */
25.   def applicationAttemptId(): Option[String] = None
26.
27.   /**
28.     *得到driver 日志的URL。这些URL是用来在用户界面中显示链接driver的Executors
        *选项卡
29.     *
30.     * @return Map 包含日志名称和URLs
31.     */
32.   def getDriverLogUrls: Option[Map[String, String]] = None
33.
34. }

StandaloneSchedulerBackend:专门负责收集Worker的资源信息。接收Worker向Driver注册的信息,ExecutorBackend启动的时候进行注册,为当前应用程序准备计算资源,以进程为单位。

StandaloneSchedulerBackend的源码如下。

1.   private[spark] class StandaloneSchedulerBackend(
2.      scheduler: TaskSchedulerImpl,
3.      sc: SparkContext,
4.      masters: Array[String])
5.    extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
6.    with StandaloneAppClientListener
7.    with Logging {
8.    private var client: StandaloneAppClient = null
9.  ......

StandaloneSchedulerBackend里有一个Client: StandaloneAppClient。

1.  private[spark] class StandaloneAppClient(
2.    rpcEnv: RpcEnv,
3.    masterUrls: Array[String],
4.    appDescription: ApplicationDescription,
5.    listener: StandaloneAppClientListener,
6.    conf: SparkConf)
7.  extends Logging {

StandaloneAppClient允许应用程序与Spark standalone集群管理器通信。获取Master的URL、应用程序描述和集群事件监听器,当各种事件发生时可以回调监听器。masterUrls的格式为spark://host:port,StandaloneAppClient需要向Master注册。

StandaloneAppClient在StandaloneSchedulerBackend.scala的start方法启动时进行赋值,用new()函数创建一个StandaloneAppClient。

Spark 2.1.1版本的StandaloneSchedulerBackend.scala的源码如下。

1.    private[spark] class StandaloneSchedulerBackend(
2.  ......
3.
4.  override def start() {
5.  ......
6.   val appDesc = new ApplicationDescription(sc.appName, maxCores,
     sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec,
     coresPerExecutor, initialExecutorLimit)
7.     client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this,
       conf)
8.      client.start()
9.      launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
10.     waitForRegistration()
11.     launcherBackend.setState(SparkAppHandle.State.RUNNING)
12.   }

Spark 2.2.0版本的StandaloneSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行ApplicationDescription传入的第5个参数appUIAddress更改为webUrl。

1.    ......
2.  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.
    executorMemory, command, webUrl, sc.eventLogDir,sc.eventLogCodec,
          coresPerExecutor, initialExecutorLimit)
3.  .....

StandaloneAppClient.scala中,里面有一个类是ClientEndpoint,核心工作是在启动时向Master注册。StandaloneAppClient的start方法启动时,就调用new函数创建一个ClientEndpoint。

StandaloneAppClient的源码如下。

1.      private[spark] class StandaloneAppClient(
2.  ......
3.   private class ClientEndpoint(override val rpcEnv: RpcEnv) extends
     ThreadSafeRpcEndpoint
4.      with Logging {
5.  ......
6.  def start() {
7.      //启动 rpcEndpoint; 将回调listener
8.      endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
        (rpcEnv)))
9.    }

StandaloneSchedulerBackend在启动时构建StandaloneAppClient实例,并在StandaloneAppClient实例start时启动了ClientEndpoint消息循环体。ClientEndpoint在启动时会向Master注册当前程序。

StandaloneAppClient中ClientEndpoint类的onStart()方法如下。

1.   override def onStart(): Unit = {
2.     try {
3.       registerWithMaster(1)
4.     } catch {
5.       case e: Exception =>
6.         logWarning("Failed to connect to master", e)
7.         markDisconnected()
8.         stop()
9.     }
10.  }

这是StandaloneSchedulerBackend的第一个注册的核心功能。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend。而CoarseGrainedSchedulerBackend在启动时就创建DriverEndpoint,从实例的角度讲,DriverEndpoint也属于StandaloneSchedulerBackend实例。

1.     private[spark]
2.  class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val
    rpcEnv: RpcEnv)
3.    extends ExecutorAllocationClient with SchedulerBackend with Logging
4.  {
5.  ......
6.   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties:
     Seq[(String, String)])
7.      extends ThreadSafeRpcEndpoint with Logging {
8.  ......

StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndpoint(这就是我们程序运行时的经典对象Driver)的消息循环体。StandaloneSchedulerBackend在运行时向Master注册申请资源,当Worker的ExecutorBackend启动时会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的;StandaloneSchedulerBackend不是应用程序的总管,应用程序的总管是DAGScheduler、TaskScheduler,StandaloneSchedulerBackend向应用程序的Task分配具体的计算资源,并把Task发送到集群中。

SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在应用程序启动时只实例化一次,应用程序存在期间始终存在这些对象。

这里基于Spark 2.2版本讲解:

Spark调度器三大核心资源:SparkContext、DAGScheduler、TaskSchedulerImpl,TaskSchedulerImpl作为具体的底层调度器,运行时需要计算资源,因此需要StandaloneSchedulerBackend。StandaloneSchedulerBackend设计巧妙的地方是启动时启动StandaloneAppClient,而StandaloneAppClient在start时有一个ClientEndpoint的消息循环体,ClientEndpoint的消息循环体启动的时候向Master注册应用程序。

StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start启动的时候会实例化DriverEndpoint,所有的ExecutorBackend启动的时候都要向DriverEndpoint注册,注册最后落到了StandaloneSchedulerBackend的内存数据结构中,表面上看是在CoarseGrainedSchedulerBackend,但是实例化的时候是StandaloneSchedulerBackend,注册给父类的成员其实就是子类的成员。

作为前提问题:TaskScheduler、StandaloneSchedulerBackend是如何启动的?TaskSchedulerImpl是什么时候实例化的?

TaskSchedulerImpl是在SparkContext中实例化的。在SparkContext类实例化的时候,只要不是方法体里面的内容,都会被执行,(sched, ts)是SparkContext的成员,将调用createTaskScheduler方法。调用createTaskScheduler方法返回一个Tuple,包括两个元素:sched是我们的schedulerBackend;ts是taskScheduler。

1. class SparkContext(config: SparkConf) extends Logging {
2.  ......
3. //创建启动调度器 scheduler
4.    val (sched, ts) = SparkContext.createTaskScheduler(this, master,
      deployMode)
5.    _schedulerBackend = sched
6.    _taskScheduler = ts
7.    _dagScheduler = new DAGScheduler(this)
8.    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

createTaskScheduler里有很多运行模式,这里关注Standalone模式,首先调用new()函数创建一个TaskSchedulerImpl,TaskSchedulerImpl和SparkContext是一一对应的,整个程序运行的时候只有一个TaskSchedulerImpl,也只有一个SparkContext;接着实例化StandaloneSchedulerBackend,整个程序运行的时候只有一个StandaloneSchedulerBackend。createTaskScheduler方法如下。

1.      private def createTaskScheduler(
2.        sc: SparkContext,
3.        master: String,
4.        deployMode: String): (SchedulerBackend, TaskScheduler) = {
5.      import SparkMasterRegex._
6.  ......
7.    master match {
8.  ......
9.        case SPARK_REGEX(sparkUrl) =>
10.         val scheduler = new TaskSchedulerImpl(sc)
11.         val masterUrls = sparkUrl.split(",").map("spark://" + _)
12.         val   backend    =  new   StandaloneSchedulerBackend(scheduler,  sc,
            masterUrls)
13.         scheduler.initialize(backend)
14.         (backend, scheduler)
15. ......

在SparkContext实例化的时候通过createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend。然后在createTaskScheduler中调用scheduler.initialize(backend)。

initialize的方法参数把StandaloneSchedulerBackend传进来,schedulingMode模式匹配有两种方式:FIFO、FAIR。

initialize的方法中调用schedulableBuilder.buildPools()。buildPools方法根据FIFOSchedulableBuilder、FairSchedulableBuilder不同的模式重载方法实现。

1.   private[spark] trait SchedulableBuilder {
2.    def rootPool: Pool
3.
4.    def buildPools(): Unit
5.
6.    def addTaskSetManager(manager: Schedulable, properties: Properties):
      Unit
7.  }

initialize的方法把StandaloneSchedulerBackend传进来了,但还没有启动StandaloneSchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend传进来,从而赋值为TaskSchedulerImpl的backend;在TaskSchedulerImpl调用start方法时会调用backend.start方法,在start方法中会最终注册应用程序。

下面来看SparkContext.scala的taskScheduler的启动。

1.    val   (sched,    ts)   =  SparkContext.createTaskScheduler(this,  master,
      deployMode)
2.      _schedulerBackend = sched
3.      _taskScheduler = ts
4.      _dagScheduler = new DAGScheduler(this)
5.  ......
6.      _taskScheduler.start()
7.      _applicationId = _taskScheduler.applicationId()
8.      _applicationAttemptId = taskScheduler.applicationAttemptId()
9.      _conf.set("spark.app.id", _applicationId)
10. ......

其中调用了_taskScheduler的start方法。

1.   private[spark] trait TaskScheduler {
2.  ......
3.
4.    def start(): Unit
5.  .....

TaskScheduler的start()方法没有具体实现。TaskScheduler子类的TaskSchedulerImpl的start()方法的源码如下。

1.    override def start() {
2.     backend.start()
3.
4.     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
5.       logInfo("Starting speculative execution thread")
6.       speculationScheduler.scheduleAtFixedRate(new Runnable {
7.         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
8.           checkSpeculatableTasks()
9.         }
10.      },   SPECULATION_INTERVAL_MS,        SPECULATION_INTERVAL_MS,   TimeUnit.
         MILLISECONDS)
11.    }
12.  }

TaskSchedulerImpl的start通过backend.start启动了StandaloneSchedulerBackend的start方法。

StandaloneSchedulerBackend的start方法中,将command封装注册给Master,Master转过来要Worker启动具体的Executor。command已经封装好指令,Executor具体要启动进程入口类CoarseGrainedExecutorBackend。然后调用new()函数创建一个StandaloneAppClient,通过client.start启动client。

StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint:

1.   def start() {
2.    //启动 rpcEndpoint; 将回调listener
3.    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint
      (rpcEnv)))
4.  }

ClientEndpoint的源码如下。

1.      private class ClientEndpoint(override val rpcEnv: RpcEnv) extends
        ThreadSafeRpcEndpoint
2.      with Logging {
3.  ......
4.      override def onStart(): Unit = {
5.        try {
6.          registerWithMaster(1)
7.        } catch {
8.          case e: Exception =>
9.            logWarning("Failed to connect to master", e)
10.       markDisconnected()
11.       stop()
12.    }
13.  }

ClientEndpoint是一个ThreadSafeRpcEndpoint。ClientEndpoint的onStart方法中调用registerWithMaster(1)进行注册,向Master注册程序。registerWithMaster方法如下。

1.     private def registerWithMaster(nthRetry: Int) {
2.     registerMasterFutures.set(tryRegisterAllMasters())
3.     registrationRetryTimer.set(registrationRetryThread.schedule(new
       Runnable {
4.       override def run(): Unit = {
5.         if (registered.get) {
6.           registerMasterFutures.get.foreach(_.cancel(true))
7.           registerMasterThreadPool.shutdownNow()
8.         } else if (nthRetry >= REGISTRATION_RETRIES) {
9.           markDead("All masters are unresponsive! Giving up.")
10.        } else {
11.          registerMasterFutures.get.foreach(_.cancel(true))
12.          registerWithMaster(nthRetry + 1)
13.        }
14.      }
15.    }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
16.  }

程序注册后,Master通过schedule分配资源,通知Worker启动Executor,Executor启动的进程是CoarseGrainedExecutorBackend,Executor启动后又转过来向Driver注册,Driver其实是StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend的一个消息循环体DriverEndpoint。

总结:

在SparkContext实例化的时候调用createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl的start,在start方法中会调用StandaloneSchedulerBackend的start,在该start方法中会创建StandaloneAppClient对象,并调用StandaloneAppClient对象的start方法,在该start方法中会创建ClientEndpoint,创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后如果可以运行程序,为该程序生产Job ID并通过schedule来分配计算资源,具体计算资源的分配是通过应用程序的运行方式、Memory、cores等配置信息决定的。最后,Master会发送指令给Worker,Worker为当前应用程序分配计算资源时会首先分配ExecutorRunner。ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另外一个JVM进程,这个JVM进程启动时加载的main方法所在的类的名称就是在创建ClientEndpoint时传入的Command来指定具体名称为CoarseGrainedExecutorBackend的类,此时JVM在通过ProcessBuilder启动时获得了CoarseGrainedExecutorBackend后加载并调用其中的main方法,在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而CoarseGrainedExecutorBackend在实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在StandaloneSchedulerBackend实例的内存数据结构中,这样Driver就获得了计算资源。

CoarseGrainedExecutorBackend.scala的main方法如下。

1.   def main(args: Array[String]) {
2.      var driverUrl: String = null
3.      var executorId: String = null
4.      var hostname: String = null
5.      var cores: Int = 0
6.      var appId: String = null
7.      var workerUrl: Option[String] = None
8.      val userClassPath = new mutable.ListBuffer[URL]()
9.
10.     var argv = args.toList
11.    ......
12.     run(driverUrl, executorId, hostname, cores, appId, workerUrl,
        userClassPath)
13.     System.exit(0)
14.   }

CoarseGrainedExecutorBackend的main然后开始调用run方法。

1.     private def run(
2.        driverUrl: String,
3.        executorId: String,
4.        hostname: String,
5.        cores: Int,
6.        appId: String,
7.        workerUrl: Option[String],
8.        userClassPath: Seq[URL]) {
9.  ......
10.        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
11.         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath,
            env))
12. ......

在CoarseGrainedExecutorBackend的main方法中,通过env.rpcEnv.setupEndpoint ("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))构建了CoarseGrainedExecutorBackend实例本身。