3.8 通过WordCount实战解析Spark RDD内部机制

本节通过Spark WordCount动手实践,编写单词计数代码;在wordcount.scala的基础上,从数据流动的视角深入分析Spark RDD的数据处理过程。

3.8.1 Spark WordCount动手实践

本节进行Spark WordCount动手实践。首先建立一个文本文件helloSpark.txt,将文本文件放到文件目录data/wordcount/中。helloSpark.txt的文本内容如下。

1.  Hello Spark Hello Scala
2.  Hello Hadoop
3.  Hello Flink
4.  Spark is Awesome

在IDEA中编写wordcount.scala的代码如下。

1.   package com.dt.spark.sparksql
2.  import org.apache.spark.SparkConf
3.  import org.apache.spark.SparkContext
4.  import org.apache.spark.rdd.RDD
5.  /**
6.    * 使用Scala开发本地测试的Spark WordCount程序
7.    * @author DT大数据梦工厂
8.    * 新浪微博:http://weibo.com/ilovepains/
9.    */
10. object WordCount {
11.   def main(args: Array[String]){
12.     /**
13.       * 第1步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
          * 例如,通过setMaster设置程序要链接的Spark集群的Master的URL,如果设置
          * 为local,则代表Spark程序在本地运行,特别适合于机器配置非常差(如只有1GB
          * 的内存)的初学者
14.       */
15.
16.     val conf = new SparkConf() //创建SparkConf对象
17.     conf.setAppName("Wow,My First Spark App!")
                                 //设置应用程序的名称,在程序运行的监控界面可以看到名称
18.     conf.setMaster("local") //此时程序在本地运行,不需要安装Spark集群
19.
20.     /**
21.       * 第2步:创建SparkContext对象
22.       * SparkContext是Spark程序所有功能的唯一入口,采用Scala、Java、Python、
          * R等都必须有一个SparkContext
23.       * SparkContext    核心作用:初始化 Spark   应用程序,运行所需要的核心组件,包括
          * DAGScheduler、TaskScheduler、SchedulerBackend,同时还会负责Spark程
          * 序往Master注册程序等,SparkContext是整个Spark应用程序中至关重要的一个对象
24.       */
25.     val sc = new SparkContext(conf)
                                 //创建SparkContext对象,通过传入SparkConf实例来定
                                 //制Spark运行的具体参数和配置信息
26.
27.     /**
28.       * 第 3  步:根据具体的数据来源(如  HDFS、HBase、Local FS、DB、S3      等)通过
          * SparkContext来创建RDD
29.       * RDD的创建有3种方式:根据外部的数据来源(如HDFS)、根据Scala集合、由其他
          * 的RDD操作
30.       * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于
          * 一个Task的处理范畴
31.       */
32.
33.     val lines = sc.textFile("data/wordcount/helloSpark.txt", 1)
                                               //读取本地文件并设置为一个Partition
34.
35.     /**
36.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
37.       *  第4.1步:将每一行的字符串拆分成单个单词
38.       */
39.     val words = lines.flatMap { line => line.split(" ")}
                                          //对每一行的字符串进行单词拆分,并把所有行的拆
                                          //分结果通过flat合并成为一个大的单词集合
40.     /**
41.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
42.       *  第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
43.       */
44.     val pairs = words.map { word => (word, 1) }
45.
46.     /**
47.       * 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
          * 高阶函数等的编程,进行具体的数据计算
48.       *  第4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
49.       */
50.     val wordCountsOdered = pairs.reduceByKey(_+_).map(pair => (pair._2,
        pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
        //对相同的Key,进行Value的累计(包括Local和Reducer级别,同时Reduce)
51.     wordCountsOdered.collect.foreach(wordNumberPair => println
        (wordNumberPair._1 + " : " + wordNumberPair._2))
52.     sc.stop()
53.
54.   }
55. }

在IDEA中运行程序,wordcount.scala的运行结果如下:

1.   ......
2.   17/05/21 21:19:07 INFO DAGScheduler: Job 0 finished: collect at
     WordCount.scala:60, took 0.957991 s
3.  Hello : 4
4.  Spark : 2
5.  Awesome : 1
6.  Flink : 1
7.  is : 1
8.  Scala : 1
9.  Hadoop : 1
10. ......

3.8.2 解析RDD生成的内部机制

下面详细解析一下wordcount.scala的运行原理。

(1)从数据流动视角解密WordCount,使用Spark作单词计数统计,搞清楚数据到底是怎么流动的。

(2)从RDD依赖关系的视角解密WordCount。Spark中的一切操作都是RDD,后面的RDD对前面的RDD有依赖关系。

(3)DAG与血统Lineage的思考。

在wordcount.scala的基础上,我们从数据流动的视角分析数据到底是怎么处理的。我们绘制一张WordCount数据处理过程图,由于图片较大,为了方便阅读,将原图分成两张图,如图3-11和图3-12所示。

图3-11 WordCount图1

图3-12 WordCount图2

数据在生产环境中默认在HDFS中进行分布式存储,如果在分布式集群中,我们的机器会分成不同的节点对数据进行处理,这里我们在本地测试,重点关注数据是怎么流动的。处理的第一步是获取数据,读取数据会生成HadoopRDD。

在WordCount.scala中,单击sc.textFile进入Spakr框架,SparkContext.scala的textFile的源码如下。

1.  def textFile(
2.      path: String,
3.      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
4.    assertNotStopped()
5.    hadoopFile(path, classOf[TextInputFormat],            classOf[LongWritable],
      classOf[Text],
6.      minPartitions).map(pair => pair._2.toString).setName(path)
7.  }

下面看一下hadoopFile的源码,通过new()函数创建一个HadoopRDD,HadoopRDD从Hdfs上读取分布式数据,并且以数据分片的方式存在于集群中。所谓的数据分片,就是把我们要处理的数据分成不同的部分,例如,在集群中有4个节点,粗略的划分可以认为将数据分成4个部分,4条语句就分成4个部分。例如,Hello Spark在第一台机器上,Hello Hadoop在第二台机器上,Hello Flink在第三台机器上,Spark is Awesome在第四台机器上。HadoopRDD帮助我们从磁盘上读取数据,计算的时候会分布式地放入内存中,Spark运行在Hadoop上,要借助Hadoop来读取数据。

Spark的特点包括:分布式、基于内存(部分基于磁盘)、可迭代;默认分片策略Block多大,分片就多大。但这种说法不完全准确,因为分片记录可能跨两个Block,所以一个分片不会严格地等于Block的大小。例如,HDFS的Block大小是128MB的话,分片可能多几个字节或少几个字节。分片不一定小于128MB,因为如果最后一条记录跨两个Block,分片会把最后一条记录放在前一个分片中。这里,HadoopRDD用了4个数据分片,设想为128M左右。

hadoopFile的源码如下。

1.   def hadoopFile[K, V](
2.        path: String,
3.        inputFormatClass: Class[_ <: InputFormat[K, V]],
4.        keyClass: Class[K],
5.        valueClass: Class[V],
6.        minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
7.      assertNotStopped()
8.
9.      //加载hdfs-site.xml配置文件
10.     //详情参阅Spark-11227
11.     FileSystem.getLocal(hadoopConfiguration)
12.
13.     //Hadoop配置文件大约有10 KB,相当大,所以进行广播
14.     val confBroadcast = broadcast(new SerializableConfiguration
        (hadoopConfiguration))
15.     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.
        setInputPaths(jobConf, path)
16.     new HadoopRDD(
17.       this,
18.       confBroadcast,
19.       Some(setInputPathsFunc),
20.       inputFormatClass,
21.       keyClass,
22.       valueClass,
23.       minPartitions).setName(path)
24.   }

SparkContext.scala的textFile源码中,调用hadoopFile方法后进行了map转换操作,map对读取的每一行数据进行转换,读入的数据是一个Tuple,Key值为索引,Value值为每行数据的内容,生成MapPartitionsRDD。这里,map(pair => pair._2.toString)是基于HadoopRDD产生的Partition去掉的行Key产生的Value,第二个元素是读取的每行数据内容。MapPartitionsRDD是Spark框架产生的,运行中可能产生一个RDD,也可能产生两个RDD。例如,textFile中Spark框架就产生了两个RDD,即HadoopRDD和MapPartitionsRDD。下面是map的源码。

1.  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map
      (cleanF))
4.  }

