7.2 Shuffle的框架

本节讲解Shuffle的框架、Shuffle的框架内核、Shuffle数据读写的源码解析。Spark Shuffle从基于Hash的Shuffle,引入了Shuffle Consolidate机制(即文件合并机制),演进到基于Sort的Shuffle实现方式。随着Tungsten计划的引入与优化,引入了基于Tungsten-Sort的Shuffle实现方式。

7.2.1 Shuffle的框架演进

Spark的Shuffle框架演进历史可以从框架本身的演进、Shuffle具体实现机制的演进两部分进行解析。

框架本身的演进可以从面向接口编程的原则出发,结合Build设计模式进行理解。整个Spark的Shuffle框架从Spark 1.1版本开始,提供便于测试、扩展的可插拔式框架。

而对应Shuffle的具体实现机制的演进部分,可以跟踪Shuffle实现细节在各个版本中的变更。具体体现在Shuffle数据的写入或读取,以及读写相关的数据块解析方式。下面简单描述一下整个演进过程。

在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的实现方式中,每个Mapper阶段的Task都会为每个Reduce阶段的Task生成一个文件,通常会产生大量的文件(即对应为M×R个中间文件,其中,M表示Mapper阶段的Task个数,R表示Reduce阶段的Task个数)。伴随大量的随机磁盘I/O操作与大量的内存开销。

为了缓解上述问题,在Spark 0.8.1版本中为基于Hash的Shuffle的实现引入了Shuffle Consolidate机制(即文件合并机制),将Mapper端生成的中间文件进行合并的处理机制。通过将配置属性spark.shuffle.consolidateFiles设置为true,减少中间生成的文件数量。通过文件合并,可以将中间文件的生成方式修改为每个执行单位(类似于Hadoop的Slot)为每个Reduce阶段的Task生成一个文件。其中,执行单位对应为:每个Mapper阶段的Cores数/每个Task分配的Cores数(默认为1)。最终可以将文件个数从M×R修改为E×C/T×R,其中,E表示Executors个数,C表示可用Cores个数,T表示Task分配的Cores个数。

基于Hash的Shuffle的实现方式中,生成的中间结果文件的个数都会依赖于Reduce阶段的Task个数,即Reduce端的并行度,因此文件数仍然不可控,无法真正解决问题。为了更好地解决问题,在Spark 1.1版本引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式也从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。首先,每个Mapper阶段的Task不会为每个Reduce阶段的Task生成一个单独的文件;而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件,Reduce阶段的各个Task可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘I/O与内存的开销。最终生成的文件个数减少到2M,其中M表示Mapper阶段的Task个数,每个Mapper阶段的Task分别生成两个文件(1个数据文件、1个索引文件),最终的文件个数为M个数据文件与M个索引文件。因此,最终文件个数是2×M个。

随着Tungsten计划的引入与优化,从Spark 1.4版本开始(Tungsten计划目前在Spark 1.5与Spark 1.6两个版本中分别实现了第一与第二两个阶段),在Shuffle过程中也引入了基于Tungsten-Sort的Shuffle实现方式,通过Tungsten项目所做的优化,可以极大提高Spark在数据处理上的性能。

为了更合理、更高效地使用内存,在Spark的Shuffle实现方式演进过程中,引进了外部排序等处理机制(针对基于Sort的Shuffle机制。基于Hash的Shuffle机制从最原始的全部放入内存改为记录级写入)。同时,为了保存Shuffle结果提高性能以及支持资源动态分配等特性,也引进了外部Shuffle服务等机制。

7.2.2 Shuffle的框架内核

Shuffle框架的设计可以从两方面理解:一方面,为了Shuffle模块更加内聚并与其他模块解耦;另一方面,为了更方便替换、测试、扩展Shuffle的不同实现方式。从Spark 1.1版本开始,引进了可插拔式的Shuffle框架(通过将Shuffle相关的实现封装到一个统一的对外接口,提供一种具体实现可插拔的框架)。Spark框架中,通过ShuffleManager来管理各种不同实现机制的Shuffle过程,由ShuffleManager统一构建、管理具体实现子类来实现Shuffle框架的可插拔的Shuffle机制。

