5.3 ExecutorBackend启动原理和源码详解

ExecutorBackend是Executor向集群发送更新消息的一个可插拔的接口。ExecutorBackend拥有不同的实现。Standalone模式下ExecutorBackend的默认实现是CoarseGrainedExecutorBackend;在Local模式下,ExecutorBackend的默认实现是LocalBackend。在Mesos调度模式下,ExecutorBackend的默认实现是MesosExecutorBackend。本节主要探索Standalone模式下的ExecutorBackend,通过源码深入理解ExecutorBackend接口设计的精髓。

5.3.1 ExecutorBackend接口与Executor的关系

本节将详细分析Standalone模式下ExecutorBackend和Executor的关系。在StandaloneSchedulerBackend中会实例化一个StandaloneAppClient。StandaloneAppClient中携带了command信息,command信息中指定了要启动的ExecutorBackend的实现类,Standalone模式下,该ExecutorBackend的实现类是org.apache.spark.executor.CoarseGrainedExecutorBackend类。

StandaloneSchedulerBackend.scala的start方法中构建了一个Command对象,该对象的第一个参数是mainClass,即进程的主类。该类在Standalone模式下为org.apache.spark.executor. CoarseGrainedExecutorBackend。分别将sparkJavaopts、javaOpts、command、appUiAddress、coresPerExecutor、appDes传入StandaloneAppClient构造函数。StandaloneAppClient将会向Master发送RegisterApplication注册请求,Master受理后通过launchExecutor方法在Worker节点启动一个ExecutorRunner对象,该对象用于管理一个Executor进程。在ExecutorRunner中将通过CommandUtil构建一个ProcessBuilder,调用ProcessBuilder的start方法将会以进程的方式启动org.apache.spark.executor.CoarseGrainedExecutorBackend。在CoarseGrainedExecotorBackend的onStart方法中,将会向Driver端发送RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)消息请求注册,完成注册后将立即返回一个RegisteredExecutor(executorAddress. host)消息,CoarseGraiendExecutorBackend收到该消息,马上实例化出一个Executor。源码如下所示。

CoarseGrainedExecutorBackend.scala的源码如下。

1.  override def receive: PartialFunction[Any, Unit] = {
2.  case RegisteredExecutor =>
3.    logInfo("Successfully registered with driver")
4.    try {
5.      executor = new Executor(executorId, hostname, env, userClassPath,
        isLocal = false)
6.    } catch {
7.      case NonFatal(e) =>
8.        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
9.    }

从这里可以看出,CoarseGrainedExecutorBackend比Executor先实例化。CoarseGrained-ExecutorBackend负责与集群通信,而Executor则专注于任务的处理,它们是一对一的关系,在集群中各司其职。

每个Worker节点上可以启动多个CoarseGrainedExecutorBackend进程,每个进程对应一个Executor。

5.3.2 ExecutorBackend的不同实现

ExecutorBackend是与集群交互的接口,该接口在不同的调度模式下有不同的实现。图5-3是ExecutorBackend及其实现的关系类图。

图5-3 ExecutorBackend及其实现的关系类图

不同模式下,ExecutorRunner启动的进程不一样。在Standalone模式下启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend进程;在Local模式下,启动的是org.apache.spark.executor.LocalExecutorBackend进程;在Mesos模式下,启动的是org.apache. spark.executor.MesosExecutorBackend进程。

下面来看Standalone模式下CoarseGrainedExecutorBackend的启动。在Standalone模式下,会启动org.apache.spark.deploy.Client类,该类将向Master发送RequestSubmitDriver (driverDescription)消息,Master中匹配到RequestSubmitDriver(driverDescription)后,将会调用schedule方法。该调用的源码如下所示。

Master.scala的receiveAndReply的源码如下。

1.  override def receiveAndReply(context: RpcCallContext): PartialFunction
    [Any, Unit] = {
2.  ......
3.   case RequestSubmitDriver(description) =>
4.   //若state不为ALIVE,直接向Client返回SubmitDriverResponse(self,false,
     //None,msg)消息
5.        if (state != RecoveryState.ALIVE) {
6.          val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
7.            "Can only accept driver submissions in ALIVE state."
8.          context.reply(SubmitDriverResponse(self, false, None, msg))
9.        } else {
10.         logInfo("Driver submitted " + description.command.mainClass)
11. //使用description创建driver,该方法返回DriverDescription
12.         val driver = createDriver(description)
13.         persistenceEngine.addDriver(driver)
14.         waitingDrivers += driver
15.  //waitingDrivers等待在调度数组中加入该driver
16.         drivers.add(driver)
17.  //用schedule方法调度资源
18.         schedule()
19.  //向ClientEndpoint回复SubmitDriverResponse消息
20.
21.         context.reply(SubmitDriverResponse(self, true, Some(driver.id),
22.           s"Driver successfully submitted as ${driver.id}"))
23.       }

Master的receiveAndReply收到RequestSubmitDriver消息后,调用schedule方法。

Master的schedule的源码如下。

1.      private def schedule(): Unit = {
2.   if (state != RecoveryState.ALIVE) {
3.     return
4.   }
5.   //Drivers 优先于executors
6.   val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter
     (_.state == WorkerState.ALIVE))
7.   val numWorkersAlive = shuffledAliveWorkers.size
8.   var curPos = 0
9.   for (driver <- waitingDrivers.toList) { //遍历waitingDrivers
10.    //以循环的方式给每个等候的driver分配Worker。对于每个driver,我们从分配
       //给driver的最后一个Worker开始,继续前进,直到所有活跃的Worker节点
11.
12.    var launched = false
13.    var numWorkersVisited = 0
14.    while (numWorkersVisited < numWorkersAlive && !launched) {
15.      val worker = shuffledAliveWorkers(curPos)
16.      numWorkersVisited += 1
17.      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >=
         driver.desc.cores) {
18.        launchDriver(worker, driver)
19.        waitingDrivers -= driver
20.        launched = true
21.        }
22.        curPos = (curPos + 1) % numWorkersAlive
23.      }
24.    }
25.    startExecutorsOnWorkers()
26.  }

