多维分析

cube

测试代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    // Sample exam scores, one JSON document per student.
    val jsonData = Seq(
      """{"gradeID": 1, "classID": 1, "studentID": "2017101", "score": 80}""",
      """{"gradeID": 1, "classID": 1, "studentID": "2017104", "score": 91}""",
      """{"gradeID": 1, "classID": 2, "studentID": "2017122", "score": 75}""",
      """{"gradeID": 1, "classID": 2, "studentID": "2017811", "score": 67}""",
      """{"gradeID": 2, "classID": 2, "studentID": "2017066", "score": 81}""",
      """{"gradeID": 2, "classID": 1, "studentID": "2017057", "score": 99}""",
      """{"gradeID": 1, "classID": 1, "studentID": "2017089", "score": 85}""",
      """{"gradeID": 1, "classID": 2, "studentID": "2017044", "score": 76}""",
      """{"gradeID": 2, "classID": 1, "studentID": "2017037", "score": 69}"""
    )
    val df: DataFrame = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // createOrReplaceTempView keeps this snippet re-runnable in the same session;
    // createTempView throws an AnalysisException when "exam" is already registered.
    df.createOrReplaceTempView("exam")
    // Maximum score for every combination of the two grouping columns (WITH CUBE).
    val df2 = spark.sql(
      """
        | SELECT gradeID, classID, max(score)  FROM  exam
        | GROUP BY gradeID, classID WITH CUBE
        |""".stripMargin)
    df2.show()
    // Print the parsed/analyzed/optimized logical plans and the physical plan.
    df2.explain("extended")

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
+-------+-------+----------+
|gradeID|classID|max(score)|
+-------+-------+----------+
|      1|      1|        91|
|      1|   NULL|        91|
|   NULL|      1|        99|
|   NULL|   NULL|        99|
|      1|      2|        76|
|   NULL|      2|        81|
|      2|      2|        81|
|      2|   NULL|        99|
|      2|      1|        99|
+-------+-------+----------+

with cube查询

1
2
SELECT gradeID, classID, max(score)  FROM  exam
GROUP BY gradeID, classID WITH CUBE

等价于

1
2
3
4
5
6
7
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY gradeID, classID
UNION
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY gradeID, null
UNION
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY null, classID
UNION
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY null, null

相当于一份数据被放大了 $2^N$ 倍(N 为分组维度数),即 $O(2^N)$,当维度增大时,放大的数据量急剧增大