在详细描述Shuffle框架实现细节之前,先给出可插拔式Shuffle的整体架构的类图,如图7-2所示。

图7-2 可插拔式Shuffle的整体架构的类图

在DAG的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时会将作业(Job)划分成多个Stage。对应地,在源码实现中,通过在划分Stage的关键点——构建ShuffleDependency时——进行Shuffle注册,获取后续数据读写所需的ShuffleHandle。

Stage阶段划分后,最终每个作业(Job)提交后都会对应生成一个ResultStage与若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的Task分别对应了ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage,其他若干ShuffleMapStage中的各个ShuffleMapTask都需要将最终的数据根据相应的分区器(Partitioner)对数据进行分组(即将数据重组到新的各个分区中),然后持久化分组后的数据。对应地,每个RDD本身记录了它的数据来源,在计算(compute)时会读取所需数据,对于带有宽依赖的RDD,读取时会获取在ShuffleMapTask中持久化的数据。

从图7-2中可以看到,外部宽依赖相关的RDD与ShuffleManager之间的注册交互,通过该注册,每个RDD自带的宽依赖(ShuffleDependency)内部会维护Shuffle的唯一标识信息ShuffleId以及与Shuffle过程具体读写相关的句柄ShuffleHandle,后续在ShuffleMapTask中启动任务(Task)的运行时,可以通过该句柄获取相关的Shuffle写入器实例,实现具体的数据磁盘写操作。

而在带有宽依赖(ShuffleDependency)的RDD中,执行compute时会去读取上一Stage为其输出的Shuffle数据,此时同样会通过该句柄获取相关的Shuffle读取器实例,实现具体数据的读取操作。需要注意的是,当前Shuffle的读写过程中,与BlockManager的交互,是通过MapOutputTracker来跟踪Shuffle过程中各个任务的输出数据的。在任务完成等场景中,会将对应的MapStatus信息注册到MapOutputTracker中,而在compute数据读取过程中,也会通过该跟踪器来获取上一Stage的输出数据在BlockManager中的位置,然后通过getReader得到的数据读取器,从这些位置中读取数据。

目前对Shuffle的输出进行跟踪的MapOutputTracker并没有和Shuffle数据读写类一样,也封装到Shuffle的框架中。如果从代码聚合与解耦等角度出发,也可以将MapOutputTracker合并到整个Shuffle框架中,然后在Shuffle写入器输出数据之后立即进行注册,在数据读取器读取数据前获取位置等(但对应的DAG等调度部分,也需要进行修改)。

ShuffleManager封装了各种Shuffle机制的具体实现细节,包含的接口与属性如下所示。

(1)registerShuffle:每个RDD在构建它的父依赖(这里特指ShuffleDependency)时,都会先注册到ShuffleManager,获取ShuffleHandler,用于后续数据块的读写等。

(2)getWriter:可以通过ShuffleHandler获取数据块写入器,写数据时通过Shuffle的块解析器shuffleBlockResolver,获取写入位置(通常将写入位置抽象为Bucket,位置的选择则由洗牌的规则,即Shuffle的分区器决定),然后将数据写入到相应位置(理论上,位置可以位于任何能存储数据的地方,包括磁盘、内存或其他存储框架等,目前在可插拔框架的几种实现中,Spark与Hadoop一样都采用磁盘的方式进行存储,主要目的是为了节约内存,同时提高容错性)。

(3)getReader:可以通过ShuffleHandler获取数据块读取器,然后通过Shuffle的块解析器shuffleBlockResolver,获取指定数据块。

(4)unregisterShuffle:与注册对应,用于删除元数据等后续清理操作。

(5)shuffleBlockResolver:Shuffle的块解析器,通过该解析器,为数据块的读写提供支撑层,便于抽象具体的实现细节。