上面代码中,RecoveryState若不为ALIVE,则直接返回,否则使用Random.shuffle将Workers集合打乱,过滤出ALIVE的Worker,生成新的集合shuffledAliveWorkers,尽量考虑到选择Driver的负载均衡。在for语句中遍历waitingDrivers队列,判断Worker剩余内存和剩余物理核是否满足Driver需求,如满足,则调用launchDriver(worker,driver)方法在选中的Worker上启动Driver进程。

实例化SparkContext时,在SparkContext中将实例化出DAGScheduler、StandaloneSchedulerBackend。Driver在Worker节点上启动之后,在StandaloneSchedulerBackend中将会调用new()函数创建一个StandaloneAppClient。StandaloneAppClient中有一个ClientEndpoint,在其onStart方法中将向Master发送RegisterApplication请求注册application,注册好application后,Master又会调用schedule方法,在满足条件的Worker上为application启动Executor,首先会启动ExecutorRunner,在ExecutorRunner中启动CoarseGrainedExecutor-Backend,启动后将会实例化出Executor。为什么在Standalone模式下会启动CoarseGrained-ExecutorBackend呢?在什么地方设置要启动的CoarseGrainedExecutorBackend进程呢?其实,在实例化StandaloneAppClient的时候就已经传入了。

StandaloneSchedulerBackend.scala的start方法代码中设置了Command对象。Command对象的第一个参数是启动进程的mainClass。因此,ExecutorRunner中启动进程时,启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend。

5.3.3 ExecutorBackend中的通信

ExecutorBackend是一个被Executor使用的可插拔的与集群通信的接口。在ExecutorBackend中有statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)方法,通过这个方法向集群发送Task执行的各种信息,如果任务执行失败,则返回失败的信息;如果执行成功,则返回任务执行的结果。本节重点讲解在Standalone模式下CoarseGrainedExecutor-Backend中的通信。CoarseGrainedExecutorBackend在整个集群中的通信如图5-4所示。

在图5-4中,Executor与CoarseGrainedExecutorBackend协作,将任务计算的结果通过CoarseGrainedExecutorBackend的statusUpdate方法将taskId、TaskState以及结果数据发送给Driver。Driver收到StatusUpdate(executorId,tasked,state,data)消息,通过判断state的不同状态,进行不同的处理。例如,当state的状态为TaskState.LOST时,Driver端会移除Executor;当state的状态为TaskState.FINISHED时,Driver端会调用enqueueSuccessfulTask进行处理。

这里主要看CoarseGrainedExecutorBackend与Driver之间的通信。当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。源码如下所示。

图5-4 CoarseGrainedExecutorBackend在整个集群中的通信

CoarseGrainedExecutorBackend的onStart方法的源码如下。

1.      override def onStart() {
2.      logInfo("Connecting to driver: " + driverUrl)
3.      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
4.        //这是一个非常快的行动,所以我们可以用"ThreadUtils.sameThread"
5.        driver = Some(ref)
6.   //向Driver发送ask请求,等待Driver回应
7.        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores,
          extractLogUrls))