我们看一下WordCount业务代码,对读取的每行数据进行flatMap转换。这里,flatMap对RDD中的每一个Partition的每一行数据内容进行单词切分,如有4个Partition分别进行单词切分,将“Hello Spark”切分成单词“Hello”和“Spark”,对每一个Partition中的每一行进行单词切分并合并成一个大的单词实例的集合。flatMap转换生成的仍然是MapPartitionsRDD:

RDD.scala的flatMap的源码如下。

1.  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap
      (cleanF))
4.  }

继续WordCount业务代码,words.map { word => (word, 1) }通过map转换将单词切分以后单词计数为1。例如,将单词“Hello”和“Spark”变成(Hello,1),(Spark,1)。这里生成了MapPartitionsRDD。

RDD.scala的map的源码如下。

1.  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
2.    val cleanF = sc.clean(f)
3.    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map
      (cleanF))
4.  }

继续WordCount业务代码,计数之后进行一个关键的reduceByKey操作,对全局的数据进行计数统计。reduceByKey对相同的Key进行Value的累计(包括Local和Reducer级别,同时Reduce)。reduceByKey在MapPartitionsRDD之后,在Local reduce级别本地进行了统计,这里也是MapPartitionsRDD。例如,在本地将(Hello,1),(Spark,1),(Hello,1),(Scala,1)汇聚成(Hello,2),(Spark,1),(Scala,1)。Shuffle之前的Local Reduce操作主要负责本地局部统计,并且把统计以后的结果按照分区策略放到不同的file。举一个简单的例子,如果下一个阶段Stage是3个并行度,每个Partition进行local reduce以后,将自己的数据分成3种类型,最简单的方式是根据HashCode按3取模。