7.2.3 Shuffle框架的源码解析

用户可以通过自定义ShuffleManager接口,并通过指定的配置属性进行设置,也可以通过该配置属性指定Spark已经支持的ShuffleManager具体实现子类。

在SparkEnv源码中可以看到设置的配置属性,以及当前在Spark的ShuffleManager可插拔框架中已经提供的ShuffleManager具体实现。Spark 2.0版本中支持sort、tungsten-sort两种方式。

Spark 2.1.1版本的SparkEnv.scala的源码如下。

1.   //用户可以通过短格式的命名来指定所使用的ShuffleManager
2.  val shortShuffleMgrNames = Map(
3.       "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager]
         .getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.
          SortShuffleManager].getName)
4.
5.  //指定ShuffleManager的配置属性:"spark.shuffle.manager"
6.    //默认情况下使用"sort",即SortShuffleManager的实现
7.      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
8.      val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.
        toLowerCase, shuffleMgrName)
9.      val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

Spark 2.2.0版本的SparkEnv.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行调用toLowerCase小写转换方法,设置Locale.ROOT区域表示。root locale是一个区域设置,其语言、地区、变量都设置为空("")字符串。

1.   ......
2.      val shuffleMgrClass =
3.        shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase
          (Locale.ROOT), shuffleMgrName)
4.  .......

从代码中可以看出,ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,可以通过spark.shuffle.manager配置属性来设置自定义的ShuffleManager。

在Driver和每个Executor的SparkEnv实例化过程中,都会创建一个ShuffleManager,用于管理块数据,提供集群块数据的读写,包括数据的本地读写和读取远程节点的块数据。

Shuffle系统的框架可以以ShuffleManager为入口进行解析。在ShuffleManager中指定了整个Shuffle框架使用的各个组件,包括如何注册到ShuffleManager,以获取一个用于数据读写的处理句柄ShuffleHandle,通过ShuffleHandle获取特定的数据读写接口:ShuffleWriter与ShuffleReader,以及如何获取块数据信息的解析接口ShuffleBlockResolver。下面通过源码分别对这几个比较重要的组件进行解析。

1.ShuffleManager的源码解析

由于ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,提供具体实现子类或自定义具体实现子类时,都需要重写ShuffleManager类的抽象接口。下面首先分析ShuffleManager的源码。

ShuffleManager.scala的源码如下。

1.
2.  //Shuffle系统的可插拔接口。在Driver和每个Executor的SparkEnv实例中创建
3.  private[spark] trait ShuffleManager {
4.
5.    /**
6.      *在Driver端向ShuffleManager注册一个Shuffle,获取一个Handle
7.      *在具体Tasks中会通过该Handle来读写数据
8.      */
9.    def registerShuffle[K, V, C](
10.       shuffleId: Int,
11.       numMaps: Int,
12.       dependency: ShuffleDependency[K, V, C]): ShuffleHandle
13.
14.   /**
        *获取对应给定的分区使用的ShuffleWriter,该方法在Executors上执行各个Map
        *任务时调用
15.     */
16.   def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:
      TaskContext): ShuffleWriter[K, V]
17.   /**
        * 获取在Reduce阶段读取分区的ShuffleReader,对应读取的分区由[startPartition
        * to endPartition-1]区间指定。该方法在Executors上执行,在各个Reduce任务时调用
        *
18      */
19.   def getReader[K, C](
20
21.       handle: ShuffleHandle,
22.       startPartition: Int,
23.       endPartition: Int,
24.       context: TaskContext): ShuffleReader[K, C]
25.
26.   /**
27.     *该接口和registerShuffle分别负责元数据的取消注册与注册
28.     *调用unregisterShuffle接口时,会移除ShuffleManager中对应的元数据信息
29.     */
30.   def unregisterShuffle(shuffleId: Int): Boolean
31.
32.   /**
        *返回一个可以基于块坐标来获取Shuffle 块数据的ShuffleBlockResolver
33.     */
34.   def shuffleBlockResolver: ShuffleBlockResolver
35.
36.   /**终止ShuffleManager */
37.   def stop(): Unit
38. }
2.ShuffleHandle的源码解析
1.  abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}