8.      }(ThreadUtils.sameThread).onComplete {
9.        //这是一个非常快的行动,所以我们可以用"ThreadUtils.sameThread"
10.       case Success(msg) =>
11.         //经常收到true,可以忽略
12.       case Failure(e) =>
13.         exitExecutor(1, s"Cannot register with driver: $driverUrl", e,
            notifyDriver = false)
14.     }(ThreadUtils.sameThread)
15.   }

上面的代码中,Some(ref)得到Driver的引用,通过ask方法返回Future[Boolean],然后在Future对象上调用onComplete方法进行额外的处理。Driver端收到注册请求,将会注册Executor的请求,并向ListenerBus中发送SparkListenerExecutorAdded事件。

如果executorDataMap中已经存在该Executor的id,就返回RegisterExecutorFailed,如果不存在该Executor的id,则在executorDataMap中加入该Executor的id,并返回RegisteredExecutor消息且向listenerBus中添加SparkListenerExecutorAdded事件。CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。

CoarseGrainedExecutorBackend.scala的receive的源码如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case RegisteredExecutor =>
3.        logInfo("Successfully registered with driver")
4.        try {
5.    //收到RegisteredExecutor消息,立即创建Executor
6.          executor = new Executor(executorId, hostname, env, userClassPath,
            isLocal = false)
7.        } catch {
8.          case NonFatal(e) =>
9.            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
10.       }

从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor。由此可见,Executor在CoarseGrainedExecutorBackend后实例化,这与Executor和CoarseGrainedExecutorBackend的不同职责有关,Executor主要负责计算,而CoarseGrainedExecutorBackend主要负责通信,通信环境准备好了,架起同CoarseGrainedSchedulerBackend通信的桥梁,就可以接收CoarseGrainedSchedulerBackend中调用launchTask方法发送的LaunchTask消息了,因此通信在前,计算在后。

Executor中的计算结果是通过CoarseGrainedExecutorBackend的statusUpdate方法返回给CoarseGrainedExecutorBackend的。statusUpdate方法的代码如下所示。

CoarseGrainedExecutorBackend.scala的源码如下。

1.      override def statusUpdate(taskId: Long, state: TaskState, data:
        ByteBuffer) {
2.      val msg = StatusUpdate(executorId, taskId, state, data)
3.      driver match {
4.  //向Driver发送StatusUpdate消息
5.        case Some(driverRef) => driverRef.send(msg)
6.        case None => logWarning(s"Drop $msg because has not yet connected
          to driver")
7.      }
8.    }

上面源码中,通过参数taskId、state、data构建一个StatusUpdate对象,该对象将被当作消息发送到Driver端,Driver根据返回结果的需要,将会向CoarseGrainedExecutorBackend发送新的指令消息,如LaunchTask、KillTask、StopExecutors、Shutdown等。

5.3.4 ExecutorBackend的异常处理

若CoarseGrainedExecutorBackend在运行中出现异常,将调用exitExecutor方法进行处理,处理以后,系统退出。exitExecutor函数可以由其他子类重载来处理,Executor执行的退出方式不同。例如,当Executor挂掉了,后台程序可能不会让父进程也挂掉。如果须通知Driver,Driver将清理挂掉的Executor的数据。

CoarseGrainedExecutorBackend的exitExecutor方法的源码如下。

1.  protected def exitExecutor(code: Int,
2.                           reason: String,
3.                               throwable: Throwable = null,
4.                               notifyDriver: Boolean = true) = {
5.      val message = "Executor self-exiting due to : " + reason
6.      if (throwable != null) {
7.        logError(message, throwable)
8.      } else {
9.        logError(message)
10.     }
11.
12.     if (notifyDriver && driver.nonEmpty) {
13.       driver.get.ask[Boolean](
14.         RemoveExecutor(executorId, new ExecutorLossReason(reason))
15.       ).onFailure { case e =>
16.         logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
17.       }(ThreadUtils.sameThread)
18.     }
19.
20.     System.exit(code)
21.   }
22. }

CoarseGrainedExecutorBackend在运行中一旦出现异常情况,将调用exitExecutor方法处理。

 Executor向Driver注册RegisterExecutor失败。

 Executor收到Driver的RegisteredExecutor注册成功消息以后,创建Executor实例失败。

 Driver返回Executor注册失败消息RegisterExecutorFailed。

 Executor收到Driver的LaunchTask启动任务消息,但是Executor为null。

 Executor收到Driver的KillTask消息,但是Executor为null。

 Executor和Driver失去连接。