6.5 Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

Spark 1.6推出了以RpcEnv、RPCEndpoint、RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka。Akka是基于Actor的分布式消息通信系统,而在Spark 1.6中封装了Akka,提供更高层的Rpc实现,目的是移除对Akka的依赖,为扩展和自定义Rpc打下基础。

Spark 2.0版本中Rpc的变化情况如下。

 SPARK-6280:从Spark中删除Akka systemName。

 SPARK-7995:删除AkkaRpcEnv,并从Core的依赖中删除Akka。

 SPARK-7997:删除开发人员api SparkEnv.actorSystem和AkkaUtils。

RpcEnv是一个抽象类abstract class,传入SparkConf。RPC环境中[RpcEndpoint]需要注册自己的名字[RpcEnv]来接收消息。[RpcEnv]将处理消息发送到[RpcEndpointRef]或远程节点,并提供给相应的[RpcEndpoint]。[RpcEnv]]未被捕获的异常,[RpcEnv]将使用[RpcCallContext.sendFailure]发送异常给发送者,如果没有这样的发送者,则记录日志NotSerializableException。

RpcEnv.scala的源码如下。

1.    private[spark] abstract class RpcEnv(conf: SparkConf) {
2.
3.    private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
4.  ......

RpcCallContext.scala处理异常的方法包括reply、sendFailure、senderAddress,其中reply是给发送者发送一个信息。如果发送者是[RpcEndpoint],它的[RpcEndpoint.receive]将被调用。

其中,RpcCallContext的地址RpcAddress是一个case class,包括hostPort、toSparkURL等成员。

RpcAddress.scala的源码如下。

1.    private[spark] case class RpcAddress(host: String, port: Int) {
2.    def hostPort: String = host + ":" + port
3.    /**返回一个字符串,该字符串的形式为:spark://host:port*/
4.    def toSparkURL: String = "spark://" + hostPort
5.    override def toString: String = hostPort
6.  }

RpcAddress伴生对象object RpcAddress属于包org.apache.spark.rpc,fromURIString方法从String中提取出RpcAddress;fromSparkURL方法也是从String中提取出RpcAddress。说明:case class RpcAddress通过伴生对象object RpcAddress的方法调用,case class RpcAddress也有自己的方法fromURIString、fromSparkURL,而且方法fromURIString、fromSparkURL的返回值也是RpcAddress。

伴生对象RpcAddress的源码如下。

1.   private[spark] object RpcAddress {
2.    /**返回[RpcAddress]为代表的uri */
3.    def fromURIString(uri: String): RpcAddress = {
4.      val uriObj = new java.net.URI(uri)
5.      RpcAddress(uriObj.getHost, uriObj.getPort)
6.    }
7.    /**返回[RpcAddress],编码的形式:spark://host:port */
8.    def fromSparkURL(sparkUrl: String): RpcAddress = {
9.      val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
10.     RpcAddress(host, port)
11.   }
12. }

RpcEnv解析:

(1)RpcEnv是RPC的环境(相当于Akka中的ActorSystem),所有的RPCEndpoint都需要注册到RpcEnv实例对象中(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RpcEndpoint的RpcEndpointRef引用,从而进行通信),在RpcEndpoint接收到消息后会调用receive方法进行处理。

(2)RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的relpy方法来回复发送者的),如果不需要reply,就交给receive方法来处理。

(3)RpcEnvFactory是负责创建RpcEnv的,通过create方法创建RpcEnv实例对象,默认用Netty。

RpcEnv示意图如图6-4所示。

图6-4 RPCEnv示意图

回到RpcEnv.scala的源码,首先调用RpcUtils.lookupRpcTimeout(conf),返回RPC远程端点查找时默认Spark的超时时间。方法lookupRpcTimeout中构建了一个RpcTimeout,定义spark.rpc.lookupTimeout。spark.network.timeout的超时时间是120s。

RpcUtils.scala的lookupRpcTimeout方法的源码如下。

1.  def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
2.    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network
      .timeout"), "120s")
3.  }

进入RpcTimeout,进行RpcTimeout关联超时的原因描述,当TimeoutException发生的时候,关于超时的额外的上下文将包含在异常消息中。

RpcTimeout.scala的源码如下。

1.  private[spark] class RpcTimeout(val duration: FiniteDuration, val
    timeoutProp: String)