ShuffleHandle比较简单,用于记录Task与Shuffle相关的一些元数据,同时也可以作为不同具体Shuffle实现机制的一种标志信息,控制不同具体实现子类的选择等。

3.ShuffleWriter的源码解析

ShuffleWriter.scala的源码如下。

1.   private[spark] abstract class ShuffleWriter[K, V] {
2.    /** Write a sequence of records to this task's output */
3.    @throws[IOException]
4.    def write(records: Iterator[Product2[K, V]]): Unit
5.
6.    /** Close this writer, passing along whether the map completed */
7.    def stop(success: Boolean): Option[MapStatus]
8.  }

继承ShuffleWriter的每个具体子类会实现write接口,给出任务在输出时的写记录的具体方法。

4.ShuffleReader的源码解析

ShuffleReader.scala的源码如下。

1.  private[spark] trait ShuffleReader[K, C] {
2.   /** Read the combined key-values for this reduce task */
3.   def read(): Iterator[Product2[K, C]]

继承ShuffleReader的每个具体子类会实现read接口,计算时负责从上一阶段Stage的输出数据中读取记录。

5.ShuffleBlockResolver的源码解析

ShuffleBlockResolver的源码如下。

1.  /**
      *该特质的具体实现子类知道如何通过一个逻辑Shuffle块标识信息来获取一个块数据。具体
      *实现可以使用文件或文件段来封装Shuffle的数据。这是获取Shuffle块数据时使用的抽
      *象接口,在BlockStore中使用
2.     */
3.
4.
5.  trait ShuffleBlockResolver {
6.    type ShuffleId = Int
7.
8.    /**
       *获取指定块的数据。如果指定块的数据无法获取,则抛出异常
9.     */
10.   def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
11.
12.   def stop(): Unit
13. }

继承ShuffleBlockResolver的每个具体子类会实现getBlockData接口,给出具体的获取块数据的方法。

目前在ShuffleBlockResolver的各个具体子类中,除给出获取数据的接口外,通常会提供如何解析块数据信息的接口,即提供了写数据块时的物理块与逻辑块之间映射关系的解析方法。

7.2.4 Shuffle数据读写的源码解析

1.Shuffle写数据的源码解析

从Spark Shuffle的整体框架中可以看到,ShuffleManager提供了Shuffle相关数据块的写入与读取,即对应的接口getWriter与getReader。

在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependency的RDD,查看执行过程中,Shuffle框架中的数据读写接口getWriter与getReader如何使用,通过这种具体案例的方式来加深对源码的理解。

Spark中Shuffle具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析,可以知道Spark中一个作业可以根据宽依赖切分Stages,而在Stages中,相应的Tasks也包含两种,即ResultTask与ShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器,将一个RDD的元素拆分到多个buckets中,此时通过ShuffleManager的getWriter接口获取数据与buckets的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDD的compute对内部数据进行计算,而在带有ShuffleDependency的RDD中,在compute计算时,会通过ShuffleManager的getReader接口,获取上一个Stage的Shuffle输出结果作为本次Task的输入数据。

首先来看ShuffleMapTask中的写数据流程,具体代码如下所示。

ShuffleMapTask.scala的源码如下。

1.  override def runTask(context: TaskContext): MapStatus = {
2.     ......
3.  //首先从SparkEnv获取ShuffleManager
4.  //然后从ShuffleDependency中获取注册到ShuffleManager时得到的shuffleHandle
5.  //根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter
6.  //最后根据获取的ShuffleWriter,调用其write接口,写入当前分区的数据
7.  var writer: ShuffleWriter[Any, Any] = null
8.      try {
9.        val manager = SparkEnv.get.shuffleManager
10.      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId,
         context)
11.       writer.write(rdd.iterator(partition, context).asInstanceOf
          [Iterator[_ <: Product2[Any, Any]]])
12.       writer.stop(success = true).get
13.     } catch {
14.       ......
15.     }
16.   }
2.Shuffle读数据的源码解析

