10.1 Spark中Broadcast原理和源码详解

本节讲解Spark中Broadcast原理及Spark中Broadcast源码。

10.1.1 Spark中Broadcast原理详解

Broadcast在机器学习、图计算、构建日常的各种算法中到处可见。 Broadcast将数据从一个节点发送到其他节点上;例如,Driver上有一张表,而Executor中的每个并行执行的Task (100万个Task)都要查询这张表,那我们通过Broadcast的方式只需要往每个Executor发送一次这张表就行了,Executor中的每个运行的Task查询这张唯一的表,而不是每次执行的时候都从Driver获得这张表。

Java中的Servlet里有一个ServletContext,是JSP或Java代码运行时的上下文,通过上下文可以获取各种资源。Broadcast类似于ServletContext中的资源、变量或数据,Broadcast广播出去是基于Executor的,里面的每个任务可以用上下文,Task的上下文就是Executor,可以抓取数据。这就好像ServletContext的具体作用,只是Broadcast是分布式的共享数据,默认情况下,只要程序在运行,Broadcast变量就会存在,因为Broadcast在底层是通过BlockManager管理的。但是,你可以手动指定或者配置具体周期来销毁Broadcast变量。可以指定Broadcast的unpersist销毁Broadcast变量,因为Spark应用程序中可能运行很多Job,可能一个Job需要很多Broadcast变量,但下一个Job不需要这些变量,但是应用程序还存在,因此需手工销毁Broadcast变量。

Broadcast一般用于处理共享配置文件、通用的Dataset、常用的数据结构等;但是在Broadcast中不适合存放太大的数据,Broadcast不会内存溢出,因为其数据的保存的StorageLevel是MEMORY_AND_DISK的方式;虽然如此,我们也不可以放入太大的数据在Broadcast中,因为网络I/O和可能的单点压力会非常大!(Spark 1.6版本Broadcast有两种方式:HttpBroadcast、TorrentBroadcast。HttpBroadcast可能有单点压力; TorrentBroadcast下载没有单点压力,但可能有网络压力)Spark 2.0版本中已经去掉HTTPBroadcast (SPARK-12588)了,Spark 2.0版本的TorrentBroadcast是Broadcast唯一的广播实现方式。

广播Broadcast变量是只读变量,如果Broadcast不是只读变量而可以更新,那带来的问题是:①一个节点上Broadcast可以更新,其他的节点Broadcast也要更新;②如果多个节点Broadcast同时更新,如何确定更新的顺序,以及容错等内容。因此,广播Broadcast变量是只读变量,最为轻松保持了数据的一致性!

Broadcast广播变量是只读变量,缓存在每个节点上,而不是每个Task去获取它的一份复制副本。例如,以高效的方式给每个节点发送一个dataset的副本。Spark尝试在分布式发送广播变量时使用高效的广播算法减少通信的成本。

广播变量是由一个变量V通过调用[org.apache.spark.SparkContext#broadcast]创建的。广播变量是一个围绕V的包装器,它的值可以通过调用value方法来获取。例如:

1.  scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
2.  broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)
3.
4.  scala> broadcastVar.value
5.  res0: Array[Int] = Array(1, 2, 3)

如果要更新广播变量,只有再广播一次,那就是一个新的广播变量,使用一个新的广播变量ID。

广播变量创建后,在群集上运行时,V变量不是在任何函数都使用,以便V传送到节点时不止一次。此外,对象V不应该被修改,是为了确保广播所有节点得到相同的广播变量值(例如,如果变量被发送到后来的一个新节点)。

Broadcast的源码如下。

1.   @param id  广播变量的唯一标识符。
2.   @tparam T   广播变量的数据类型。
3.   abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable
     with Logging {
4.
5.    @volatile private var _isValid = true
6.
7.    private var _destroySite = ""
8.
9.    /**获得广播值.*/
10.   def value: T = {
11.     assertValid()
12.     getValue()
13.   }
14. ......

Spark 1.6版本的HttpBroadcast方式的Broadcast,最开始的时候数据放在Driver的本地文件系统中,Driver在本地会创建一个文件夹来存放Broadcast中的data,然后启动HttpServer访问文件夹中的数据,同时写入到BlockManager(StorageLevel是MEMORY_AND_DISK)中获得BlockId(BroadcastBlockId),当第一次Executor中的Task要访问Broadcast变量的时候,会向Driver通过HttpServer来访问数据,然后会在Executor中的BlockManager中注册该Broadcast中的数据BlockManager,Task访问Broadcast变量时,首先查询BlockManager,如果BlockManager中已有此数据,Task就可直接使用BlockManager中的数据(说明SPARK-12588,HTTPBroadcast方式在Spark 2.0版本中已经去掉)。

10.1.2 Spark中Broadcast源码详解

BroadcastManager是用来管理Broadcast的,该实例对象是在SparkContext创建SparkEnv的时候创建的。

SparkEnv.scala的源码如下。

1.  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
2.
3.   val mapOutputTracker = if (isDriver) {
4.     new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
5.   } else {
6.     new MapOutputTrackerWorker(conf)
7.   }

BroadcastManager.scala中BroadcastManager实例化的时候会调用initialize方法,initialize方法就创建TorrentBroadcastFactory的方式。

BroadcastManager的源码如下。

1.
2.  private[spark] class BroadcastManager(
3.      val isDriver: Boolean,
4.      conf: SparkConf,
5.      securityManager: SecurityManager)
6.    extends Logging {
7.
8.    private var initialized = false
9.    private var broadcastFactory: BroadcastFactory = null
10.
11.   initialize()
12.
13.   //使用广播前,被SparkContext或Executor调用
14.   private def initialize() {
15.     synchronized {
16.       if (!initialized) {
17.         broadcastFactory = new TorrentBroadcastFactory
18.         broadcastFactory.initialize(isDriver, conf, securityManager)
19.         initialized = true
20.       }
21.     }
22.   }

Spark 2.0版本中的TorrentBroadcast方式:数据开始在Driver中,A节点如果使用了数据,A就成为供应源,这时Driver节点、A节点两个节点成为供应源,如第三个节点B访问的时候,第三个节点B也成为了供应源,同样地,第四个节点、第五个节点……等都成为了供应源,这些都被BlockManager管理,这样不会导致一个节点压力太大,从理论上讲,数据使用的节点越多,网络速度就越快。

TorrentBroadcast按照BLOCK_SIZE(默认是4MB)将Broadcast中的数据划分成为不同的Block,然后将分块信息(也就是Meta信息)存放到Driver的BlockManager中,同时会告诉BlockManagerMaster,说明Meta信息存放完毕。

SparkContext.scala的broadcast方法的源码如下。

1.        def broadcast[T: ClassTag](value: T): Broadcast[T] = {
2.     assertNotStopped()
3.     require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
4.       "Can not directly broadcast RDDs; instead, call collect() and
          broadcast the result.")
5.     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
6.     val callSite = getCallSite
7.     logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
8.     cleaner.foreach(_.registerBroadcastForCleanup(bc))
9.     bc
10.  }

SparkContext.scala的broadcast方法中调用env.broadcastManager.newBroadcast。BroadcastManager.scala的newBroadcast方法如下。

1.  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T]
     = {
2.     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.
       getAndIncrement())
3.   }