PairRDDFunctions.scala的reduceByKey的源码如下。

1.  def  reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
2.    reduceByKey(defaultPartitioner(self), func)
3.  }

至此,前面所有的操作都是一个Stage,一个Stage意味着什么:完全基于内存操作。父Stage:Stage内部的操作是基于内存迭代的,也可以进行Cache,这样速度快很多。不同于Hadoop的Map Redcue,Hadoop Map Redcue每次都要经过磁盘。

reduceByKey在Local reduce本地汇聚以后生成的MapPartitionsRDD仍属于父Stage;然后reduceByKey展开真正的Shuffle操作,Shuffle是Spark甚至整个分布式系统的性能瓶颈,Shuffle产生ShuffleRDD,ShuffledRDD就变成另一个Stage,为什么是变成另外一个Stage?因为要网络传输,网络传输不能在内存中进行迭代。

从WordCount业务代码pairs.reduceByKey(_+_)中看一下PairRDDFunctions.scala的reduceByKey的源码。

1.   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K,
     V)] = self.withScope {
2.    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
3.  }

reduceByKey内部调用了combineByKeyWithClassTag方法。下面看一下PairRDDFunctions. scala的combineByKeyWithClassTag的源码。

1.   def combineByKeyWithClassTag[C](
2.        createCombiner: V => C,
3.        mergeValue: (C, V) => C,
4.        mergeCombiners: (C, C) => C,
5.        partitioner: Partitioner,
6.        mapSideCombine: Boolean = true,
7.        serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K,
          C)] = self.withScope {
8.      require(mergeCombiners != null, "mergeCombiners must be defined")
        //required as of Spark 0.9.0
9.      if (keyClass.isArray) {
10.       if (mapSideCombine) {
11.         throw new SparkException("Cannot use map-side combining with array
            keys.")
12.       }
13.       if (partitioner.isInstanceOf[HashPartitioner]) {
14.         throw new SparkException("HashPartitioner cannot partition array
            keys.")
15.       }
16.     }
17.     val aggregator = new Aggregator[K, V, C](
18.       self.context.clean(createCombiner),
19.       self.context.clean(mergeValue),
20.       self.context.clean(mergeCombiners))
21.     if (self.partitioner == Some(partitioner)) {
22.       self.mapPartitions(iter => {
23.         val context = TaskContext.get()
24.         new InterruptibleIterator(context, aggregator.combineValuesByKey
            (iter, context))
25.       }, preservesPartitioning = true)
26.     } else {
27.       new ShuffledRDD[K, V, C](self, partitioner)
28.         .setSerializer(serializer)
29.         .setAggregator(aggregator)
30.         .setMapSideCombine(mapSideCombine)
31.     }
32.   }

在combineByKeyWithClassTag方法中就用new()函数创建了ShuffledRDD。

前面假设有4台机器并行计算,每台机器在自己的内存中进行迭代计算,现在产生Shuffle,数据就要进行分类,MapPartitionsRDD数据根据Hash已经分好类,我们就抓取MapPartitionsRDD中的数据。我们从第一台机器中获取的内容为(Hello,2),从第二台机器中获取的内容为(Hello,1),从第三台机器中获取的内容为(Hello,1),把所有的Hello都抓过来。同样,我们把其他的数据(Hadoop,1),(Flink,1)……都抓过来。

这就是Shuffle的过程,根据数据的分类拿到自己需要的数据。注意,MapPartitionsRDD属于第一个Stage,是父Stage,内部基于内存进行迭代,不需要操作都要读写磁盘,所以速度非常快;从计算算子的角度讲,reduceByKey发生在哪里?reduceByKey发生的计算过程包括两个RDD:一个是MapPartitionsRDD;一个是ShuffledRDD。ShuffledRDD要产生网络通信。

reduceByKey之后,我们将结果收集起来,进行全局级别的reduce,产生reduceByKey的最后结果,如将(Hello,2),(Hello,1),(Hello,1)在内部变成(Hello,4),其他数据也类似统计。这里reduceByKey之后,如果通过Collect将数据收集起来,就会产生MapPartitionsRDD。从Collect的角度讲,MapPartitionsRDD的作用是将结果收集起来发送给Driver;从saveAsTextFile输出到Hdfs的角度讲,例如输出(Hello,4),其中Hello是key,4是Value吗?不是!这里(Hello,4)就是value,这就需要设计一个key出来。

下面是RDD.scala的saveAsTextFile方法。

1.    def saveAsTextFile(path: String): Unit = withScope {
2.     //https://issues.apache.org/jira/browse/SPARK-2075
3.     //NullWritable在Hadoop 1.+版本中是Comparable,所以编译器无法发现隐式排
       //序,将使用默认的‘空’。然而,在Hadoop 2.+中是Comparable[NullWritable],
       //编译器将调用隐式的“排序”方法来创建一个排序的NullWritable。这就是为什么对
       //于Hadoop 1.+版本和Hadoop 2.+版本的saveAsTextFile,编译器会生成不同的匿
       //名类。因此,这里提供了一个显式排序的“null”来确保编译器为saveAsTextFile生
       //成相同的字节码
4.
5.
6.     val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
7.     val textClassTag = implicitly[ClassTag[Text]]
8.     val r = this.mapPartitions { iter =>
9.       val text = new Text()
10.      iter.map { x =>
11.        text.set(x.toString)
12.        (NullWritable.get(), text)
13.      }
14.    }
15.    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
16.      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
17.  }

RDD.scala的saveAsTextFile方法中的iter.map {x=>text.set(x.toString) (NullWritable.get(), text)},这里,key转换成Null,value就是内容本身(Hello,4)。saveAsHadoopFile中的TextOutputFormat要求输出的是key-value的格式,而我们处理的是内容。回顾一下,之前我们在textFile读入数据的时候,读入split分片将key去掉了,计算的是value。因此,输出时,须将丢失的key重新弄进来,这里key对我们没有意义,但key对Spark框架有意义,只有value对我们有意义。第一次计算的时候我们把key丢弃了,所以最后往HDFS写结果的时候需要生成key,这符合对称法则和能量守恒形式。

总结:

第一个Stage有哪些RDD?HadoopRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD、MapPartitionsRDD。

第二个Stage有哪些RDD?ShuffledRDD、MapPartitionsRDD。