对应的数据读取器,从RDD的5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法。下面以包含宽依赖的RDD、CoGroupedRDD为例,查看如何获取Shuffle的数据。具体代码如下所示。

Spark 1.6.0版本的CoGroupedRDD.scala的源码如下。

1.  //对指定分区进行计算的抽象接口,以下为CoGroupedRDD具体子类中该方法的实现
2.  override def compute(s: Partition, context: TaskContext): Iterator[(K,
    Array[Iterable[_]])] = {
3.      val split = s.asInstanceOf[CoGroupPartition]
4.      val numRdds = dependencies.length
5.
6.      //A list of (rdditerator, dependency number) pairs
7.      val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
8.      for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
9.        case    oneToOneDependency:       OneToOneDependency[Product2[K,     Any]]
          @unchecked =>
10.         val dependencyPartition = split.narrowDeps(depNum).get.split
11.         //Read them from the parent
12.         val it = oneToOneDependency.rdd.iterator(dependencyPartition,
            context)
13. rddIterators += ((it, depNum))
14.
15.  case shuffleDependency: ShuffleDependency[_, _, _] =>
16. //首先从SparkEnv获取ShuffleManager
17. //然后从ShuffleDependency中获取注册到ShuffleManager时得到的shuffleHandle
18. //根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter
19. //最后根据获取的ShuffleReader,调用其read接口,读取Shuffle的Map输出
20.
21. val it = SparkEnv.get.shuffleManager
22.           .getReader(shuffleDependency.shuffleHandle, split.index,
              split.index + 1, context)
23.           .read()
24. rddIterators += ((it, depNum))
25.     }
26.
27.     val map = createExternalMap(numRdds)
28.     for ((it, depNum) <- rddIterators) {
29.       map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2,
          depNum))))
30.    }
31.    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
32.    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
33.    context.internalMetricsToAccumulators(
34.      InternalAccumulator.PEAK_EXECUTION_MEMORY).add
         (map.peakMemoryUsedBytes)
35.    new InterruptibleIterator(context,
36.      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
37.  }

Spark 2.2.0版本的CoGroupedRDD.scala的源码与Spark 1.6.0版本相比具有如下特点:上段代码中第28~29行的context.internalMetricsToAccumulators方法调整为context.taskMetrics方法,用于任务的度量监控,监控内存的峰值使用情况。

1.  ......
2.  context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
3.  .......

从代码中可以看到,带宽依赖的RDD的compute操作中,最终是通过SparkEnv中的ShuffleManager实例的getReader方法,获取数据读取器的,然后再次调用读取器的read读取指定分区范围的Shuffle数据。注意,是带宽依赖的RDD,而非ShuffleRDD,除了ShuffleRDD外,还有其他RDD也可以带上宽依赖的,如前面给出的CoGroupedRDD。

目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的。从源码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类只有BlockStoreShuffleReader,因此,本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。

源码解析的第一步仍然是查看该类的描述信息,具体如下所示。

1.  /**
2.   *通过从其他节点上请求读取   Shuffle  数据来接收并读取指定范围[起始分区, 结束分区)
     *——对应为左闭右开区间
3.   *通过从其他节点上请求读取Shuffle数据来接收并读取指定范围[起始分区,结束分区]
4.   *——对应为左闭右开区间
5.   */

从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此,该方法便是需要重点关注的源码。一些关键的代码如下所示。

Spark 2.1.1版本的BlockStoreShuffleReader.scala的源码如下。