2.  extends Serializable {
3.
4.  /**修正TimeoutException标准的消息包括描述 */
5.  private def createRpcTimeoutException(te: TimeoutException):
    RpcTimeoutException = {
6.    new RpcTimeoutException(te.getMessage + ". This timeout is controlled
      by " + timeoutProp, te)
7.  }

其中的RpcTimeoutException继承自TimeoutException。

1.    private[rpc] class RpcTimeoutException(message: String, cause:
      TimeoutException)
2.  extends TimeoutException(message) { initCause(cause) }

其中的TimeoutException继承自Exception。

1.     public class TimeoutException extends Exception {
2.  ......
3.      public TimeoutException(String message) {
4.          super(message);
5.      }
6.  }

回到RpcTimeout.scala,其中的addMessageIfTimeout方法,如果出现超时,将加入这些信息。

RpcTimeout.scala的addMessageIfTimeout的源码如下。

1.  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
2.    //异常已被转换为一个RpcTimeoutException,就抛出它
3.    case rte: RpcTimeoutException => throw rte
4.    //其他TimeoutException异常转换为修改的消息RpcTimeoutException
5.    case te: TimeoutException => throw createRpcTimeoutException(te)
6.  }

RpcTimeout.scala中的awaitResult方法比较关键:awaitResult一直等结果完成并获得结果,如果在指定的时间没有返回结果,就抛出异常[RpcTimeoutException]。

Spark 2.1.1版本的RpcTimeout.scala的源码如下。

1.    def awaitResult[T](future: Future[T]): T = {
2.      val wrapAndRethrow: PartialFunction[Throwable, T] = {
3.        case NonFatal(t) =>
4.          throw new SparkException("Exception thrown in awaitResult", t)
5.      }
6.      try {
7.        //scalastyle:关闭awaitresult
8.        Await.result(future, duration)
9.        //scalastyle:打开awaitresult
10.     } catch addMessageIfTimeout.orElse(wrapAndRethrow)
11.   }
12. }

Spark 2.2.0版本的RpcTimeout.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第2~10行整体被替换,调整为调用ThreadUtils.awaitResult(future, duration)。

1.   /**
2.    * 等待完成的结果并返回结果。如果结果不在这个超时  timeout   范围内,就抛出一个异常
      * [RpcTimeoutException]表示配置控制超时
3.    *
4.    * @param  future    `Future` 将被等待
5.    * @throws RpcTimeoutException如果在等待指定的时间future还没准备好
6.    */
7.   def awaitResult[T](future: Future[T]): T = {
8.     try {
9.       ThreadUtils.awaitResult(future, duration)
10.    } catch addMessageIfTimeout
11.  }
12. }

其中的future是Future[T]类型,继承自Awaitable。

1.  trait Future[+T] extends Awaitable[T]

Awaitable是一个trait,其中的ready方法是指Duration时间片内,Awaitable的状态变成completed状态,就是ready。在Await.result中,result本身是阻塞的。

Awaitable.scala的源码如下。

1.    trait Awaitable[+T] {
2.  ......
3.  def ready(atMost: Duration)(implicit permit: CanAwait): this.type
4.  ......
5.    @throws(classOf[Exception])
6.    def result(atMost: Duration)(implicit permit: CanAwait): T
7.  }
8.

回到RpcEnv.scala中,其中endpointRef方法返回我们注册的RpcEndpoint的引用,是代理的模式。我们要使用RpcEndpoint,是通过RpcEndpointRef来使用的。Address方法是RpcEnv监听的地址;setupEndpoint方法注册时根据RpcEndpoint名称返回RpcEndpointRef。fileServer返回用于服务文件的文件服务器实例。如果RpcEnv不以服务器模式运行,可能是null值。

RpcEnv.scala的源码如下。