newBroadcast方法调用new()函数创建一个Broadcast,第一个参数是Value,第三个参数是BroadcastId。这里,BroadcastFactory是一个trait,没有具体的实现。

1.    private[spark] trait BroadcastFactory {
2.  ......
3.  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long):
    Broadcast[T]
4.  ...

TorrentBroadcastFactory是BroadcastFactory的具体实现。

1.  private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
2.  ......
3.  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id:
    Long): Broadcast[T] = {
4.      new TorrentBroadcast[T](value_, id)
5.    }

BroadcastFactory的newBroadcast方法创建TorrentBroadcast实例。

Spark 2.1.1版本的TorrentBroadcast.scala的源码如下。

1.   private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
2.    extends Broadcast[T](id) with Logging with Serializable {
3.  ......
4.  private def readBlocks(): Array[ChunkedByteBuffer] = {
5.      //获取数据块。注意,所有这些块存储在BlockManager 且向driver汇报,其他
        //Executors 也可以从这个Executors 中提取这些块
6.      val blocks = new Array[ChunkedByteBuffer](numBlocks)
7.      val bm = SparkEnv.get.blockManager
8.
9.      for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
10.       val pieceId = BroadcastBlockId(id, "piece" + pid)
11.       logDebug(s"Reading piece $pieceId of $broadcastId")
12.       //第一次尝试getLocalBytes从本地读取:因为以前试图获取广播块时已经获取了一些
          //块,在这种情况下,一些块将在本地(在Executor上)
13.       bm.getLocalBytes(pieceId) match {
14.         case Some(block) =>
15.          blocks(pid) = block
16.          releaseLock(pieceId)
17.        case None =>
18.          bm.getRemoteBytes(pieceId) match {
19.            case Some(b) =>
20.              if (checksumEnabled) {
21.                val sum = calcChecksum(b.chunks(0))
22.                if (sum != checksums(pid)) {
23.                  throw new SparkException(s"corrupt remote block $pieceId
                     of $broadcastId:" +
24.                    s" $sum != ${checksums(pid)}")
25.                }
26.              }
27.              //从远程Executors/driver的BlockManager查找块,所以把块在
                 //Executor节点BlockManager
28.              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_
                  SER, tellMaster = true)) {
29.                throw new SparkException(
30.                  s"Failed to store $pieceId of $broadcastId in local
                     BlockManager")
31.              }
32.              blocks(pid) = b
33.            case None =>
34.              throw new SparkException(s"Failed to get $pieceId of
                 $broadcastId")
35.          }
36.      }
37.    }
38.    blocks
39.  }

Spark 2.2.0版本的TorrentBroadcast.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第32行blocks(pid) = b调整为blocks(pid) = new ByteBufferBlockData(b, true) 。

1.     ......
2.      blocks(pid) = new ByteBufferBlockData(b, true)
3.  .......

TorrentBroadcast.scala的readBlocks方法中Random.shuffle(Seq.range(0, numBlocks)进行随机洗牌,是因为数据有很多来源DataServer,为了保持负载均衡,因此使用shuffle。

TorrentBroadcast将元数据信息存放到BlockManager,然后汇报给BlockManagerMaster。数据存放到BlockManagerMaster中就变成了全局数据,BlockManagerMaster具有所有的信息,Driver、Executor就可以访问这些内容。Executor运行具体的TASK的时候,通过TorrentBroadcast的方式readBlocks,如果本地有数据,就从本地读取,如果本地没有数据,就从远程读取数据。Executor读取信息以后,通过TorrentBroadcast的机制通知BlockManagerMaster数据多了一份副本,下一个Task读取数据的时候,就有两个选择,分享的节点越多,下载的供应源就越多,最终变成点到点的方式。

Broadcast可以广播RDD,Join操作性能优化之一也是采用Broadcast。