3.9 基于DataSet的代码到底是如何一步步转化成为RDD的

基于DataSet的代码转换为RDD之前需要一个Action的操作,基于Spark中的新解析引擎Catalyst进行优化,Spark中的Catalyst不仅限于SQL的优化,Spark的五大子框架(Spark Cores、Spark SQL、Spark Streaming、Spark GraphX、Spark Mlib)将来都会基于Catalyst基础之上。

Dataset.scala的collect方法的源码如下。

Spark 2.1.1版本的Dataset.scala的源码如下。

1.  def collect(): Array[T] = collect(needCallback = true)

Spark 2.2.0版本Dataset.scala的源码与Spark 2.1.1版本相比具有如下特点:将Dataset的action包裹起来,这样可跟踪QueryExecution和时间成本,然后汇报给用户注册的回调函数。

1.  def collect(): Array[T] = withAction("collect", queryExecution)
    (collectFromPlan)

进入collect(needCallback:true)的方法如下。

Spark 2.1.1版本的Dataset.scala的源码如下。

1.   private def collect(needCallback: Boolean): Array[T] = {
2.     def execute(): Array[T] = withNewExecutionId {
3.       queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
4.     }
5.
6.     if (needCallback) {
7.       withCallback("collect", toDF())(_ => execute())
8.     } else {
9.       execute()
10.    }
11.  }

Spark 2.2.0版本的Dataset.scala的源码与Spark 2.1.1版本相比具有如下特点。

 调用关键的代码SQLExecution.withNewExecutionId(sparkSession,qe){action(qe. executedPlan)}得到计算结果。

 action(qe.executedPlan)是collect方法中传入的函数collectFromPlan,在函数collectFromPlan中传入参数qe.executedPlan。

1.  private def withAction[U](name: String, qe: QueryExecution)(action:
    SparkPlan => U) = {
2.     try {
3.       qe.executedPlan.foreach { plan =>
4.         plan.resetMetrics()
5.       }
6.       val start = System.nanoTime()
7.       val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
8.         action(qe.executedPlan)
9.       }
10.      val end = System.nanoTime()
11.      sparkSession.listenerManager.onSuccess(name, qe, end - start)
12.      result
13.    } catch {
14.      case e: Exception =>
15.        sparkSession.listenerManager.onFailure(name, qe, e)
16.        throw e
17.    }
18.  }

Spark 2.2.0版本的Dataset.scala的源码从spark plan中获取所有的数据。

1.    private def collectFromPlan(plan: SparkPlan): Array[T] = {
2.    plan.executeCollect().map(boundEnc.fromRow)
3.  }

collect方法中关键的一行代码是queryExecution.executedPlan.executeCollect().map (boundEnc.fromRow),我们看一下executedPlan。executedPlan不用来初始化任何SparkPlan,仅用于执行。

QueryExecution.scala的源码如下。

1.  class    QueryExecution(val      sparkSession:     SparkSession,     val  logical:
    LogicalPlan) {
2.  ......
3.  //executePlan不应该被用来初始化任何Spark Plan,executePlan只用于执行
4.    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
5.  ......
6.       lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
7.  ......

queryExecution.executedPlan.executeCollect()代码中的executeCollect方法运行此查询,将结果作为数组返回。executeCollect方法调用了byteArrayRdd.collect()方法。

SparkPlan .scala的executeCollect的源码如下。

1.  def executeCollect(): Array[InternalRow] = {
2.    val byteArrayRdd = getByteArrayRdd()
3.
4.    val results = ArrayBuffer[InternalRow]()
5.    byteArrayRdd.collect().foreach { bytes =>
6.      decodeUnsafeRows(bytes).foreach(results.+=)
7.    }
8.    results.toArray
9.  }

byteArrayRdd.collect()方法调用RDD.scala的collect方法。collect方法最终通过sc.runJob提交Spark集群运行。

RDD.scala的collect方法的源码如下。

1.   def collect(): Array[T] = withScope {
2.    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
3.    Array.concat(results: _*)
4.  }

回到QueryExecution.scala中,executedPlan.execute()是关键性的代码。

1.  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

进入SparkPlan.scala的execute返回的查询结果类型为RDD[InternalRow]。调用doExecute执行,SparkPlan应重写doExecute进行具体实现。在execute方法中就生成了RDD[InternalRow]。execute方法的源码如下。

1.  final def execute(): RDD[InternalRow] = executeQuery {
2.    doExecute()
3.  }

SparkPlan.scala的doExecute()抽象方法没有具体实现,通过SparkPlan重写具体实现。产生的查询结果作为RDD[InternalRow]。

1.  protected def doExecute(): RDD[InternalRow]

InternalRow是通过语法树生成的一些数据结构。其子类包括BaseGenericInternalRow、JoinedRow、Row、UnsafeRow。

InternalRow.scala的源码如下。

1.  abstract class InternalRow extends SpecializedGetters with Serializable {
2.  ......
3.    def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
4.    def setByte(i: Int, value: Byte): Unit = update(i, value)
5.    def setShort(i: Int, value: Short): Unit = update(i, value)
6.    def setInt(i: Int, value: Int): Unit = update(i, value)
7.    def setLong(i: Int, value: Long): Unit = update(i, value)
8.    def setFloat(i: Int, value: Float): Unit = update(i, value)
9.    def setDouble(i: Int, value: Double): Unit = update(i, value)
10. ........

DataSet的代码转化成为RDD的内部流程如下。

Parse SQL(DataSet)→Analyze Logical Plan→Optimize Logical Plan→Generate Physical Plan→Prepareed Spark Plan→Execute SQL→Generate RDD

基于DataSet的代码一步步转化成为RDD:最终调用execute()生成RDD。