1.   private[spark] abstract class RpcEnv(conf: SparkConf) {
2.
3.    private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout
      (conf)
4.  ......
5.    private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
6.  def address: RpcAddress
7.  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
8.  .......
9.    def fileServer: RpcEnvFileServer
10. ......

RpcEnv.scala中的RpcEnvFileServer方法中的RpcEnvConfig是一个case class。RpcEnvFileServer的源码如下。

1.    private[spark] trait RpcEnvFileServer {
2.    def addFile(file: File): String
3.  ......
4.  private[spark] case class RpcEnvConfig(
5.      conf: SparkConf,
6.      name: String,
7.      bindAddress: String,
8.      advertiseAddress: String,
9.      port: Int,
10.     securityManager: SecurityManager,
11.     clientMode: Boolean)

RpcEnv是一个抽象类,其具体的子类是NettyRpcEnv。Spark 1.6版本中包括AkkaRpcEnv和NettyRpcEnv两种方式。Spark 2.0版本中只有NettyRpcEnv。

下面看一下RpcEnvFactory。RpcEnvFactory是一个工厂类,创建[RpcEnv],必须有一个空构造函数,以便可以使用反射创建。create根据具体的配置,反射出具体的实例对象。RpcEndpoint方法中定义了receiveAndReply方法和receive方法。

RpcEndpoint.scala的源码如下。

1.   private[spark] trait RpcEnvFactory {
2.
3.    def create(config: RpcEnvConfig): RpcEnv
4.  }
5.  private[spark] trait RpcEndpoint {
6.  ......
7.    val rpcEnv: RpcEnv
8.
9.    ......
10.   final def self: RpcEndpointRef = {
11.     require(rpcEnv != null, "rpcEnv has not been initialized")
12.     rpcEnv.endpointRef(this)
13.   }
14. .......
15.
16.   def receive: PartialFunction[Any, Unit] = {
17.     case _ => throw new SparkException(self + " does not implement
        'receive'")
18.   }
19. ......
20.   def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
      Unit] = {
21.     case _ => context.sendFailure(new SparkException(self + " won't reply
        anything"))
22.   }
23. ......

Master继承自ThreadSafeRpcEndpoint,接收消息使用receive方法和receiveAndReply方法。

其中,ThreadSafeRpcEndpoint继承自RpcEndpoint:ThreadSafeRpcEndpoint是一个trait,需要RpcEnv线程安全地发送消息给它。线程安全是指在处理下一个消息之前通过同样的[ThreadSafeRpcEndpoint]处理一条消息。换句话说,改变[ThreadSafeRpcEndpoint]的内部字段在处理下一个消息是可见的,[ThreadSafeRpcEndpoint]的字段不需要volatile或equivalent,不能保证对于不同的消息在相同的[ThreadSafeRpcEndpoint]线程中来处理。

1.  private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint

回到RpcEndpoint.scala,重点看一下receiveAndReply方法和receive方法。receive方法处理从[RpcEndpointRef.send]或者[RpcCallContext.reply]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法处理从[RpcEndpointRef.ask]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法返回PartialFunction对象。

RpcEndpoint.scala的源码如下。

1.     def receive: PartialFunction[Any, Unit] = {
2.    case _ => throw new SparkException(self + " does not implement
      'receive'")
3.  }
4.
5.  ......
6.    def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
      Unit] = {
7.      case _ => context.sendFailure(new SparkException(self + " won't reply
        anything"))
8.    }

在Master中,Receive方法中收到消息以后,不需要回复对方。

Master.scala的Receive方法的源码如下。

1.    override def receive: PartialFunction[Any, Unit] = {
2.     case ElectedLeader =>
3.     .....
4.   recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
5.           override def run(): Unit = Utils.tryLogNonFatalError {
6.             self.send(CompleteRecovery)
7.           }
8.         }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
9.       }
10.
11.    case CompleteRecovery => completeRecovery()
12.
13.
14.    case RevokedLeadership =>
15.      logError("Leadership has been revoked -- master shutting down.")
16.      System.exit(0)
17.
18.    case RegisterApplication(description, driver) =>
19.     ......
20.        schedule()
21.

在Master中,receiveAndReply方法中收到消息以后,都要通过context.reply回复对方。

在Master中,RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的relpy方法来回复发送者的),如果不需要reply,就交给receive方法来处理。

RpcCallContext的源码如下。

1.  private[spark] trait RpcCallContext {
2.
3.   /**
       *回复消息的发送者。如果发送者是[RpcEndpoint],其[RpcEndpoint.receive]
       *将被调用
5.     */
6.    def reply(response: Any): Unit
7.
8.   /**
       *向发送方报告故障
9.     */
10.   def sendFailure(e: Throwable): Unit
11.
12.  /**
       *此消息的发送者
13.    */
14.   def senderAddress: RpcAddress
15. }

回到RpcEndpoint.scala,RpcEnvFactory是一个trait,负责创建RpcEnv,通过create方法创建RpcEnv实例对象,默认用Netty。

RpcEndpoint.scala的源码如下。

1.   private[spark] trait RpcEnvFactory {
2.
3.    def create(config: RpcEnvConfig): RpcEnv
4.  }

RpcEnvFactory的create方法没有具体的实现。下面看一下RpcEnvFactory子类NettyRpcEnvFactory中create的具体实现,使用的方式为nettyEnv。

NettyRpcEnv.scala的create方法的源码如下。

1.    def create(config: RpcEnvConfig): RpcEnv = {
2.      val sparkConf = config.conf
3.      //在多个线程中使用JavaSerializerInstance 是安全的。然而,如果将来计划支持
        //KryoSerializer,必须使用ThreadLocal来存储SerializerInstance
4.
5.      val javaSerializerInstance =
6.        new JavaSerializer(sparkConf).newInstance().asInstanceOf
          [JavaSerializerInstance]
7.      val nettyEnv =
8.        new NettyRpcEnv(sparkConf, javaSerializerInstance,
          config.advertiseAddress,
9.          config.securityManager)
10.     if (!config.clientMode) {
11.       val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
12.         nettyEnv.startServer(config.bindAddress, actualPort)
13.         (nettyEnv, nettyEnv.address.port)
14.       }
15.       try {
16.         Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf,
            config.name)._1
17.       } catch {
18.         case NonFatal(e) =>
19.           nettyEnv.shutdown()
20.           throw e
21.       }
22.     }
23.     nettyEnv
24.   }
25. }

在Spark 2.0版本中回溯一下NettyRpcEnv的实例化过程。在SparkContext实例化时调用createSparkEnv方法。

SparkContext.scala的源码如下。

1.  ......
2.  _env = createSparkEnv(_conf, isLocal, listenerBus)
3.      SparkEnv.set(_env)
4.  ......
5.
6.    private[spark] def createSparkEnv(
7.        conf: SparkConf,
8.        isLocal: Boolean,
9.        listenerBus: LiveListenerBus): SparkEnv = {
10.     SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.
        numDriverCores(master))
11.   }
12.
13. .....

SparkContext的createSparkEnv方法中调用了SparkEnv.createDriverEnv方法。下面看一下createDriverEnv方法的实现,其调用了create方法。

SparkEnv.scala的createDriverEnv的源码如下。

1.    private[spark] def createDriverEnv(
2.      .......
3.      create(
4.        conf,
5.        SparkContext.DRIVER_IDENTIFIER,
6.        bindAddress,
7.        advertiseAddress,
8.        port,
9.        isLocal,
10.       numCores,
11.       ioEncryptionKey,
12.       listenerBus = listenerBus,
13.       mockOutputCommitCoordinator = mockOutputCommitCoordinator
14.     )
15.   }
16.
17.  private def create(
18.     ........
19.     val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress,
        port, conf,
20.       securityManager, clientMode = !isDriver)
21. ......

在RpcEnv.scala中,creat方法直接调用new()函数创建一个NettyRpcEnvFactory,调用NettyRpcEnvFactory().create方法,NettyRpcEnvFactory继承自RpcEnvFactory。在Spark 2.0中,RpcEnvFactory直接使用NettyRpcEnvFactory的方式。

RpcEnv.scala的源码如下。

1.   private[spark] object RpcEnv {
2.   .......
3.
4.    def create(
5.        name: String,
6.        bindAddress: String,
7.        advertiseAddress: String,
8.        port: Int,
9.        conf: SparkConf,
10.       securityManager: SecurityManager,
11.       clientMode: Boolean): RpcEnv = {
12.     val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress,
        port, securityManager,
13.       clientMode)
14.     new NettyRpcEnvFactory().create(config)
15.   }

NettyRpcEnvFactory().create的方法如下。

NettyRpcEnv.scala的源码如下。

1.    private[rpc]     class   NettyRpcEnvFactory       extends   RpcEnvFactory  with
      Logging {
2.
3.    def create(config: RpcEnvConfig): RpcEnv = {
4.    ......
5.      val nettyEnv =
6.        new NettyRpcEnv(sparkConf, javaSerializerInstance,
          config.advertiseAddress,
7.          config.securityManager)
8.      if (!config.clientMode) {
9.        val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
10.         nettyEnv.startServer(config.bindAddress, actualPort)
11.         (nettyEnv, nettyEnv.address.port)
12.       }
13.       try {
14.         Utils.startServiceOnPort(config.port, startNettyRpcEnv,
            sparkConf, config.name)._1
15.       } catch {
16.         case NonFatal(e) =>
17.           nettyEnv.shutdown()
18.           throw e
19.       }
20.     }
21.     nettyEnv
22.   }
23. }