1.   //为该Reduce任务读取并合并key-values 值
2.  override def read(): Iterator[Product2[K, C]] = {
3.   //真正的数据Iterator读取是通过ShuffleBlockFetcherIterator来完成的
4.      val blockFetcherItr = new ShuffleBlockFetcherIterator(
5.        context,
6.        blockManager.shuffleClient,
7.        blockManager,
8.        //可以看到,当ShuffleMapTask完成后注册到mapOutputTracker的元数据信息
          //同样会通过mapOutputTracker来获取,在此同时还指定了获取的分区范围
          //通过该方法的返回值类型
9.
10.
11.
12.       mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId,
          startPartition, endPartition),
13.      //默认读取时的数据大小限制为48m,对应后续并行的读取,都是一种数据读取的控制策
         //略,一方面可以避免目标机器占用过多带宽,同时也可以启动并行机制,加快读取速度
14.
15.
16.
17.       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight",
          "48m") * 1024 * 1024,
18.       SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight",
          Int.MaxValue))
19.
20.     //在此针对前面获取的各个数据块唯一标识ID信息及其对应的输入流进行处理
21.     val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
22.       serializerManager.wrapStream(blockId, inputStream)
23.     }
24.
25.     val serializerInstance = dep.serializer.newInstance()
26.
27.     //为每个流stream创建一个键/值迭代器
28.     val recordIter = wrappedStreams.flatMap { wrappedStream =>
29.       //注意:askey Value Iterator在内部迭代器Next Iterator中包裹一个键/值对,
          //当Input Stream中的数据已读取,Next Iterator确保Close()方法被调用
30.
31.
32.       serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
33.     }
34.
35.     //为每个记录更新上下文任务度量
36.     val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
37.     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
38.       recordIter.map { record =>
39.         readMetrics.incRecordsRead(1)
40.         record
41.       },
42.       context.taskMetrics().mergeShuffleReadMetrics())
43.
44.     //为了支持任务取消,这里必须使用可中断迭代器
45.    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context,
       metricIter)
46.  //对读取到的数据进行聚合处理
47.     val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator
        .isDefined) {
48.      //如果在Map端已经做了聚合的优化操作,则对读取到的聚合结果进行聚合,注意此时的
         //聚合操作与数据类型和Map端未做优化时是不同的
49.
50.
51.
52.   if (dep.mapSideCombine) {
53.         //对读取到的数据进行聚合处理
54.         val combinedKeyValuesIterator = interruptibleIter.asInstanceOf
            [Iterator[(K, C)]]
55.
56.      //Map端各分区针对Key进行合并后的结果再次聚合,Map的合并可以大大减少网络传输
         //的数据量
57.
58.         dep.aggregator.get.combineCombinersByKey
            (combinedKeyValuesIterator, context)
59.       } else {
60.         //我们无需关心值的类型,但应确保聚合是兼容的,其将把值的类型转化成聚合以后的
            //C类型
61.
62.
63.         val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator
            [(K, Nothing)]]
64.         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
65.       }
66.     } else {
67.       require(!dep.mapSideCombine, "Map-side combine without Aggregator
          specified!")
68.       interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
69.     }
70.      //在基于Sort的Shuffle实现过程中,默认基于PartitionId进行排序,在分区的内
         //部,数据是没有排序的,因此添加了keyOrdering变量,提供是否需要针对分区内的
         //数据进行排序的标识信息
71.     //如果定义了排序,则对输出结果进行排序
72.     dep.keyOrdering match {
73.       case Some(keyOrd: Ordering[K]) =>
74.
75.           //为了减少内存的压力,避免GC开销,引入了外部排序器对数据进行排序当内存不足
              //以容纳排序的数据量时,会根据配置的spark.shuffle.spill属性来决定是否需要
              //spill到磁盘中,默认情况下会打开spill开关,若不打开spill开关,数据量比
              //较大时会引发内存溢出问题(Out of Memory,OOM)
76.         val sorter =
77.           new ExternalSorter[K, C, C](context, ordering = Some(keyOrd),
              serializer = dep.serializer)
78.         sorter.insertAll(aggregatedIter)
79.         context.taskMetrics().incMemoryBytesSpilled(sorter.
            memoryBytesSpilled)
80.         context.taskMetrics().incDiskBytesSpilled(sorter.
            diskBytesSpilled)
81.         context.taskMetrics().incPeakExecutionMemory
            (sorter.peakMemoryUsedBytes)
82.         CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]]
            (sorter.iterator, sorter.stop())