生成的物理计划:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L], 
   functions=[max(score#8L)], output=[gradeID#19L, classID#20L, max(score)#15L])
   +- Exchange hashpartitioning(gradeID#19L, classID#20L, spark_grouping_id#18L, 1), 
	  ENSURE_REQUIREMENTS, [plan_id=84]
      +- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L],
	     functions=[partial_max(score#8L)], output=[gradeID#19L, classID#20L, spark_grouping_id#18L, max#34L])
         +- Expand [[score#8L, gradeID#7L, classID#6L, 0], [score#8L, gradeID#7L, null, 1], 
		    [score#8L, null, classID#6L, 2], [score#8L, null, null, 3]], [score#8L, gradeID#19L, 
			classID#20L, spark_grouping_id#18L]
            +- Project [score#8L, gradeID#7L, classID#6L]
               +- Scan ExistingRDD[classID#6L,gradeID#7L,score#8L,studentID#9]

执行原理

多维分析在执行过程中一般都会产生空值(null)。

  • 常用的方法是使用grouping函数来区分,如果是由多维分析产生的null值,则grouping函数返回 1
  • 如果是数据本身的null值,则grouping函数返回 0

rollup

测试代码同 with cube
with rollup 查询

1
2
SELECT gradeID, classID, max(score)  FROM  exam
GROUP BY gradeID, classID WITH ROLLUP

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
+-------+-------+----------+
|gradeID|classID|max(score)|
+-------+-------+----------+
|      1|      1|        91|
|      1|   NULL|        91|
|   NULL|   NULL|        99|
|      1|      2|        76|
|      2|      2|        81|
|      2|   NULL|        99|
|      2|      1|        99|
+-------+-------+----------+

等价于:

1
2
3
4
5
SELECT gradeID, classID, max(score)  FROM  exam GROUP BY gradeID, classID
UNION
SELECT gradeID, classID, max(score)  FROM  exam GROUP BY gradeID, null
UNION
SELECT gradeID, classID, max(score)  FROM  exam GROUP BY null, null

物理计划:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L], 
   functions=[max(score#8L)], output=[gradeID#19L, classID#20L, max(score)#15L])
   +- Exchange hashpartitioning(gradeID#19L, classID#20L, spark_grouping_id#18L, 1), 
      ENSURE_REQUIREMENTS, [plan_id=84]
      +- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L], 
	     functions=[partial_max(score#8L)], output=[gradeID#19L, classID#20L, spark_grouping_id#18L, max#34L])
         +- Expand [[score#8L, gradeID#7L, classID#6L, 0], [score#8L, gradeID#7L, null, 1], 
		    [score#8L, null, null, 3]], [score#8L, gradeID#19L, classID#20L, spark_grouping_id#18L]
            +- Project [score#8L, gradeID#7L, classID#6L]
               +- Scan ExistingRDD[classID#6L,gradeID#7L,score#8L,studentID#9]

grouping sets

测试代码同 with cube
查询 SQL

1
2
SELECT gradeID, classID, max(score)  FROM  exam
GROUP BY gradeID, classID GROUPING SETS((gradeID, classID), gradeID)

等价于:

1
2
3
-- One branch per grouping set of GROUPING SETS((gradeID, classID), gradeID);
-- "null" marks the column that is not grouped on in that branch.
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY gradeID, classID
UNION
SELECT gradeID, classID, max(score)  FROM  exam  GROUP BY gradeID, null

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
+-------+-------+----------+
|gradeID|classID|max(score)|
+-------+-------+----------+
|      1|      1|        91|
|      1|   NULL|        91|
|      1|      2|        76|
|      2|      2|        81|
|      2|   NULL|        99|
|      2|      1|        99|
+-------+-------+----------+

物理计划:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L], 
   functions=[max(score#8L)], output=[gradeID#19L, classID#20L, max(score)#15L])
   +- Exchange hashpartitioning(gradeID#19L, classID#20L, spark_grouping_id#18L, 1), 
      ENSURE_REQUIREMENTS, [plan_id=81]
      +- HashAggregate(keys=[gradeID#19L, classID#20L, spark_grouping_id#18L], 
	     functions=[partial_max(score#8L)], output=[gradeID#19L, classID#20L, spark_grouping_id#18L, max#34L])
         +- Expand [[score#8L, gradeID#7L, classID#6L, 0], [score#8L, gradeID#7L, null, 1]], 
		    [score#8L, gradeID#19L, classID#20L, spark_grouping_id#18L]
            +- Project [score#8L, gradeID#7L, classID#6L]
               +- Scan ExistingRDD[classID#6L,gradeID#7L,score#8L,studentID#9]

原理分析

逻辑计划 –> 物理计划

类图:

其中 group by 后面的表达式,用 bitmap 来存储的
生成Expand+Aggregate节点

ResolveGroupingAnalytics规则

  • 生成GroupingSets节点:针对Aggregate节点,如果其中group by语句对应的是cube或rollup表达式,计算相应的bitmasks,将Aggregate节点转换为GroupingSets节点
  • 生成Expand+Aggregate节点:针对GroupingSets节点,生成Expand逻辑算子树节点加上Aggregate逻辑算子树节点
  • 替换多维分析函数(grouping_id和grouping函数):将逻辑算子树某些节点中(主要存在Filter或Sort节点)的表达式所包含的多维分析函数,替换为对应生成的列名(在Expand节点中生成)
  • 由于还存在一些未解析的表达式,后面还需要继续 analysis
  • GroupingSets节点转换为 Aggregate + Expand + Project 3个节点的组合

Expand表示“扩展”

  • 多维分析在本质上相当于执行多种组合的group by操作
  • 因此Expand所起的作用就是将一条数据扩展为特定形式的多条数据

多维分析逻辑算子树

  • 在物理算子树中,ExpandExec节点对应Expand逻辑算子树节点,Aggregate节点生成两个HashAggregateExec物理算子树节点
  • 两个HashAggregateExec节点之间需要进行分区方面的处理,因此在最后阶段的EnsureRequirements规则中,会添加Exchange节点进行Shuffle操作
  • 最终得到的物理执行计划包含6个节点
  • Spark SQL中的多维分析的执行没有特殊之处,本质上仍然是聚合查询的计算过程
  • Expand方式执行多维分析虽然能够达到只读一次数据表的效果,但是在某些场景下容易造成中间数据的膨胀
  • 如:数据的维度太高,Expand会产生指数级别的数据量

ExpandExec

  • 代码中的注释说明
  • Apply all of the GroupExpressions to every input row, hence we will get multiple output rows for an input row.

一大段注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    /*
     * When the projections list looks like:
     *   expr1A, exprB, expr1C
     *   expr2A, exprB, expr2C
     *   ...
     *   expr(N-1)A, exprB, expr(N-1)C
     *
     * i.e. column A and C have different values for each output row, but column B stays constant.
     *
     * The generated code looks something like (note that B is only computed once in declaration):
     *
     * // part 1: declare all the columns
     * colA = ...
     * colB = ...
     * colC = ...
     *
     * // part 2: code that computes the columns
     * for (row = 0; row < N; row++) {
     *   switch (row) {
     *     case 0:
     *       colA = ...
     *       colC = ...
     *     case 1:
     *       colA = ...
     *       colC = ...
     *     ...
     *     case N - 1:
     *       colA = ...
     *       colC = ...
     *   }
     *   // increment metrics and consume output values
     * }
     *
     * We use a for loop here so we only includes one copy of the consume code and avoid code
     * size explosion.
     */

将一份数据复制 N 份,然后加上 gid,再做聚合

窗口函数

定义

1
2
3
4
window_function [ nulls_option ] OVER
( [  { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
  { ORDER | SORT } BY expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ]
  [ window_frame ] )

窗口函数涉及了3个核心元素

  • 分区((PARTITION|DISTRIBUTE)BY)信息
  • 排序((ORDER|SORT)BY)信息
  • 窗框定义(windowFrame)

测试代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
  /** Ranks employees by ascending salary within each department using a window function. */
  test("windows test") {
    val jsonData = Seq(
      """{"name": "Lisa", "dept": "Sales", "salary": 10000, "age": 35}""",
      """{"name": "Evan", "dept": "Sales", "salary": 32000, "age": 38}""",
      """{"name": "Fred", "dept": "Engineering", "salary": 21000, "age": 28}""",
      """{"name": "Alex", "dept": "Sales", "salary": 30000, "age": 33}""",
      """{"name": "Tom", "dept": "Engineering", "salary": 23000, "age": 33}""",
      """{"name": "Jane", "dept": "Marketing", "salary": 29000, "age": 28}""",
      """{"name": "Jeff", "dept": "Marketing", "salary": 35000, "age": 38}""",
      """{"name": "Paul", "dept": "Engineering", "salary": 29000, "age": 23}""",
      """{"name": "Chloe", "dept": "Engineering", "salary": 23000, "age": 25}"""
    )
    val df = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // createOrReplaceTempView keeps the test re-runnable in a shared session;
    // createTempView throws an AnalysisException when the view already exists.
    df.createOrReplaceTempView("employees")
    // RANK() leaves gaps after ties: two rows tied at rank 2 are followed by rank 4.
    val df2 = spark.sql(
      """
        |SELECT name, dept, salary, RANK() OVER (PARTITION BY dept
        |ORDER BY salary) AS rank FROM employees;
        |""".stripMargin)
    df2.show()
    df2.explain("extended")
  }

结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
+-----+-----------+------+----+
| name|       dept|salary|rank|
+-----+-----------+------+----+
| Fred|Engineering| 21000|   1|
|  Tom|Engineering| 23000|   2|
|Chloe|Engineering| 23000|   2|
| Paul|Engineering| 29000|   4|
| Jane|  Marketing| 29000|   1|
| Jeff|  Marketing| 35000|   2|
| Lisa|      Sales| 10000|   1|
| Alex|      Sales| 30000|   2|
| Evan|      Sales| 32000|   3|
+-----+-----------+------+----+

物理计划:

1
2
3
4
5
6
7
8
AdaptiveSparkPlan isFinalPlan=false
+- Window [rank(salary#9L) windowspecdefinition(dept#7, salary#9L ASC NULLS FIRST, 
   specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#14], 
   [dept#7], [salary#9L ASC NULLS FIRST]
   +- Sort [dept#7 ASC NULLS FIRST, salary#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(dept#7, 1), ENSURE_REQUIREMENTS, [plan_id=84]
         +- Project [name#8, dept#7, salary#9L]
            +- Scan ExistingRDD[age#6L,dept#7,name#8,salary#9L]

执行过程:

窗口类型

  • WindowExpression的窗口函数(WindowFunction)是Expression类型
  • 分区信息类型为Seq[Expression],在上述案例中表示按照dept列进行分区
  • 排序信息类型为Seq[SortOrder],对应salary列的升序
  • 窗框(WindowFrame)定义比较重要,有UnspecifiedFrame和SpecifiedWindowFrame两个子类

SpecifiedWindowFrame表示一个完整的窗框定义,包含

  • frameType、frameStart、frameEnd 3个元素
  • 分别代表窗框类型(FrameType)
  • 起始的窗口边界(FrameBoundary)
  • 终止的窗口边界(FrameBoundary)

FrameBoundary包含

  • UnboundedPreceding
  • ValuePreceding(value:Int)
  • CurrentRow
  • ValueFollowing(value:Int)
  • UnboundedFollowing

RowFrame针对窗口分区中的所有数据行

  • 当ValuePreceding和ValueFollowing作为窗口边界时,其中的value值代表物理偏移量
  • 例如“ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING”表示4行数据构成的窗框,即从当前行的前一行到后两行

RangeFrame针对的是用于排序的列

  • 当ValuePreceding和ValueFollowing作为窗口边界时,其中的value值代表逻辑偏移量
  • 例如假设当前行的score值为87,而窗框的边界定义为“RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING”
  • 那么所对应的数据窗口为score在[86,88]范围内的数据行

转换过程:

窗口的聚合过程:

有5种窗框类型

  • 全部数据分区(Entire Partition):即UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING。在这种情况下,对于每条数据,需要处理该数据所在数据分区中的所有数据,对应实现为UnboundedWindowFunctionFrame类
  • 扩张框(Growing Frame):即UNBOUNDED PRECEDING AND……在这种情况下,每次都会移动到新的数据行进行处理,并添加一些数据行扩展该框架,在这种类型的窗框中数据不会被移除,只会不停地加入新数据,窗口范围不停地“扩张”,对应的实现为UnboundedPrecedingWindowFunctionFrame类,案例中的rank()函数就属于这种类型
  • 收缩框(Shrinking Frame):该框架只会移除数据,即…… AND UNBOUNDED FOLLOWING。在这种情况下,每次都会移动到新的数据行进行处理,并从窗框中移除一些数据行,这种类型的窗框中不会添加数据,窗口不停地“收缩”,对应的实现为UnboundedFollowingWindowFunctionFrame类
  • 移动框(Moving Frame):每次处理到新的数据行,都会添加一些数据,同时也会删除一些数据,例如(1 PRECEDING AND CURRENT ROW)或(1 FOLLOWING AND 2 FOLLOWING),对应的实现为SlidingWindowFunctionFrame类
  • 偏移框(Offset Frame):该窗框仅包含一行数据,即距离当前数据行特定偏移量的数据。这里需要注意的是,偏移框仅适用于OffsetWindowFunction类型的窗口函数

根据WindowExec类doExecute方法可知,整体的执行逻辑分为两步

  • 按分区读取数据(fetchNextPartition),并将其保存在ArrayBuffer中,在此过程中先构造一个ArrayRowBuffer存储数据,如果超过阈值(默认为4096),则切换为基于磁盘的UnsafeExternalSorter数据结构;当前分区数据读取结束后,根据UnsafeExternalSorter是否为空,判断分区数据保存在ArrayRowBuffer还是ExternalRowBuffer数据结构中
  • 遍历RowBuffer中的数据,逐条执行write操作,最终调用AggregateProcessor(注:封装案例中的rank()函数)中的update等方法完成计算

lateral

测试代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  /**
   * Exercises LATERAL VIEW with EXPLODE: two stacked lateral views, an aggregate over an
   * exploded column, and an empty array with and without OUTER.
   */
  test("lateral test") {
    val jsonData = Seq(
      """{"id": 100, "name": "John", "age": 30, "class": 1, "address": "Street 1"}""",
      """{"id": 200, "name": "Mary", "age": null, "class": 1, "address": "Street 2"}""",
      """{"id": 300, "name": "Mike", "age": 80, "class": 3, "address": "Street 3"}""",
      """{"id": 400, "name": "Dan", "age": 50, "class": 4, "address": "Street 4"}"""
    )
    val df = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // Register under exactly the name the queries use (no stray whitespace).
    // createOrReplaceTempView does not fail when "person" is already registered in the
    // session, whereas createTempView throws an AnalysisException in that case.
    df.createOrReplaceTempView("person")

    // Each input row is multiplied by both exploded arrays: 2 x 2 output rows per person.
    val df2 = spark.sql(
      """
        |SELECT * FROM person
        |    LATERAL VIEW EXPLODE(ARRAY(30, 60)) tableName AS c_age
        |    LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
        |""".stripMargin)
    df2.show()
    df2.explain("extended")

    println("===========================")
    // Aggregate over a column produced by the lateral view.
    spark.sql(
      """
        |SELECT c_age, COUNT(1) FROM person
        |    LATERAL VIEW EXPLODE(ARRAY(30, 60)) AS c_age
        |    LATERAL VIEW EXPLODE(ARRAY(40, 80)) AS d_age
        |GROUP BY c_age;
        |""".stripMargin).show()

    // Exploding an empty array drops every input row ...
    spark.sql("SELECT * FROM person\n    LATERAL VIEW EXPLODE(ARRAY()) tableName AS c_age;").show()
    // ... unless OUTER is given, which keeps each row with a NULL c_age.
    spark.sql("SELECT * FROM person\n    LATERAL VIEW OUTER EXPLODE(ARRAY()) tableName AS c_age;").show()
  }

第一个 sql 生成的物理计划:

1
2
3
4
== Physical Plan ==
*(1) Generate explode([40,80]), [address#6, age#7L, class#8L, id#9L, name#10, c_age#16], false, [d_age#17]
+- *(1) Generate explode([30,60]), [address#6, age#7L, class#8L, id#9L, name#10], false, [c_age#16]
   +- *(1) Scan ExistingRDD[address#6,age#7L,class#8L,id#9L,name#10]

执行结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
+--------+----+-----+---+----+-----+-----+
| address| age|class| id|name|c_age|d_age|
+--------+----+-----+---+----+-----+-----+
|Street 1|  30|    1|100|John|   30|   40|
|Street 1|  30|    1|100|John|   30|   80|
|Street 1|  30|    1|100|John|   60|   40|
|Street 1|  30|    1|100|John|   60|   80|
|Street 2|NULL|    1|200|Mary|   30|   40|
|Street 2|NULL|    1|200|Mary|   30|   80|
|Street 2|NULL|    1|200|Mary|   60|   40|
|Street 2|NULL|    1|200|Mary|   60|   80|
|Street 3|  80|    3|300|Mike|   30|   40|
|Street 3|  80|    3|300|Mike|   30|   80|
|Street 3|  80|    3|300|Mike|   60|   40|
|Street 3|  80|    3|300|Mike|   60|   80|
|Street 4|  50|    4|400| Dan|   30|   40|
|Street 4|  50|    4|400| Dan|   30|   80|
|Street 4|  50|    4|400| Dan|   60|   40|
|Street 4|  50|    4|400| Dan|   60|   80|
+--------+----+-----+---+----+-----+-----+

+-----+--------+
|c_age|count(1)|
+-----+--------+
|   30|       8|
|   60|       8|
+-----+--------+

+-------+---+-----+---+----+-----+
|address|age|class| id|name|c_age|
+-------+---+-----+---+----+-----+
+-------+---+-----+---+----+-----+

+--------+----+-----+---+----+-----+
| address| age|class| id|name|c_age|
+--------+----+-----+---+----+-----+
|Street 1|  30|    1|100|John| NULL|
|Street 2|NULL|    1|200|Mary| NULL|
|Street 3|  80|    3|300|Mike| NULL|
|Street 4|  50|    4|400| Dan| NULL|
+--------+----+-----+---+----+-----+

pivot

一个简单的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  /**
   * Pivots the average temperature per month into one column per month (JAN..JUN),
   * producing one row per year.
   */
  test("pivot test 1") {
    val jsonData = Seq(
      """{"d": "2024-04-28", "c": 20}""",
      """{"d": "2024-03-01", "c": 14}""",
      """{"d": "2024-05-01", "c": 24}""",
      """{"d": "2024-05-02", "c": 25}""",
      """{"d": "2024-05-03", "c": 26}"""
    )
    val df = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // createOrReplaceTempView keeps the test re-runnable in a shared session;
    // createTempView throws an AnalysisException when the view already exists.
    df.createOrReplaceTempView("temperature")
    // Columns not referenced inside PIVOT (here: year) act as the implicit grouping key.
    val sql =
      """
        |SELECT * FROM (
        |  SELECT year(d) AS year, month(d) month, c
        |  FROM temperature
        |)
        |PIVOT (
        |  cast(avg(c) AS decimal(4, 1))
        |  FOR month IN (
        |    1 JAN, 2 FEB, 3 MAR, 4 APR, 5 MAY, 6 JUN
        |  )
        |
        |)
        |""".stripMargin
    val df2 = spark.sql(sql)
    df2.show()
    df2.explain("extended")
  }

结果:

1
2
3
4
5
+----+----+----+----+----+----+----+
|year| JAN| FEB| MAR| APR| MAY| JUN|
+----+----+----+----+----+----+----+
|2024|NULL|NULL|14.0|20.0|25.0|NULL|
+----+----+----+----+----+----+----+

参考这里


两个聚合函数的测试代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * PIVOT with two aggregate expressions: SUM(age) AS a and AVG(class) AS c, pivoted on name
 * for the values 'John' and 'Mike'.
 */
test("test pivot") {
    val jsonData = Seq(
      """{"id": 100, "name": "John", "age": 30, "class": 1, "address": "Street 1"}""",
      """{"id": 200, "name": "Mary", "age": null, "class": 1, "address": "Street 2"}""",
      """{"id": 300, "name": "Mike", "age": 80, "class": 3, "address": "Street 3"}""",
      """{"id": 400, "name": "Dan", "age": 50, "class": 4, "address": "Street 4"}"""
    )
    val df: DataFrame = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // createOrReplaceTempView does not fail when a "person" view is already registered in
    // the session; createTempView throws an AnalysisException in that case.
    df.createOrReplaceTempView("person")

    // Columns not referenced inside PIVOT (address, id) become the implicit grouping key.
    val sql =
      """
        |SELECT * FROM person
        |    PIVOT (
        |        SUM(age) AS a, AVG(class) AS c
        |        FOR name IN ('John' AS john, 'Mike' AS mike)
        |    )
        |""".stripMargin
    val df2 = spark.sql(sql)
    df2.show()
    df2.explain("extended")
  }

结果:

1
2
3
4
5
6
7
8
+--------+---+------+------+------+------+
| address| id|john_a|john_c|mike_a|mike_c|
+--------+---+------+------+------+------+
|Street 1|100|    30|   1.0|  NULL|  NULL|
|Street 2|200|  NULL|  NULL|  NULL|  NULL|
|Street 3|300|  NULL|  NULL|    80|   3.0|
|Street 4|400|  NULL|  NULL|  NULL|  NULL|
+--------+---+------+------+------+------+

物理计划:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [address#6, id#9L, __pivot_sum(person.age) AS a AS `sum(person.age) 
    AS a`#29[0] AS john_a#36L, __pivot_avg(person.class) AS c AS `avg(person.class) AS c`#35[0] AS john_c#37,
    __pivot_sum(person.age) AS a AS `sum(person.age) AS a`#29[1] AS mike_a#38L, __pivot_avg(person.class) 
	AS c AS `avg(person.class) AS c`#35[1] AS mike_c#39]
   +- HashAggregate(keys=[address#6, id#9L], functions=[pivotfirst(name#10, sum(person.age) AS a#22L, 
      John, Mike, 0, 0), pivotfirst(name#10, avg(person.class) AS c#23, John, Mike, 0, 0)], output=[address#6, id#9L, 
	  __pivot_sum(person.age) AS a AS `sum(person.age) AS a`#29, __pivot_avg(person.class) AS c AS `avg(person.class) AS c`#35])
      +- Exchange hashpartitioning(address#6, id#9L, 1), ENSURE_REQUIREMENTS, [plan_id=139]
         +- HashAggregate(keys=[address#6, id#9L], functions=[partial_pivotfirst(name#10, sum(person.age) 
		    AS a#22L, John, Mike, 0, 0), partial_pivotfirst(name#10, avg(person.class) AS c#23, John, Mike, 0, 0)], 
			output=[address#6, id#9L, John#26L, Mike#27L, John#32, Mike#33])
            +- HashAggregate(keys=[address#6, id#9L, name#10], functions=[sum(age#7L), avg(class#8L)], 
			   output=[address#6, id#9L, name#10, sum(person.age) AS a#22L, avg(person.class) AS c#23])
               +- Exchange hashpartitioning(address#6, id#9L, name#10, 1), ENSURE_REQUIREMENTS, [plan_id=135]
                  +- HashAggregate(keys=[address#6, id#9L, name#10], functions=[partial_sum(age#7L), partial_avg(class#8L)], 
				     output=[address#6, id#9L, name#10, sum#75L, sum#76, count#77L])
                     +- Scan ExistingRDD[address#6,age#7L,class#8L,id#9L,name#10]

unpivot

测试代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  /**
   * Exercises UNPIVOT in three forms: the default form, INCLUDE NULLS with aliased
   * columns, and EXCLUDE NULLS over column pairs (multi-value unpivot).
   */
  test("unpivot test") {
    val jsonData = Seq(
      """{"year": 2020, "q1": null, "q2": 1000, "q3": 2000, "q4": 2500}""",
      """{"year": 2021, "q1": 2250, "q2": 3200, "q3": 4200, "q4": 5900}""",
      """{"year": 2022, "q1": 4200, "q2": 3100, "q3": null, "q4": null}"""
    )
    val df = spark.read.json(spark.sparkContext.parallelize(jsonData))
    // Register under exactly the name the queries use (no stray whitespace).
    // createOrReplaceTempView keeps the test re-runnable in a shared session, whereas
    // createTempView throws an AnalysisException when the view already exists.
    df.createOrReplaceTempView("sales_quarterly")

    // Default UNPIVOT: one output row per (year, quarter) pair.
    val df2 = spark.sql(
      """
        |SELECT * FROM sales_quarterly
        |    UNPIVOT (
        |        sales FOR quarter IN (q1, q2, q3, q4)
        |    );
        |""".stripMargin)
    df2.explain("extended")

    println("=====================")
    df2.show()
    // INCLUDE NULLS keeps rows whose unpivoted value is NULL; aliases rename the quarter labels.
    spark.sql(
      """
        |SELECT up.* FROM sales_quarterly
        |    UNPIVOT INCLUDE NULLS (
        |        sales FOR quarter IN (q1 AS Q1, q2 AS Q2, q3 AS Q3, q4 AS Q4)
        |    ) AS up;
        |""".stripMargin).show()
    // Multi-value UNPIVOT: each (qX, qY) pair becomes one row with two value columns.
    spark.sql(
      """
        |SELECT * FROM sales_quarterly
        |    UNPIVOT EXCLUDE NULLS (
        |        (first_quarter, second_quarter)
        |        FOR half_of_the_year IN (
        |            (q1, q2) AS H1,
        |            (q3, q4) AS H2
        |        )
        |    );
        |""".stripMargin).show()

  }

第一个的物理计划

1
2
3
*(1) Filter isnotnull(sales#17L)
+- *(1) Expand [[year#10L, q1, q1#6L], [year#10L, q2, q2#7L], [year#10L, q3, q3#8L], [year#10L, q4, q4#9L]], [year#10L, quarter#16, sales#17L]
   +- *(1) Scan ExistingRDD[q1#6L,q2#7L,q3#8L,q4#9L,year#10L]

执行结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
+----+-------+-----+
|year|quarter|sales|
+----+-------+-----+
|2020|     q2| 1000|
|2020|     q3| 2000|
|2020|     q4| 2500|
|2021|     q1| 2250|
|2021|     q2| 3200|
|2021|     q3| 4200|
|2021|     q4| 5900|
|2022|     q1| 4200|
|2022|     q2| 3100|
+----+-------+-----+

+----+-------+-----+
|year|quarter|sales|
+----+-------+-----+
|2020|     Q1| NULL|
|2020|     Q2| 1000|
|2020|     Q3| 2000|
|2020|     Q4| 2500|
|2021|     Q1| 2250|
|2021|     Q2| 3200|
|2021|     Q3| 4200|
|2021|     Q4| 5900|
|2022|     Q1| 4200|
|2022|     Q2| 3100|
|2022|     Q3| NULL|
|2022|     Q4| NULL|
+----+-------+-----+

+----+----------------+-------------+--------------+
|year|half_of_the_year|first_quarter|second_quarter|
+----+----------------+-------------+--------------+
|2020|              H1|         NULL|          1000|
|2020|              H2|         2000|          2500|
|2021|              H1|         2250|          3200|
|2021|              H2|         4200|          5900|
|2022|              H1|         4200|          3100|
+----+----------------+-------------+--------------+

with cube 代码生成

代码生成

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean hashAgg_initAgg_0;
/* 010 */   private boolean hashAgg_bufIsNull_0;
/* 011 */   private long hashAgg_bufValue_0;
/* 012 */   private hashAgg_FastHashMap_0 hashAgg_fastHashMap_0;
/* 013 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> hashAgg_fastHashMapIter_0;
/* 014 */   private org.apache.spark.unsafe.KVIterator hashAgg_mapIter_0;
/* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap hashAgg_hashMap_0;
/* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter hashAgg_sorter_0;
/* 017 */   private scala.collection.Iterator rdd_input_0;
/* 018 */   private boolean expand_resultIsNull_0;
/* 019 */   private long expand_resultValue_0;
/* 020 */   private boolean expand_resultIsNull_1;
/* 021 */   private long expand_resultValue_1;
/* 022 */   private boolean expand_resultIsNull_2;
/* 023 */   private long expand_resultValue_2;
/* 024 */   private boolean hashAgg_hashAgg_isNull_11_0;
/* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] rdd_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[7];
/* 026 */
/* 027 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 028 */     this.references = references;
/* 029 */   }
/* 030 */
/* 031 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 032 */     partitionIndex = index;
/* 033 */     this.inputs = inputs;
/* 034 */
/* 035 */     rdd_input_0 = inputs[0];
/* 036 */     rdd_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 32);
/* 037 */     rdd_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 038 */     rdd_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 039 */     expand_resultIsNull_0 = true;
/* 040 */     expand_resultValue_0 = -1L;
/* 041 */     expand_resultIsNull_1 = true;
/* 042 */     expand_resultValue_1 = -1L;
/* 043 */     expand_resultIsNull_2 = true;
/* 044 */     expand_resultValue_2 = -1L;
/* 045 */     rdd_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 0);
/* 046 */     rdd_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 0);
/* 047 */     rdd_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 0);
/* 048 */     rdd_mutableStateArray_0[6] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 0);
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   public class hashAgg_FastHashMap_0 {
/* 053 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 054 */     private int[] buckets;
/* 055 */     private int capacity = 1 << 16;
/* 056 */     private double loadFactor = 0.5;
/* 057 */     private int numBuckets = (int) (capacity / loadFactor);
/* 058 */     private int maxSteps = 2;
/* 059 */     private int numRows = 0;
/* 060 */     private Object emptyVBase;
/* 061 */     private long emptyVOff;
/* 062 */     private int emptyVLen;
/* 063 */     private boolean isBatchFull = false;
/* 064 */     private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 065 */
/* 066 */     public hashAgg_FastHashMap_0(
/* 067 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 068 */       InternalRow emptyAggregationBuffer) {
/* 069 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 070 */       .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 071 */
/* 072 */       final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 073 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 074 */
/* 075 */       emptyVBase = emptyBuffer;
/* 076 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 077 */       emptyVLen = emptyBuffer.length;
/* 078 */
/* 079 */       agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 080 */         3, 0);
/* 081 */
/* 082 */       buckets = new int[numBuckets];
/* 083 */       java.util.Arrays.fill(buckets, -1);
/* 084 */     }
/* 085 */
/* 086 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(long hashAgg_key_0, long hashAgg_key_1, long hashAgg_key_2) {
/* 087 */       long h = hash(hashAgg_key_0, hashAgg_key_1, hashAgg_key_2);
/* 088 */       int step = 0;
/* 089 */       int idx = (int) h & (numBuckets - 1);
/* 090 */       while (step < maxSteps) {
/* 091 */         // Return bucket index if it's either an empty slot or already contains the key
/* 092 */         if (buckets[idx] == -1) {
/* 093 */           if (numRows < capacity && !isBatchFull) {
/* 094 */             agg_rowWriter.reset();
/* 095 */             agg_rowWriter.zeroOutNullBytes();
/* 096 */             agg_rowWriter.write(0, hashAgg_key_0);
/* 097 */             agg_rowWriter.write(1, hashAgg_key_1);
/* 098 */             agg_rowWriter.write(2, hashAgg_key_2);
/* 099 */             org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 100 */             = agg_rowWriter.getRow();
/* 101 */             Object kbase = agg_result.getBaseObject();
/* 102 */             long koff = agg_result.getBaseOffset();
/* 103 */             int klen = agg_result.getSizeInBytes();
/* 104 */
/* 105 */             UnsafeRow vRow
/* 106 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 107 */             if (vRow == null) {
/* 108 */               isBatchFull = true;
/* 109 */             } else {
/* 110 */               buckets[idx] = numRows++;
/* 111 */             }
/* 112 */             return vRow;
/* 113 */           } else {
/* 114 */             // No more space
/* 115 */             return null;
/* 116 */           }
/* 117 */         } else if (equals(idx, hashAgg_key_0, hashAgg_key_1, hashAgg_key_2)) {
/* 118 */           return batch.getValueRow(buckets[idx]);
/* 119 */         }
/* 120 */         idx = (idx + 1) & (numBuckets - 1);
/* 121 */         step++;
/* 122 */       }
/* 123 */       // Didn't find it
/* 124 */       return null;
/* 125 */     }
/* 126 */
/*     */     // Tells whether the key row already stored in bucket slot `idx` equals the probe key.
/*     */     // The three key columns are read as longs and compared one by one. findOrInsert only
/*     */     // calls this after seeing that buckets[idx] is not the empty marker (-1).
/* 127 */     private boolean equals(int idx, long hashAgg_key_0, long hashAgg_key_1, long hashAgg_key_2) {
/*     */       // buckets[idx] is the row number that findOrInsert recorded for this slot.
/* 128 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 129 */       return (row.getLong(0) == hashAgg_key_0) && (row.getLong(1) == hashAgg_key_1) && (row.getLong(2) == hashAgg_key_2);
/* 130 */     }
/* 131 */
/*     */     // Combines the three long key values into a single long hash.
/*     */     // Starting from 0, each key is folded in with the same expression:
/*     */     //   hash = (hash ^ 0x9e3779b9) + key + (hash << 6) + (hash >>> 2)
/*     */     // findOrInsert narrows the result to a bucket index with `(int) h & (numBuckets - 1)`.
/* 132 */     private long hash(long hashAgg_key_0, long hashAgg_key_1, long hashAgg_key_2) {
/* 133 */       long hashAgg_hash_0 = 0;
/* 134 */
/*     */       // Note: 0x9e3779b9 has no L suffix, so it is an int literal (a negative value) that
/*     */       // is sign-extended to long before the XOR with the running hash.
/* 135 */       long hashAgg_result_0 = hashAgg_key_0;
/* 136 */       hashAgg_hash_0 = (hashAgg_hash_0 ^ (0x9e3779b9)) + hashAgg_result_0 + (hashAgg_hash_0 << 6) + (hashAgg_hash_0 >>> 2);
/* 137 */
/*     */       // Same mixing step for the second key.
/* 138 */       long hashAgg_result_1 = hashAgg_key_1;
/* 139 */       hashAgg_hash_0 = (hashAgg_hash_0 ^ (0x9e3779b9)) + hashAgg_result_1 + (hashAgg_hash_0 << 6) + (hashAgg_hash_0 >>> 2);
/* 140 */
/*     */       // Same mixing step for the third key.
/* 141 */       long hashAgg_result_2 = hashAgg_key_2;
/* 142 */       hashAgg_hash_0 = (hashAgg_hash_0 ^ (0x9e3779b9)) + hashAgg_result_2 + (hashAgg_hash_0 << 6) + (hashAgg_hash_0 >>> 2);
/* 143 */
/* 144 */       return hashAgg_hash_0;
/* 145 */     }
/* 146 */
/*     */     // Returns the key/value iterator of the backing `batch`; processNext() uses it to
/*     */     // emit the rows that were aggregated in this map.
/* 147 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 148 */       return batch.rowIterator();
/* 149 */     }
/* 150 */
/*     */     // Closes the backing `batch`. This method does nothing else.
/* 151 */     public void close() {
/* 152 */       batch.close();
/* 153 */     }
/* 154 */
/* 155 */   }
/* 156 */
/*     */   // Turns one input row into four rows and feeds each of them to hashAgg_doConsume_0.
/*     */   // expand_expr_0_0 (with its null flag) is passed through unchanged in every iteration.
/*     */   // expand_expr_1_0 and expand_expr_2_0 are either kept or replaced by null, and a third
/*     */   // value 0..3 identifies which of the four combinations the row belongs to.
/*     */   // NOTE(review): these look like the four CUBE grouping sets of the article's query, with
/*     */   // the third value acting as the grouping id; confirm against the physical plan.
/* 157 */   private void expand_doConsume_0(long expand_expr_0_0, boolean expand_exprIsNull_0_0, long expand_expr_1_0, boolean expand_exprIsNull_1_0, long expand_expr_2_0, boolean expand_exprIsNull_2_0) throws java.io.IOException {
/* 158 */     for (int expand_i_0 = 0; expand_i_0 < 4; expand_i_0 ++) {
/* 159 */       switch (expand_i_0) {
/*     */       // Case 0: keep both expr_1 and expr_2; id = 0.
/* 160 */       case 0:
/* 161 */
/* 162 */         expand_resultIsNull_0 = expand_exprIsNull_1_0;
/* 163 */         expand_resultValue_0 = expand_expr_1_0;
/* 164 */
/* 165 */         expand_resultIsNull_1 = expand_exprIsNull_2_0;
/* 166 */         expand_resultValue_1 = expand_expr_2_0;
/* 167 */
/* 168 */         expand_resultIsNull_2 = false;
/* 169 */         expand_resultValue_2 = 0L;
/* 170 */
/* 171 */         break;
/* 172 */
/*     */       // Case 1: keep expr_1, force the second output to null (-1L placeholder); id = 1.
/* 173 */       case 1:
/* 174 */
/* 175 */         expand_resultIsNull_0 = expand_exprIsNull_1_0;
/* 176 */         expand_resultValue_0 = expand_expr_1_0;
/* 177 */
/* 178 */         expand_resultIsNull_1 = true;
/* 179 */         expand_resultValue_1 = -1L;
/* 180 */
/* 181 */         expand_resultIsNull_2 = false;
/* 182 */         expand_resultValue_2 = 1L;
/* 183 */
/* 184 */         break;
/* 185 */
/*     */       // Case 2: force the first output to null, keep expr_2; id = 2.
/* 186 */       case 2:
/* 187 */
/* 188 */         expand_resultIsNull_0 = true;
/* 189 */         expand_resultValue_0 = -1L;
/* 190 */
/* 191 */         expand_resultIsNull_1 = expand_exprIsNull_2_0;
/* 192 */         expand_resultValue_1 = expand_expr_2_0;
/* 193 */
/* 194 */         expand_resultIsNull_2 = false;
/* 195 */         expand_resultValue_2 = 2L;
/* 196 */
/* 197 */         break;
/* 198 */
/*     */       // Case 3: force both outputs to null; id = 3.
/* 199 */       case 3:
/* 200 */
/* 201 */         expand_resultIsNull_0 = true;
/* 202 */         expand_resultValue_0 = -1L;
/* 203 */
/* 204 */         expand_resultIsNull_1 = true;
/* 205 */         expand_resultValue_1 = -1L;
/* 206 */
/* 207 */         expand_resultIsNull_2 = false;
/* 208 */         expand_resultValue_2 = 3L;
/* 209 */
/* 210 */         break;
/* 211 */       }
/*     */       // One output row is counted per iteration, i.e. four per input row.
/* 212 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* numOutputRows */).add(1);
/* 213 */
/*     */       // expand_resultIsNull_2 is always false above and is not passed on; only the value is.
/* 214 */       hashAgg_doConsume_0(expand_expr_0_0, expand_exprIsNull_0_0, expand_resultValue_0, expand_resultIsNull_0, expand_resultValue_1, expand_resultIsNull_1, expand_resultValue_2);
/* 215 */
/* 216 */     }
/* 217 */
/* 218 */   }
/* 219 */
/*     */   // Consumes the whole input iterator and builds the aggregation state.
/*     */   // Every input row is counted, its columns 2, 1 and 0 are read as nullable longs, and the
/*     */   // values are handed to expand_doConsume_0. Once the input is exhausted, the iterators
/*     */   // used by processNext() for output are prepared.
/* 220 */   private void hashAgg_doAggregateWithKeys_0() throws java.io.IOException {
/* 221 */     while ( rdd_input_0.hasNext()) {
/* 222 */       InternalRow rdd_row_0 = (InternalRow) rdd_input_0.next();
/* 223 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 224 */       // common sub-expressions
/* 225 */
/*     */       // For each column, -1L is only a placeholder used when the null flag is set.
/* 226 */       boolean rdd_isNull_2 = rdd_row_0.isNullAt(2);
/* 227 */       long rdd_value_2 = rdd_isNull_2 ?
/* 228 */       -1L : (rdd_row_0.getLong(2));
/* 229 */       boolean rdd_isNull_1 = rdd_row_0.isNullAt(1);
/* 230 */       long rdd_value_1 = rdd_isNull_1 ?
/* 231 */       -1L : (rdd_row_0.getLong(1));
/* 232 */       boolean rdd_isNull_0 = rdd_row_0.isNullAt(0);
/* 233 */       long rdd_value_0 = rdd_isNull_0 ?
/* 234 */       -1L : (rdd_row_0.getLong(0));
/* 235 */
/*     */       // Column 2 becomes expand_expr_0_0 (the pass-through value), column 1 becomes
/*     */       // expand_expr_1_0 and column 0 becomes expand_expr_2_0.
/* 236 */       expand_doConsume_0(rdd_value_2, rdd_isNull_2, rdd_value_1, rdd_isNull_1, rdd_value_0, rdd_isNull_0);
/* 237 */       // shouldStop check is eliminated
/* 238 */     }
/* 239 */
/*     */     // Iterator over the rows held in the fast hash map.
/* 240 */     hashAgg_fastHashMapIter_0 = hashAgg_fastHashMap_0.rowIterator();
/*     */     // Iterator returned by the plan for the regular hash map and, if one was created, the
/*     */     // sorter; the metrics for peak memory, spill size, hash probes and fallbacks are passed in.
/* 241 */     hashAgg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(hashAgg_hashMap_0, hashAgg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numTasksFallBacked */));
/* 242 */
/* 243 */   }
/* 244 */
/*     */   // Aggregates one expanded row into the buffer that belongs to its grouping key.
/*     */   //   hashAgg_expr_0_0 / hashAgg_exprIsNull_0_0 : the nullable value being aggregated
/*     */   //   hashAgg_expr_1_0, hashAgg_expr_2_0        : the two nullable grouping key columns
/*     */   //   hashAgg_expr_3_0                          : the third key column, never null
/*     */   // Steps: (1) locate or create the aggregation buffer, first in the fast hash map and
/*     */   // then in the regular hash map, spilling to a sorter when the latter cannot allocate;
/*     */   // (2) store the larger of the buffered value and the input value, ignoring nulls.
/*     */   // Throws SparkOutOfMemoryError if no buffer can be allocated even after spilling.
/* 245 */   private void hashAgg_doConsume_0(long hashAgg_expr_0_0, boolean hashAgg_exprIsNull_0_0, long hashAgg_expr_1_0, boolean hashAgg_exprIsNull_1_0, long hashAgg_expr_2_0, boolean hashAgg_exprIsNull_2_0, long hashAgg_expr_3_0) throws java.io.IOException {
/* 246 */     UnsafeRow hashAgg_unsafeRowAggBuffer_0 = null;
/* 247 */     UnsafeRow hashAgg_fastAggBuffer_0 = null;
/* 248 */
/*     */     // The fast map is only tried when both nullable key columns are non-null; its
/*     */     // findOrInsert takes plain long values and has no way to express a null key.
/* 249 */     if (!hashAgg_exprIsNull_1_0 && !hashAgg_exprIsNull_2_0 && !false) {
/* 250 */       hashAgg_fastAggBuffer_0 = hashAgg_fastHashMap_0.findOrInsert(
/* 251 */         hashAgg_expr_1_0, hashAgg_expr_2_0, hashAgg_expr_3_0);
/* 252 */     }
/* 253 */     // Cannot find the key in fast hash map, try regular hash map.
/* 254 */     if (hashAgg_fastAggBuffer_0 == null) {
/* 255 */       // generate grouping key
/* 256 */       rdd_mutableStateArray_0[5].reset();
/* 257 */
/* 258 */       rdd_mutableStateArray_0[5].zeroOutNullBytes();
/* 259 */
/* 260 */       if (hashAgg_exprIsNull_1_0) {
/* 261 */         rdd_mutableStateArray_0[5].setNullAt(0);
/* 262 */       } else {
/* 263 */         rdd_mutableStateArray_0[5].write(0, hashAgg_expr_1_0);
/* 264 */       }
/* 265 */
/* 266 */       if (hashAgg_exprIsNull_2_0) {
/* 267 */         rdd_mutableStateArray_0[5].setNullAt(1);
/* 268 */       } else {
/* 269 */         rdd_mutableStateArray_0[5].write(1, hashAgg_expr_2_0);
/* 270 */       }
/* 271 */
/* 272 */       rdd_mutableStateArray_0[5].write(2, hashAgg_expr_3_0);
/*     */       // The key hash is computed once and reused for the retry after a spill below.
/* 273 */       int hashAgg_unsafeRowKeyHash_0 = (rdd_mutableStateArray_0[5].getRow()).hashCode();
/* 274 */       if (true) {
/* 275 */         // try to get the buffer from hash map
/* 276 */         hashAgg_unsafeRowAggBuffer_0 =
/* 277 */         hashAgg_hashMap_0.getAggregationBufferFromUnsafeRow((rdd_mutableStateArray_0[5].getRow()), hashAgg_unsafeRowKeyHash_0);
/* 278 */       }
/* 279 */       // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 280 */       // aggregation after processing all input rows.
/* 281 */       if (hashAgg_unsafeRowAggBuffer_0 == null) {
/*     */         // The first spill creates the sorter; later spills are merged into it.
/* 282 */         if (hashAgg_sorter_0 == null) {
/* 283 */           hashAgg_sorter_0 = hashAgg_hashMap_0.destructAndCreateExternalSorter();
/* 284 */         } else {
/* 285 */           hashAgg_sorter_0.merge(hashAgg_hashMap_0.destructAndCreateExternalSorter());
/* 286 */         }
/* 287 */
/* 288 */         // the hash map had be spilled, it should have enough memory now,
/* 289 */         // try to allocate buffer again.
/* 290 */         hashAgg_unsafeRowAggBuffer_0 = hashAgg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 291 */           (rdd_mutableStateArray_0[5].getRow()), hashAgg_unsafeRowKeyHash_0);
/* 292 */         if (hashAgg_unsafeRowAggBuffer_0 == null) {
/* 293 */           // failed to allocate the first page
/* 294 */           throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 295 */         }
/* 296 */       }
/* 297 */
/* 298 */     }
/* 299 */
/* 300 */     // Updates the proper row buffer
/* 301 */     if (hashAgg_fastAggBuffer_0 != null) {
/* 302 */       hashAgg_unsafeRowAggBuffer_0 = hashAgg_fastAggBuffer_0;
/* 303 */     }
/* 304 */
/* 305 */     // common sub-expressions
/* 306 */
/* 307 */     // evaluate aggregate functions and update aggregation buffers
/* 308 */
/*     */     // The running result starts as null; -1L is only a placeholder until a value is chosen.
/* 309 */     hashAgg_hashAgg_isNull_11_0 = true;
/* 310 */     long hashAgg_value_12 = -1L;
/* 311 */
/* 312 */     boolean hashAgg_isNull_12 = hashAgg_unsafeRowAggBuffer_0.isNullAt(0);
/* 313 */     long hashAgg_value_13 = hashAgg_isNull_12 ?
/* 314 */     -1L : (hashAgg_unsafeRowAggBuffer_0.getLong(0));
/* 315 */
/*     */     // First candidate: the value currently stored in the buffer, if it is not null.
/* 316 */     if (!hashAgg_isNull_12 && (hashAgg_hashAgg_isNull_11_0 ||
/* 317 */         hashAgg_value_13 > hashAgg_value_12)) {
/* 318 */       hashAgg_hashAgg_isNull_11_0 = false;
/* 319 */       hashAgg_value_12 = hashAgg_value_13;
/* 320 */     }
/* 321 */
/*     */     // Second candidate: the input value, taken when it is non-null and either no value has
/*     */     // been chosen yet or it is strictly greater than the chosen one.
/* 322 */     if (!hashAgg_exprIsNull_0_0 && (hashAgg_hashAgg_isNull_11_0 ||
/* 323 */         hashAgg_expr_0_0 > hashAgg_value_12)) {
/* 324 */       hashAgg_hashAgg_isNull_11_0 = false;
/* 325 */       hashAgg_value_12 = hashAgg_expr_0_0;
/* 326 */     }
/* 327 */
/*     */     // Write the result back; the buffer stays null only if both candidates were null.
/* 328 */     if (!hashAgg_hashAgg_isNull_11_0) {
/* 329 */       hashAgg_unsafeRowAggBuffer_0.setLong(0, hashAgg_value_12);
/* 330 */     } else {
/* 331 */       hashAgg_unsafeRowAggBuffer_0.setNullAt(0);
/* 332 */     }
/* 333 */
/* 334 */   }
/* 335 */
/*     */   // Emits one output row for an aggregated (key, buffer) pair.
/*     */   //   hashAgg_keyTerm_0    : key row; columns 0 and 1 are nullable, column 2 is read
/*     */   //                          without a null check
/*     */   //   hashAgg_bufferTerm_0 : aggregation buffer; column 0 is the nullable result
/*     */   // The four values are copied into a fresh row (three key columns followed by the
/*     */   // aggregate) and handed to append().
/* 336 */   private void hashAgg_doAggregateWithKeysOutput_0(UnsafeRow hashAgg_keyTerm_0, UnsafeRow hashAgg_bufferTerm_0)
/* 337 */   throws java.io.IOException {
/* 338 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[9] /* numOutputRows */).add(1);
/* 339 */
/*     */     // Read the inputs; -1L is only a placeholder for values whose null flag is set.
/* 340 */     boolean hashAgg_isNull_14 = hashAgg_keyTerm_0.isNullAt(0);
/* 341 */     long hashAgg_value_15 = hashAgg_isNull_14 ?
/* 342 */     -1L : (hashAgg_keyTerm_0.getLong(0));
/* 343 */     boolean hashAgg_isNull_15 = hashAgg_keyTerm_0.isNullAt(1);
/* 344 */     long hashAgg_value_16 = hashAgg_isNull_15 ?
/* 345 */     -1L : (hashAgg_keyTerm_0.getLong(1));
/* 346 */     long hashAgg_value_17 = hashAgg_keyTerm_0.getLong(2);
/* 347 */     boolean hashAgg_isNull_17 = hashAgg_bufferTerm_0.isNullAt(0);
/* 348 */     long hashAgg_value_18 = hashAgg_isNull_17 ?
/* 349 */     -1L : (hashAgg_bufferTerm_0.getLong(0));
/* 350 */
/*     */     // Build the output row with the writer held in rdd_mutableStateArray_0[6].
/* 351 */     rdd_mutableStateArray_0[6].reset();
/* 352 */
/* 353 */     rdd_mutableStateArray_0[6].zeroOutNullBytes();
/* 354 */
/* 355 */     if (hashAgg_isNull_14) {
/* 356 */       rdd_mutableStateArray_0[6].setNullAt(0);
/* 357 */     } else {
/* 358 */       rdd_mutableStateArray_0[6].write(0, hashAgg_value_15);
/* 359 */     }
/* 360 */
/* 361 */     if (hashAgg_isNull_15) {
/* 362 */       rdd_mutableStateArray_0[6].setNullAt(1);
/* 363 */     } else {
/* 364 */       rdd_mutableStateArray_0[6].write(1, hashAgg_value_16);
/* 365 */     }
/* 366 */
/* 367 */     rdd_mutableStateArray_0[6].write(2, hashAgg_value_17);
/* 368 */
/* 369 */     if (hashAgg_isNull_17) {
/* 370 */       rdd_mutableStateArray_0[6].setNullAt(3);
/* 371 */     } else {
/* 372 */       rdd_mutableStateArray_0[6].write(3, hashAgg_value_18);
/* 373 */     }
/* 374 */     append((rdd_mutableStateArray_0[6].getRow()));
/* 375 */
/* 376 */   }
/* 377 */
/*     */   // Entry point of this generated stage.
/*     */   // The first call (guarded by hashAgg_initAgg_0) creates both hash maps, consumes the
/*     */   // whole input through hashAgg_doAggregateWithKeys_0 and records the elapsed time.
/*     */   // Output is then produced from the fast hash map iterator first and from the regular
/*     */   // map iterator second. The method returns early whenever shouldStop() is true.
/*     */   // NOTE(review): a later call presumably resumes from the same iterators, since they are
/*     */   // not re-created once hashAgg_initAgg_0 is set; confirm against the caller.
/* 378 */   protected void processNext() throws java.io.IOException {
/* 379 */     if (!hashAgg_initAgg_0) {
/* 380 */       hashAgg_initAgg_0 = true;
/* 381 */       hashAgg_fastHashMap_0 = new hashAgg_FastHashMap_0(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().taskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 382 */
/*     */       // Registers a listener that closes the fast hash map when the task completes.
/* 383 */       ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskContext().addTaskCompletionListener(
/* 384 */         new org.apache.spark.util.TaskCompletionListener() {
/* 385 */           @Override
/* 386 */           public void onTaskCompletion(org.apache.spark.TaskContext context) {
/* 387 */             hashAgg_fastHashMap_0.close();
/* 388 */           }
/* 389 */         });
/* 390 */
/* 391 */       hashAgg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/*     */       // The aggregation is timed in nanoseconds and added to the metric in milliseconds.
/* 392 */       long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 393 */       hashAgg_doAggregateWithKeys_0();
/* 394 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[10] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 395 */     }
/* 396 */     // output the result
/* 397 */
/*     */     // Rows aggregated in the fast hash map are emitted first.
/* 398 */     while ( hashAgg_fastHashMapIter_0.next()) {
/* 399 */       UnsafeRow hashAgg_aggKey_0 = (UnsafeRow) hashAgg_fastHashMapIter_0.getKey();
/* 400 */       UnsafeRow hashAgg_aggBuffer_0 = (UnsafeRow) hashAgg_fastHashMapIter_0.getValue();
/* 401 */       hashAgg_doAggregateWithKeysOutput_0(hashAgg_aggKey_0, hashAgg_aggBuffer_0);
/* 402 */
/* 403 */       if (shouldStop()) return;
/* 404 */     }
/*     */     // The fast hash map is closed as soon as its iterator is exhausted.
/* 405 */     hashAgg_fastHashMap_0.close();
/* 406 */
/*     */     // Then the rows from the iterator returned by finishAggregate are emitted.
/* 407 */     while ( hashAgg_mapIter_0.next()) {
/* 408 */       UnsafeRow hashAgg_aggKey_0 = (UnsafeRow) hashAgg_mapIter_0.getKey();
/* 409 */       UnsafeRow hashAgg_aggBuffer_0 = (UnsafeRow) hashAgg_mapIter_0.getValue();
/* 410 */       hashAgg_doAggregateWithKeysOutput_0(hashAgg_aggKey_0, hashAgg_aggBuffer_0);
/* 411 */       if (shouldStop()) return;
/* 412 */     }
/* 413 */     hashAgg_mapIter_0.close();
/*     */     // The regular hash map is freed here only when no sorter was created.
/* 414 */     if (hashAgg_sorter_0 == null) {
/* 415 */       hashAgg_hashMap_0.free();
/* 416 */     }
/* 417 */   }
/* 418 */
/* 419 */ }

参考