在NettyRpcEnvFactory().create中调用new()函数创建一个NettyRpcEnv。NettyRpcEnv传入SparkConf参数,包括fileServer、startServer等方法。

NettyRpcEnv的源码如下。

1.   private[netty] class NettyRpcEnv(
2.      val conf: SparkConf,
3.      javaSerializerInstance: JavaSerializerInstance,
4.      host: String,
5.      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
6.
7.  ......
8.    override def fileServer: RpcEnvFileServer = streamManager
9.  ......
10.   def startServer(bindAddress: String, port: Int): Unit = {
11.     val bootstraps: java.util.List[TransportServerBootstrap] =
12.       if (securityManager.isAuthenticationEnabled()) {
13.         java.util.Arrays.asList(new         SaslServerBootstrap(transportConf,
            securityManager))
14.       } else {
15.         java.util.Collections.emptyList()
16.       }
17.     server = transportContext.createServer(bindAddress, port, bootstraps)
18.     dispatcher.registerRpcEndpoint(
19.       RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
20.   }

NettyRpcEnv.scala的startServer中,通过transportContext.createServer创建Server,使用dispatcher.registerRpcEndpoint方法dispatcher注册RpcEndpoint。在createServer方法中调用new()函数创建一个TransportServer。

TransportContext的createServer方法的源码如下。

1.  public TransportServer createServer(
2.      String host, int port, List<TransportServerBootstrap> bootstraps) {
3.    return new TransportServer(this, host, port, rpcHandler, bootstraps);
4.  }

TransportServer.java的源码如下。

1.  public TransportServer(
2.        TransportContext context,
3.        String hostToBind,
4.        int portToBind,
5.        RpcHandler appRpcHandler,
6.        List<TransportServerBootstrap> bootstraps) {
7.      this.context = context;
8.      this.conf = context.getConf();
9.      this.appRpcHandler = appRpcHandler;
10.     this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull
        (bootstraps));
11.
12.     try {
13.       init(hostToBind, portToBind);
14.     } catch (RuntimeException e) {
15.       JavaUtils.closeQuietly(this);
16.       throw e;
17.     }
18.   }

TransportServer.java中的关键方法是init,这是Netty本身的实现内容。

TransportServer.java中的init的源码如下。

1.         private void init(String hostToBind, int portToBind) {
2.
3.      IOMode ioMode = IOMode.valueOf(conf.ioMode());
4.      EventLoopGroup bossGroup =
5.        NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
          conf.getModuleName() + "-server");
6.      EventLoopGroup workerGroup = bossGroup;
7.  .......

接下来,我们看一下RpcEndpointRef。RpcEndpointRef是一个抽象类,是代理模式。

RpcEndpointRef.scala的源码如下。

1.   private[spark] abstract class RpcEndpointRef(conf: SparkConf)
2.    extends Serializable with Logging {
3.
4.    private[this] val maxRetries = RpcUtils.numRetries(conf)
5.    private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
6.    private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
7.  ......
8.  def send(message: Any): Unit
9.  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
10. .....

NettyRpcEndpointRef是RpcEndpointRef的具体实现子类。ask方法通过调用nettyEnv.ask传递消息。RequestMessage是一个case class。

Spark 2.1.1版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源码如下。

1.   private[netty] class NettyRpcEndpointRef(
2.      @transient private val conf: SparkConf,
3.      endpointAddress: RpcEndpointAddress,
4.      @transient @volatile private var nettyEnv: NettyRpcEnv)
5.    extends RpcEndpointRef(conf) with Serializable with Logging {
6.  ......
7.   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout):
     Future[T] = {
8.      nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)
9.    }
10. ......

Spark 2.2.0版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第3行endpointAddress增加了private访问限制。

 上段代码中第5行删掉了Serializable及Logging的继承。

1.  private[netty] class NettyRpcEndpointRef(
2.     @transient private val conf: SparkConf,
3.     private val endpointAddress: RpcEndpointAddress,
4.     @transient @volatile private var nettyEnv: NettyRpcEnv) extends
       RpcEndpointRef(conf) {

下面从实例的角度来看RPC的应用:

RpcEndpoint的生命周期:构造(constructor)->启动(onStart)、消息接收(receive、receiveAndReply )、停止(onStop)。

Master中接收消息的方式有两种:①receive接收消息不回复;②receiveAndReply通过context.reply的方式回复消息。例如,Worker发送Master的RegisterWorker消息,当Master注册成功,Master就返回Worker RegisteredWorker消息。

Worker启动时,从生命周期的角度,Worker实例化的时候提交Master进行注册。

Worker的onStart的源码如下。

1.    override def onStart() {
2.  .......
3.    registerWithMaster()
4.
5.    metricsSystem.registerSource(workerSource)
6.    metricsSystem.start()
7.    //Attach the worker metrics servlet handler to the web ui after the
      //metrics system is started.
8.    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
9.  }

进入registerWithMaster方法:

Worker的registerWithMaster的源码如下。

1.  private def registerWithMaster() {
2.     ......
3.         registerMasterFutures = tryRegisterAllMasters()
4.      ....

进入tryRegisterAllMasters方法:在rpcEnv.setupEndpointRef中根据masterAddress、ENDPOINT_NAME名称获取RpcEndpointRef。

Spark 2.1.1版本的Worker的tryRegisterAllMasters的源码如下。

1.  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.   ......
3.            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
              Master.ENDPOINT_NAME)
4.            registerWithMaster(masterEndpoint)
5.       ......

Spark 2.2.0版本的Worker的tryRegisterAllMasters的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行registerWithMaster方法调整为sendRegisterMessageToMaster方法。

1.    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.     val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress,
       Master.ENDPOINT_NAME)
4.              sendRegisterMessageToMaster(masterEndpoint)
5.  ......

基于masterEndpoint,使用registerWithMaster方法注册。registerWithMaster方法中通过ask方法发送RegisterWorker消息,并要求发送返回结果,返回的消息类型为RegisterWorkerResponse。然后进行模式匹配,如果成功,就handleRegisterResponse。如果失败,就退出。

Spark 2.1.1版本的Worker.scala的registerWithMaster的源码如下。

1.  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
2.      masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
3.        workerId, host, port, self, cores, memory, workerWebUiUrl))
4.        .onComplete {
5.          //这是一个非常快的行动,所以可以用ThreadUtils.sameThread
6.          case Success(msg) =>
7.            Utils.tryLogNonFatalError {
8.              handleRegisterResponse(msg)
9.            }
10.         case Failure(e) =>
11.           logError(s"Cannot register with master: ${masterEndpoint
              .address}", e)
12.           System.exit(1)
13.       }(ThreadUtils.sameThread)
14.   }