83.       case None =>
84.      //不需要排序分区内部数据时直接返回
85.         aggregatedIter
86.     }
87.   }
88. }

Spark 2.2.0版本的BlockStoreShuffleReader.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第4行blockFetcherItr名称更改为wrappedStreams。

 上段代码中第17行之前新增代码serializerManager.wrapStream。

 上段代码中第18行之后新增配置参数REDUCER_MAX_REQ_SIZE_SHUFFLE_ TO_MEM:shuffle时可请求内存的最大大小(以字节为单位)。

 上段代码中第18行之后新增配置参数spark.shuffle.detectCorrupt:检测获取块blocks中是否有任何损坏。

 上段代码中第21~23行删除。

 上段代码中第28行wrappedStream调整为case (blockId, wrappedStream)。

1.   .......
2.      val wrappedStreams = new ShuffleBlockFetcherIterator(
3.       ......
4.        serializerManager.wrapStream,
5.      ......
          SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
6.        SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
7.  ......
8.     val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream)
       =>
9.  .......

下面进一步解析数据读取的部分细节。首先是数据块获取、读取的ShuffleBlock-FetcherIterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地节点或远程节点)分别进行读取,其中关键代码如下所示。

ShuffleBlockFetcherIterator的源码如下。

1.     private[this] def initialize(): Unit = {
2.     //任务完成进行回调清理(在成功案例和失败案例中调用)
3.     context.addTaskCompletionListener(_ => cleanup())
4.     //本地与远程的数据读取方式不同,因此先进行拆分,注意拆分时会考虑一次获取的数据
       //大小(拆分时会同时考虑并行数)封装请求,最后会将剩余不足该大小的数据获取也封装
       //为一个请求
5.
6.
7.
8.     val remoteRequests = splitLocalRemoteBlocks()
9.     //存入需要远程读取的数据块请求信息
10.    fetchRequests ++= Utils.randomize(remoteRequests)
11.    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
12.      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
13.      ", expected bytesInFlight = 0 but found bytesInFlight = " +
         bytesInFlight)
14.
15.    //发送数据获取请求
16.    fetchUpToMaxBytes()
17.
18.    val numFetches = remoteRequests.size - fetchRequests.size
19.    logInfo("Started " + numFetches + " remote fetches in" +
       Utils.getUsedTimeMs(startTime))
20.
21.     //除了远程数据获取外,下面是获取本地数据块的方法调用
22.    fetchLocalBlocks()
23.    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
24.  }

与Hadoop一样,Spark计算框架也基于数据本地性,即移动数据而非移动计算的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。

另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供,可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId, Seq[(BlockId, Long)])]。其中,BlockManagerId是BlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId, Long)]表示一组数据块标识ID及其数据块大小的元组信息。

最后简单分析一下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法给出解析,具体代码如下所示。

OrderedRDDFunctions的源码如下。

1.    def sortByKey(ascending: Boolean = true, numPartitions: Int =
      self.partitions.length)
2.        : RDD[(K, V)] = self.withScope
3.    {
4.     //注意,这里设置了该方法构建的RDD使用的分区器
       //根据Range而非Hash进行分区,对应的Range信息需要计算并将结果
       //反馈到Driver端,因此对应调用RDD中的Action,即会触发一个Job的执行
5.  val part = new RangePartitioner(numPartitions, self, ascending)
6.      //在构建RDD实例后,设置Key的排序算法,即Ordering实例
7.      new ShuffledRDD[K, V, V](self, part)
8.        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
9.    }

当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。