handleRegisterResponse方法中的模式匹配,收到RegisteredWorker消息进行相应的处理。

Worker.scala的handleRegisterResponse的源码如下。

1.  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit =
    synchronized {
2.      msg match {
3.        case RegisteredWorker(masterRef, masterWebUiUrl) =>
4.       .......
5.

Spark 2.1.1版本中,registerWithMaster方法中的Worker发送RegisterWorker消息给Master,此时,Worker同步收到Master回复的RegisterWorkerResponse消息以后还须根据成功或失败的情况,通过handleRegisterResponse进行后续的处理。

Spark 2.2.0版本将registerWithMaster方法调整为sendRegisterMessageToMaster方法。sendRegisterMessageToMaster方法中的Worker发送RegisterWorker消息给Master以后,就完成此次注册。Master节点收到RegisterWorker消息另行处理,如果注册成功,Master就发送Worker节点成功的RegisteredWorker消息;如果注册失败,Master就发送Worker节点失败的RegisterWorkerFailed消息。

1.    private def sendRegisterMessageToMaster(masterEndpoint:
      RpcEndpointRef): Unit = {
2.     masterEndpoint.send(RegisterWorker(
3.       workerId,
4.       host,
5.       port,
6.       self,
7.       cores,
8.       memory,
9.       workerWebUiUrl,
10.      masterEndpoint.address))
11.  }