Subqueries

Theory

SPJ: selection, projection, and join, the three most important operations
Non-SPJ

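By position in the SQL statement, a subquery can appear in:

  • the select list (target columns)
  • the FROM clause
  • the WHERE clause
  • the JOIN/ON clause
  • the GROUP BY clause
  • the HAVING clause
  • the ORDER BY clause, and so on

Where a subquery appears affects how it can be optimized:

  • Select list: a subquery here must be a scalar subquery.
  • FROM clause: it cannot be a correlated subquery; a non-correlated subquery is allowed and can be pulled up into a join.
  • WHERE clause: the subquery is part of a condition expression. With comparison operators such as >, <, =, <> (for example on Int values) it must be a scalar subquery; the other predicates include IN, BETWEEN, and EXISTS.
  • JOIN/ON: JOIN behaves like the FROM clause and ON like the WHERE clause, so they are handled in much the same way as FROM and WHERE.
  • GROUP BY clause: the target columns must be tied to the GROUP BY; a subquery here is rarely meaningful.
  • ORDER BY clause: rarely meaningful.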

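Subquery classification

  • Correlated subquery. Its execution depends on attribute values of the outer (parent) query. Because it depends on parameters from the parent query, it has to be re-executed with the new values whenever those parameters change.
  • Non-correlated subquery. Its execution does not depend on any attribute value of the outer query; it is independent and can be evaluated on its own.

By predicate

  • [NOT] IN/ALL/ANY/SOME subquery. The semantics are similar: "[not] in / all / any / some". The operand is on the left and the subquery on the right. This is one of the most common kinds of subquery.
  • [NOT] EXISTS subquery. Semi-join semantics, meaning "[not] exists". There is no left operand; the subquery is on the right. This is also one of the most common kinds.

By the shape of the query

  • SPJ subquery. A query made up of selection, join, and projection only.
  • GROUP BY subquery. An SPJ subquery plus grouping and aggregation.
  • Other subquery. A GROUP BY subquery plus further clauses such as Top-N, LIMIT/OFFSET, set operations, or sorting.
  • The last two kinds are sometimes jointly called non-SPJ subqueries.

By result set, subqueries fall into the following 4 classes:

  • Scalar subquery. Returns a single value (return a scalar, a single value).
  • Column subquery. Returns a single column (return a single column).
  • Row subquery. Returns a single tuple (return a single row).
  • Table subquery. Returns a table of multiple rows and columns (return a table, one or more rows of one or more columns).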

Optimization techniques

Subquery coalescing

SELECT * FROM t1 WHERE a1<10 AND (
        EXISTS (SELECT a2 FROM t2 WHERE t2.a2<5 AND t2.b2=1) OR 
        EXISTS (SELECT a2 FROM t2 WHERE t2.a2<5 AND t2.b2=2) 
);

This can be optimized to:

SELECT * FROM t1 WHERE a1<10 AND (
        EXISTS (SELECT a2 FROM t2 WHERE t2.a2<5 AND (t2.b2=1 OR t2.b2=2))
);

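Subquery unnesting
Also called subquery de-nesting or subquery pull-up. Some subqueries are moved into the outer parent query and placed alongside it as join relations.

  • If the subquery contains aggregation, GROUP BY, or DISTINCT, it can only be evaluated on its own and cannot be pulled up.
  • If the subquery is a plain SPJ query, it can be pulled up.
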
SELECT * FROM t1, (SELECT * FROM t2 WHERE t2.a2 >10) v_t2 
WHERE t1.a1<10 AND v_t2.a2<20;

This can be optimized to:

SELECT * FROM t1, t2 WHERE t1.a1<10 AND t2.a2<20 AND t2.a2 >10; 

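The steps of unnesting are:

  1. Merge the FROM clauses of the subquery and the outer query into a single FROM clause, and adjust the corresponding runtime parameters.
  2. Rewrite the subquery's predicate operator accordingly (for example, IN becomes = ANY).
  3. Merge the subquery's WHERE condition, as a whole, with the outer query's WHERE condition using AND, so that the new predicate keeps the meaning of the original and forms a single unit.

IN subquery optimization: push the outer condition down into the inner query, then rewrite IN as EXISTS.
EXISTS can in turn be evaluated with a semi-join algorithm.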

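Subquery classification in Spark

Spark classifies subqueries by their result:

  • OneRowSubquery, handled in OptimizeOneRowRelationSubquery
  • LateralSubquery, multiple rows and columns
  • ScalarSubquery, one row and one column

OptimizeSubqueries

The OptimizeSubqueries rule currently covers a single scenario: it removes the Sort on top of a subquery.
That is all this rule does; other rules are responsible for turning subqueries into joins.
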
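    private def removeTopLevelSort(plan: LogicalPlan): LogicalPlan = {
      // Fast path: nothing to do when the plan contains no Sort node.
      if (!plan.containsPattern(SORT)) {
        return plan
      }
      plan match {
        // A Sort at the top of the subquery can be dropped.
        case Sort(_, _, child) => child
        // Look through a Project for a Sort beneath it.
        case Project(fields, child) => Project(fields, removeTopLevelSort(child))
        case other => other
      }
    }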

The SQL statement is as follows:

SELECT id,
          customer_id,
         ( SELECT MAX(item_price)
               FROM item
               WHERE item.order_id = order.id
               ORDER BY MAX(item_price)
         ) AS max_item_price
FROM order


PullupCorrelatedPredicates

The following SQL illustrates the PullupCorrelatedPredicates rule:

SELECT id, customer_id, order_name
FROM order
WHERE (
    SELECT MAX(item_price) AS max_item_price
    FROM item
    WHERE item.order_id = order.id
) < 100

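RewriteCorrelatedScalarSubquery is also applied here; it rewrites the subquery into a join.
After optimization, Filter(order_id = outer(id)) has become a join condition; in other words, the predicate inside the correlated subquery has been pulled up.

Note that this rule, like similar ones, performs only part of the optimization. Later rules continue the work, and the final plan is the combined result of several rules.
In the next example, t1 is the inner table before the rule runs:
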
SELECT t2.id FROM t2 WHERE t2.id IN
(
    SELECT t1.id FROM t1 WHERE t1.id = t2.id
)

The resulting logical plan is:

id: int
Project [id#1]
+- Filter id#1 IN (list#0 [id#1])
   :  +- Project [id#3]
   :     +- Filter (id#3 = outer(id#1))
   :        +- SubqueryAlias t1
   :           +- RelationV2[id#3, sname#4] t1
   +- SubqueryAlias t2
      +- RelationV2[id#1, sname#2]  t2

Intermediate result of the optimization:

Project [id#1]
+- Filter id#1 IN (list#0 [id#1 && (id#3 = id#1)])
   :  +- Project [id#3]
   :     +- RelationV2[id#3, sname#4]  t1
   +- RelationV2[id#1, sname#2]  t2

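That is, this rule only pulls up the subquery's condition; the conversion to a JOIN is done by a later rule.

RewritePredicateSubquery

RewritePredicateSubquery rewrites predicate subqueries as left semi/anti joins. The following predicates are supported:

  • EXISTS/NOT EXISTS is rewritten as a semi/anti join; the unresolved conditions in the Filter are pulled out as join conditions.
  • IN/NOT IN is rewritten as a semi/anti join; the unresolved conditions in the Filter are pulled out as join conditions, and value = selected column is also used as a join condition.

The example uses two tables with the same structure: id (int) and sname (varchar).
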
SELECT id, sname FROM t1 WHERE id IN
(
    SELECT id FROM p1 WHERE id < 10
) AND id > 999

Logical plan:

== Analyzed Logical Plan ==
id: bigint, sname: string
Project [id#118L, sname#119]
+- Filter (id#118L IN (list#142 []) AND (id#118L > cast(999 as bigint)))
   :  +- Project [id#15L]
   :     +- Filter (id#15L < cast(10 as bigint))
   :        +- SubqueryAlias p1
   :           +- View (`p1`, [age#14L,id#15L,sname#16])
   :              +- Relation [age#14L,id#15L,sname#16] json
   +- SubqueryAlias t1
      +- View (`t1`, [age#117L,id#118L,sname#119])
         +- Relation [age#117L,id#118L,sname#119] json

An intermediate optimization step produces roughly the following:

Project [id#118L, sname#119]
+- Filter (id#118L IN (list#142 []) AND (id#118L > cast(999 as bigint)))
   :  +- Project [id#15L]
   :     +- Filter (id#15L < cast(10 as bigint))
   :              +- Relation [age#14L,id#15L,sname#16] json
   +- Relation [age#117L,id#118L,sname#119] json

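How the rule runs, using IN as the example:

  1. Split the conditions around IN: what is inside IN is the subquery, and what sits at the same level as IN are ordinary query conditions.
  2. Join the ordinary conditions with AND to form a new condition.
  3. Apply a projection to the subquery to drop columns that are not needed.
  4. From the outer plan and the new plan, derive the ON condition of the join.
  5. Merge the new condition with the original outer plan to form the new outerPlan.
  6. Combine outerPlan, innerPlan (the transformed subquery), the ON condition, and LeftSemi into a JOIN logical plan.
  7. For NOT, the join type is LeftAnti: all rows on the left that have no match in the right table.

After optimization:
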
== Optimized Logical Plan ==
Join LeftSemi, (id#118L = id#15L)
:- Project [id#118L, sname#119]
:  +- Filter (isnotnull(id#118L) AND (id#118L > 999))
:     +- Relation [age#117L,id#118L,sname#119] json
+- Project [id#15L]
   +- Filter (isnotnull(id#15L) AND (id#15L < 10))
      +- Relation [age#14L,id#15L,sname#16] json
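
To make the rewrite concrete, here is a minimal sketch in plain Scala collections (no Spark involved) of why an IN filter and a left semi join select the same rows. The object name, the Row type, and the predicates are hypothetical and chosen only for illustration; NULL handling is deliberately left out.

```scala
object SemiJoinSketch {
  // Hypothetical row shape; only id takes part in the comparison.
  final case class Row(id: Long, sname: String)

  // Literal reading of:
  //   SELECT * FROM outer WHERE id IN (SELECT id FROM inner WHERE innerPred) AND outerPred
  def viaInSubquery(outer: Seq[Row], inner: Seq[Row])(
      outerPred: Row => Boolean, innerPred: Row => Boolean): Seq[Row] = {
    val ids = inner.filter(innerPred).map(_.id)
    outer.filter(r => ids.contains(r.id) && outerPred(r))
  }

  // Left semi join: filter both sides first, then keep each left row that has
  // at least one matching right row. A left row is never duplicated.
  def viaLeftSemi(outer: Seq[Row], inner: Seq[Row])(
      outerPred: Row => Boolean, innerPred: Row => Boolean): Seq[Row] = {
    val rightKeys = inner.filter(innerPred).map(_.id).toSet
    outer.filter(outerPred).filter(r => rightKeys.contains(r.id))
  }
}
```

Both functions return only left-side rows, and duplicates on the right side do not multiply the output, which is what distinguishes a semi join from an inner join.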

Some other cases

A SQL statement with a constant and an uncorrelated subquery:

SELECT t1.* FROM t1 WHERE 10 IN
(
    SELECT t2.id FROM t2
)

The plans before and after optimization, using Spark 3.5:

== Analyzed Logical Plan ==
id: int, sname: string
Project [id#1, sname#2]
+- Filter 10 IN (list#0 [])
   :  +- Project [id#3]
   :     +- SubqueryAlias t2
   :        +- RelationV2[id#3, sname#4] t2
   +- SubqueryAlias t1
      +- RelationV2[id#1, sname#2] t1

== Optimized Logical Plan ==
Join LeftSemi
:- RelationV2[id#1, sname#2] t1
+- LocalLimit 1
   +- Project
      +- Filter (10 = id#3)
         +- RelationV2[id#3] t2

SQL with a random value:

SELECT t1.id FROM  t1 WHERE t1.id + RAND() * 10
IN
(
     SELECT t2.id FROM t2
)

After optimization:

== Analyzed Logical Plan ==
id: int
Project [id#1]
+- Filter (cast(id#1 as double) + (rand(2079373737972317466) * cast(10 as double))) IN (list#0 [])
   :  +- Project [cast(id#3 as double) AS id#5]
   :     +- Project [id#3]
   :        +- SubqueryAlias t2
   :           +- RelationV2[id#3, sname#4] t2
   +- SubqueryAlias t1
      +- RelationV2[id#1, sname#2] t1 t1

== Optimized Logical Plan ==
Join LeftSemi, (knownfloatingpointnormalized(normalizenanandzero((cast(id#1 as double) + (rand(2079373737972317466) * 10.0)))) 
		= knownfloatingpointnormalized(normalizenanandzero(id#5)))
:- RelationV2[id#1] t1
+- Project [cast(id#3 as double) AS id#5]
   +- RelationV2[id#3] t2

Operator combination

CollapseRepartition

Combines adjacent RepartitionOperation and RebalancePartitions operators.

    var df = spark.range(1, 100, 1)
    df = df.repartition(5)
    df = df.coalesce(10)
    df.explain("extended")

Logical plan:

== Analyzed Logical Plan ==
id: bigint
Repartition 10, false
+- Repartition 5, true
   +- Range (1, 100, step=1, splits=Some(8))

After optimization:

== Optimized Logical Plan ==
Repartition 5, true
+- Range (1, 100, step=1, splits=Some(8))

If the repartition count is larger than the coalesce count, the two are not collapsed:

    var df = spark.range(1, 100, 1)
    df = df.repartition(10).coalesce(5)
    df.explain("extended")

Analyzed and optimized plans:

== Analyzed Logical Plan ==
id: bigint
Repartition 5, false
+- Repartition 10, true
   +- Range (1, 100, step=1, splits=Some(8))

== Optimized Logical Plan ==
Repartition 5, false
+- Repartition 10, true
   +- Range (1, 100, step=1, splits=Some(8))
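
The two examples above can be summarized by a small decision function. The following is a simplified model in plain Scala, not Spark's code: the Plan, Source, and Repartition types are hypothetical stand-ins, with repartition(n) modeled as shuffle = true and coalesce(n) as shuffle = false. The branch for the remaining adjacent pairs (the upper node wins) is an assumption about the rule's behavior that the examples above do not exercise.

```scala
object CollapseRepartitionSketch {
  // Hypothetical, minimal plan model.
  sealed trait Plan
  case object Source extends Plan
  final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan

  def collapse(plan: Plan): Plan = plan match {
    // coalesce(n) on top of repartition(m): a coalesce cannot increase the
    // partition count, so with n >= m it is a no-op and only the child is kept.
    case upper @ Repartition(n, false, lower @ Repartition(m, true, _)) =>
      if (n >= m) lower else upper
    // Assumed behavior for any other adjacent pair: the upper node decides
    // the final partitioning, so the lower node is dropped.
    case Repartition(n, shuffle, Repartition(_, _, grandChild)) =>
      Repartition(n, shuffle, grandChild)
    case other => other
  }
}
```

With this model, repartition(5).coalesce(10) collapses to the inner Repartition 5, true, while repartition(10).coalesce(5) is left unchanged, matching the optimized plans shown above.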

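CollapseProject

Combines two Project operators into one and performs alias substitution, merging the expressions into a single expression, in the following cases:

  1. When the two Project operators are adjacent.
  2. When the two Project operators have a LocalLimit/Sample/Repartition operator between them and the upper Project consists of the same number of columns, each either equal to or an alias of the lower one. The GlobalLimit(LocalLimit) pattern is also considered.
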
SELECT col_1 FROM
(
    SELECT 1 as col_1, 2 as col_2
)

Logical plan:

== Analyzed Logical Plan ==
col_1: int
Project [col_1#0]
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [1 AS col_1#0, 2 AS col_2#1]
      +- OneRowRelation

After optimization:

== Optimized Logical Plan ==
Project [1 AS col_1#0]
+- OneRowRelation

CombineFilters

Combines two adjacent Filter operators into one, merging the non-redundant conditions into one conjunctive predicate.

    var df = spark.sql(
      s"""
        |SELECT id, sname FROM $mysqlName.$mysqlDB.t1
        |""".stripMargin)
    df = df.filter("id > 10").filter("id < 999")

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
Filter (id#0 < 999)
+- Filter (id#0 > 10)
   +- Project [id#0, sname#1]
      +- SubqueryAlias my_catalog.mysql57.hello.t1
         +- RelationV2[id#0, sname#1] my_catalog.mysql57.hello.t1 t1

Intermediate result:

Filter ((id#0 > 10) AND (id#0 < 999))
+- RelationV2[id#0, sname#1] my_catalog.mysql57.hello.t1 t1

CombineUnions

Combines all adjacent Union operators into one.

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
Distinct
+- Union false, false
   :- Distinct
   :  +- Union false, false
   :     :- Project [id#0, sname#1]
   :     :  +- Filter (id#0 < 10)
   :     :     +- SubqueryAlias t1
   :     :        +- RelationV2[id#0, sname#1]  t1
   :     +- Project [id#2, sname#3]
   :        +- Filter (id#2 > 100)
   :           +- SubqueryAlias t1
   :              +- RelationV2[id#2, sname#3]  t1
   +- Project [id#4, sname#5]
      +- Filter (id#4 < 999)
         +- SubqueryAlias t2
            +- RelationV2[id#4, sname#5]  t2

Intermediate result of the optimization:

Aggregate [id#0, sname#1], [id#0, sname#1]
+- Union false, false
  :- Filter (id#0 < 10)
  :  +- RelationV2[id#0, sname#1]  t1
  :- Filter (id#2 > 100)
  :  +- RelationV2[id#2, sname#3]  t1
  +- Filter (id#4 < 999)
     +- RelationV2[id#4, sname#5]  t2

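Others

OptimizeWindowFunctions

  • Replaces first(col) with nth_value(col, 1) for better performance.

CollapseWindow

  • Collapses adjacent Window expressions.
  • If the partition specs and order specs are the same, the window expressions are independent, and they are of the same window function type, the child is collapsed into the parent.

Constant folding and strength reduction

In compiler construction, strength reduction is a compiler optimization:

  • Expensive operations are replaced with equivalent but cheaper ones.
  • The classic example turns "strong" multiplications inside a loop into "weak" additions, something that comes up often in array addressing.
  • Other examples include replacing a multiplication within a loop with an addition and replacing an exponentiation within a loop with a multiplication.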

NullPropagation

Replaces Expressions that can be statically evaluated with equivalent Literal values.
This rule is more specific with Null value propagation from bottom to top of the expression tree.

SELECT count(id=NULL) FROM t1

Logical plan:

== Analyzed Logical Plan ==
count((id = NULL)): bigint
Aggregate [count((id#0 = cast(null as int))) AS count((id = NULL))#3L]
+- SubqueryAlias t1
   +- RelationV2[id#0, sname#1] t1

Intermediate result:

Aggregate [cast(0 as bigint) AS count((id = NULL))#3L]
+- Project [id#0]
   +- RelationV2[id#0, sname#1] t1

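ConstantPropagation

Substitutes attributes that can be statically evaluated with their corresponding value in conjunctive expressions.

  • For example: SELECT * FROM table WHERE i = 5 AND j = i + 3
  • can be rewritten as: SELECT * FROM table WHERE i = 5 AND j = 8
  • Approach: populate a mapping of attribute => constant value by looking at all the equality predicates;
  • then use this mapping to replace occurrences of those attributes with the corresponding constant values within the AND node.
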
SELECT *
FROM t3 WHERE id = 10 AND sid = 100 + id

Logical plan:

== Analyzed Logical Plan ==
id: int, sid: int, sname: string
Project [id#0, sid#1, sname#2]
+- Filter ((id#0 = 10) AND (sid#1 = (100 + id#0)))
   +- SubqueryAlias t3
      +- RelationV2[id#0, sid#1, sname#2]  t3

Intermediate result:

Filter ((id#0 = 10) AND (sid#1 = (100 + 10)))
+- RelationV2[id#0, sid#1, sname#2]  t3
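
The two-step approach described above (collect attribute => constant from equality predicates, then substitute) can be sketched on a toy expression tree. This is an illustration in plain Scala, not Spark's implementation; the Expr types and the propagate function are hypothetical and only cover integer literals, addition, and equality.

```scala
object ConstantPropagationSketch {
  // Hypothetical, minimal expression model.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  // Replace attributes that have a known constant value.
  private def substitute(e: Expr, env: Map[String, Int]): Expr = e match {
    case Attr(n) if env.contains(n) => Lit(env(n))
    case Add(l, r)                  => Add(substitute(l, env), substitute(r, env))
    case other                      => other
  }

  // `conjuncts` are the AND-ed predicates of a single Filter.
  def propagate(conjuncts: Seq[Expr]): Seq[Expr] = {
    // Step 1: build the attribute => constant mapping from `attr = literal`.
    val env = conjuncts.collect {
      case EqualTo(Attr(n), Lit(v)) => n -> v
      case EqualTo(Lit(v), Attr(n)) => n -> v
    }.toMap
    // Step 2: substitute inside the other conjuncts; the defining equalities
    // themselves are kept as they are.
    conjuncts.map {
      case e @ EqualTo(Attr(_), Lit(_)) => e
      case e @ EqualTo(Lit(_), Attr(_)) => e
      case EqualTo(l, r)                => EqualTo(substitute(l, env), substitute(r, env))
      case other                        => other
    }
  }
}
```

Applied to id = 10 AND sid = 100 + id, this yields sid = 100 + 10, the same shape as the intermediate plan above; turning 100 + 10 into 110 is left to constant folding.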

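OptimizeIn

Optimizes IN predicates:

  • Converts the predicate to false when the list is empty and the value is not nullable.
  • Removes duplicate literals from the list.
  • Replaces In(value, Seq[Literal]) with the faster optimized version InSet(value, HashSet[Literal]).
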
SELECT * FROM t1
 WHERE id IN (1, 2, 1, 1)

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
Project [id#0, sname#1]
+- Filter id#0 IN (1,2,1,1)
   +- SubqueryAlias t1
      +- RelationV2[id#0, sname#1]  t1

Intermediate result of the optimization:

Filter id#0 IN (1,2)
+- RelationV2[id#0, sname#1]  t1

If the IN list has more values than the threshold spark.sql.optimizer.inSetConversionThreshold, InSet is used instead:

SELECT * FROM t1
WHERE id IN (1, 2, 3,4,5,6,7,8,9,10,11,12,13,14,15)

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
Project [id#0, sname#1]
+- Filter id#0 IN (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)
   +- SubqueryAlias t1
      +- RelationV2[id#0, sname#1]  t1

Intermediate result:

Filter id#0 INSET 1, 10, 11, 12, 13, 14, 15, 2, 3, 4, 5, 6, 7, 8, 9
+- RelationV2[id#0, sname#1]  t1
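
The three behaviors listed for OptimizeIn can be sketched as one small function. This is a simplified model in plain Scala with hypothetical types (Pred, In, InSet, AlwaysFalse); the threshold is passed in by the caller and stands in for spark.sql.optimizer.inSetConversionThreshold. The case of an empty list over a nullable value is not modeled and is simply left as an In with no values.

```scala
object OptimizeInSketch {
  // Hypothetical predicate model over integer literals.
  sealed trait Pred
  case object AlwaysFalse extends Pred
  final case class In(column: String, values: Seq[Int]) extends Pred
  final case class InSet(column: String, values: Set[Int]) extends Pred

  def optimize(column: String, values: Seq[Int], columnNullable: Boolean, threshold: Int): Pred = {
    // Remove duplicate literals, keeping first occurrences in order.
    val distinct = values.distinct
    if (distinct.isEmpty && !columnNullable) AlwaysFalse  // empty list, non-nullable value
    else if (distinct.size > threshold) InSet(column, distinct.toSet) // hash-set lookup
    else In(column, distinct)
  }
}
```

With a threshold of 10, IN (1, 2, 1, 1) becomes IN (1, 2), and the 15-value list becomes an InSet, in line with the two intermediate results above.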

ConstantFolding

Replaces expressions that can be statically evaluated with equivalent literal values.
The following conditions are used to determine suitability for constant folding:

  • A Coalesce is foldable if all of its children are foldable
  • A BinaryExpression is foldable if both its left and right children are foldable
  • A Not, IsNull, or IsNotNull is foldable if its child is foldable
  • A Literal is foldable
  • A Cast or UnaryMinus is foldable if its child is foldable

SELECT replace('123456', '123', 'abc') AS r

Logical plan:

== Analyzed Logical Plan ==
r: string
Project [replace(123456, 123, abc) AS r#0]
+- OneRowRelation

Intermediate result:

Project [abc456 AS r#0]
+- OneRowRelation

This goes through StringReplace, one of many related expression classes.

Evaluation is done by calling eval() on the parent class TernaryExpression.


Another case:

SELECT 1 + (id + 2) + 3 FROM $mysqlName.$mysqlDB.t1

Logical plan:

== Analyzed Logical Plan ==
((1 + (id + 2)) + 3): int
Project [((1 + (id#0 + 2)) + 3) AS ((1 + (id + 2)) + 3)#2]
+- SubqueryAlias t1
   +- RelationV2[id#0, sname#1]  t1

After optimization:

== Optimized Logical Plan ==
Project [(id#0 + 6) AS ((1 + (id + 2)) + 3)#2]
+- RelationV2[id#0] t1
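
The foldability conditions listed earlier can be illustrated with a toy expression tree. The sketch below is plain Scala with hypothetical types (Lit, Attr, Add), not Spark's classes: an expression is foldable when all of its inputs are, and a foldable subtree is replaced by the literal it evaluates to.

```scala
object ConstantFoldingSketch {
  // Hypothetical, minimal expression model.
  sealed trait Expr { def foldable: Boolean }
  final case class Lit(value: Int) extends Expr { val foldable = true }
  final case class Attr(name: String) extends Expr { val foldable = false }
  final case class Add(left: Expr, right: Expr) extends Expr {
    // A binary expression is foldable if both children are foldable.
    def foldable: Boolean = left.foldable && right.foldable
  }

  // Evaluate a foldable expression; an attribute has no static value.
  def eval(e: Expr): Int = e match {
    case Lit(v)    => v
    case Add(l, r) => eval(l) + eval(r)
    case Attr(n)   => throw new IllegalArgumentException(s"$n is not foldable")
  }

  // Replace every maximal foldable subtree with a literal.
  def fold(e: Expr): Expr = e match {
    case l: Lit          => l
    case x if x.foldable => Lit(eval(x))
    case Add(l, r)       => Add(fold(l), fold(r))
    case other           => other
  }
}
```

Note that folding alone leaves 1 + (id + 2) + 3 untouched, because no subtree of that expression consists of literals only. Reaching id + 6 as in the optimized plan above requires the constants to be regrouped first; in Spark this is presumably the job of a separate rule working together with ConstantFolding.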

ReplaceNullWithFalseInPredicate

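A rule that replaces Literal(null, BooleanType) with FalseLiteral, if possible, in the search condition of a WHERE/HAVING/ON(JOIN) clause.
Such a clause contains an implicit Boolean operator, (search condition) = TRUE. The replacement is only valid when Literal(null, BooleanType) is semantically equivalent to FalseLiteral when the whole search condition is evaluated.
Note that FALSE and NULL are not interchangeable in most cases, for example when the search condition contains NOT and NULL-tolerant expressions.
The rule is therefore very conservative and applies only in very limited cases.
For example:

  • Filter(Literal(null, BooleanType)) is equivalent to Filter(FalseLiteral).
  • Another example, containing branches, is Filter(If(cond, FalseLiteral, Literal(null, _)));
  • this can be optimized to Filter(If(cond, FalseLiteral, FalseLiteral)),
  • and eventually to Filter(FalseLiteral).
  • In addition, the rule transforms the predicates in all If expressions and the branch conditions in all CaseWhen expressions, even if they are not part of the search condition.
  • For example, Project(If(And(cond, Literal(null)), Literal(1), Literal(2))) can be simplified to Project(Literal(2)).
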
SELECT IF  ((( 2 > 1) AND NULL), 'good', 'bad') AS col

Analyzed and optimized plans:

== Analyzed Logical Plan ==
col: string
Project [if (((2 > 1) AND cast(null as boolean))) good else bad AS col#0]
+- OneRowRelation

== Optimized Logical Plan ==
Project [bad AS col#0]
+- OneRowRelation

CombineConcats

Combines nested Concat expressions.

SELECT concat("abc", concat("def", "ghi")) AS col

Analyzed and optimized plans:

== Analyzed Logical Plan ==
col: string
Project [concat(abc, concat(def, ghi)) AS col#0]
+- OneRowRelation

== Optimized Logical Plan ==
Project [abcdefghi AS col#0]
+- OneRowRelation

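Others

There are many more, such as:

  • TransposeWindow
    • Transposes adjacent Window expressions. If the partition spec of the parent Window expression is compatible with the partition spec of the child Window expression, they are transposed.
  • NullDownPropagation
    • Unwraps the input of IsNull/IsNotNull if the input is NullIntolerant.
    • For example: IsNull(Not(null)) == IsNull(null)
  • FoldablePropagation
    • Replaces attributes with aliases of the original foldable expressions. Other optimizations will take advantage of the propagated foldable expressions.
    • For example, this rule can turn SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3
    • into SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now()
    • Other rules can optimize this further and remove the ORDER BY operator.
  • PushFoldableIntoBranches
    • Pushes foldable expressions down into if / case branches.
  • RemoveDispensableExpressions
    • Removes nodes that are not necessary.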

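Operator simplification

LikeSimplification

Simplifies LIKE expressions that do not need full regular expressions to evaluate the condition.
For example, when the expression is just checking whether a string starts or ends with a given pattern:
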
SELECT *
FROM  t1  WHERE sname LIKE '%abc'

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
Project [id#0, sname#1]
+- Filter sname#1 LIKE %abc
   +- SubqueryAlias t1
      +- RelationV2[id#0, sname#1]  t1

Intermediate result of the optimization:

Filter EndsWith(sname#1, abc)
+- RelationV2[id#0, sname#1]  t1
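
As an illustration of the idea, the sketch below classifies a LIKE pattern into a cheaper string test when its only wildcard is a leading and/or trailing %. It is plain Scala with hypothetical types and is more conservative than a real implementation needs to be: any pattern containing _, an escape character, or a % in the middle is left as a Like.

```scala
object LikeSimplificationSketch {
  // Hypothetical predicate model; the column is implied.
  sealed trait Pred
  final case class StartsWith(prefix: String) extends Pred
  final case class EndsWith(suffix: String) extends Pred
  final case class Contains(infix: String) extends Pred
  final case class EqualTo(value: String) extends Pred
  final case class Like(pattern: String) extends Pred

  // True when the text has no wildcard or escape character at all.
  private def plain(s: String): Boolean =
    !s.exists(c => c == '%' || c == '_' || c == '\\')

  def simplify(pattern: String): Pred = {
    if (plain(pattern)) EqualTo(pattern)
    else if (pattern.length >= 3 && pattern.startsWith("%") && pattern.endsWith("%") &&
             plain(pattern.substring(1, pattern.length - 1)))
      Contains(pattern.substring(1, pattern.length - 1))
    else if (pattern.length >= 2 && pattern.startsWith("%") && plain(pattern.drop(1)))
      EndsWith(pattern.drop(1))
    else if (pattern.length >= 2 && pattern.endsWith("%") && plain(pattern.dropRight(1)))
      StartsWith(pattern.dropRight(1))
    else Like(pattern) // anything else keeps the general matcher
  }
}
```

For the pattern '%abc' used above, simplify returns EndsWith("abc"), the same shape as the Filter in the intermediate plan.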

BooleanSimplification

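Simplifies Boolean expressions:

  • Simplifies expressions whose answer can be determined without evaluating both sides.
  • Eliminates or extracts common factors.
  • Merges identical expressions.
  • Removes the Not operator.
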
SELECT *
FROM  t1
WHERE (id > 10 AND id < 100) OR id > 10


SimplifyBinaryComparison

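Simplifies binary comparisons whose two operands are semantically equal:

  1. Replace <=> with the true literal.
  2. Replace =, <=, and >= with the true literal if both operands are non-nullable.
  3. Replace > and < with the false literal if both operands are non-nullable.
  4. Unwrap = and <=> if one side is a boolean literal.
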
SELECT (1 + 2) <=> (4 - 1)

Analyzed and optimized plans:

== Analyzed Logical Plan ==
((1 + 2) <=> (4 - 1)): boolean
Project [((1 + 2) <=> (4 - 1)) AS ((1 + 2) <=> (4 - 1))#0]
+- OneRowRelation

== Optimized Logical Plan ==
Project [true AS ((1 + 2) <=> (4 - 1))#0]
+- OneRowRelation

PruneFilters

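Removes Filters that can be evaluated trivially. This is done in the following ways:

  1. Eliding the Filter when it always evaluates to true.
  2. Substituting a dummy empty relation when the Filter always evaluates to false.
  3. Eliminating conditions that are always true given the constraints on the child's output.
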
SELECT * FROM t1 WHERE 1 > 0

Analyzed and optimized plans:

== Analyzed Logical Plan ==
id: int, sname: string
Project [id#0, sname#1]
+- Filter (1 > 0)
   +- SubqueryAlias t1
      +- RelationV2[id#0, sname#1]  t1

== Optimized Logical Plan ==
RelationV2[id#0, sname#1] t1

EliminateSerialization

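Removes cases where the plan unnecessarily switches between the object and the serialized (InternalRow) representation of data items, for example back-to-back map operations.

spark.range(10).map(_ * 1).map(_ + 2).explain("extended")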


SimplifyExtractValueOps

Simplifies redundant CreateNamedStruct/CreateArray/CreateMap expressions.

SELECT array('a', 'b', 'c')[0] as col_1

Analyzed and optimized plans:

== Analyzed Logical Plan ==
col_1: string
Project [array(a, b, c)[0] AS col_1#0]
+- OneRowRelation

== Optimized Logical Plan ==
Project [a AS col_1#0]
+- OneRowRelation

Simplifying a map:

SELECT map('a_key', 'a_value', 'b_key', 'b_value').a_key as col_1

Analyzed and optimized plans:

== Analyzed Logical Plan ==
col_1: string
Project [map(a_key, a_value, b_key, b_value)[a_key] AS col_1#0]
+- OneRowRelation

== Optimized Logical Plan ==
Project [a_value AS col_1#0]
+- OneRowRelation

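Others

SimplifyCasts

  • Removes casts that are unnecessary because the input is already the correct type.

SimplifyCaseConversionExpressions

  • Removes inner case conversion expressions that are unnecessary because the outer conversion overrides them.

RemoveRedundantAliases

  • Removes redundant aliases from a query plan.
  • A redundant alias is one that does not change the name or metadata of a column and does not deduplicate it.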

Rewrite/Replace/Eliminate rules

EliminateOuterJoin

The main logic:

    join.joinType match {
      case RightOuter if leftHasNonNullPredicate => Inner
      case LeftOuter if rightHasNonNullPredicate => Inner
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
      case FullOuter if leftHasNonNullPredicate => LeftOuter
      case FullOuter if rightHasNonNullPredicate => RightOuter
      case o => o
    }

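The conversions require the following. Here a "filter" means a predicate that rejects rows in which that side's columns are NULL:

  • right outer, with such a filter on the left table: converted to inner join
  • left outer, with such a filter on the right table: converted to inner join
  • full outer, with such filters on both tables: converted to inner join
  • full outer, with such a filter on the left table only: converted to left outer
  • full outer, with such a filter on the right table only: converted to right outer

Taking left outer as the example:
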
SELECT a.sname, b.sname FROM $mysqlName.$mysqlDB.t1 AS a
  LEFT JOIN $mysqlName.$mysqlDB.t2 AS b
ON a.id = b.id
WHERE b.id > 99

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Filter (id#2 > 99)
   +- Join LeftOuter, (id#0 = id#2)
      :- SubqueryAlias a
      :  +- SubqueryAlias t1
      :     +- RelationV2[id#0, sname#1]  t1
      +- SubqueryAlias b
         +- SubqueryAlias t2
            +- RelationV2[id#2, sname#3]  t2

Intermediate result of the optimization:

Filter (id#2 > 99)
+- Join Inner, (id#0 = id#2)
   :- RelationV2[id#0, sname#1]  t1
   +- RelationV2[id#2, sname#3]  t2

Optimizing a FULL JOIN:

SELECT a.sname, b.sname FROM t1 AS a
  FULL JOIN t2 AS b
ON a.id = b.id
WHERE b.id > 99

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Filter (id#2 > 99)
   +- Join FullOuter, (id#0 = id#2)
      :- SubqueryAlias a
      :  +- SubqueryAlias t1
      :     +- RelationV2[id#0, sname#1]  t1
      +- SubqueryAlias b
         +- SubqueryAlias t2
            +- RelationV2[id#2, sname#3]  t2

Intermediate result of the optimization:

Filter (id#2 > 99)
+- Join RightOuter, (id#0 = id#2)
   :- RelationV2[id#0, sname#1]  t1
   +- RelationV2[id#2, sname#3]  t2
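
Why the conversion is safe can be checked on small in-memory data. The sketch below uses plain Scala collections with hypothetical helper names; a missing right side is modeled as None, and the WHERE b.id > 99 filter treats a comparison against a missing value as not true, which is how SQL treats NULL in a WHERE clause.

```scala
object OuterJoinEliminationSketch {
  type Row = (Int, String) // (id, sname)

  def innerJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] =
    for (l <- left; r <- right if l._1 == r._1) yield (l, Option(r))

  // Left outer join: unmatched left rows are kept, padded with None.
  def leftOuterJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] =
    left.flatMap { l =>
      val matches = right.filter(_._1 == l._1)
      if (matches.isEmpty) Seq((l, Option.empty[Row]))
      else matches.map(r => (l, Option(r)))
    }

  // WHERE b.id > 99: rows whose right side is missing never satisfy it.
  def rightIdOver99(row: (Row, Option[Row])): Boolean =
    row._2.exists(_._1 > 99)
}
```

Because the filter discards every padded row, filtering the left outer join gives exactly the rows of the filtered inner join, so the cheaper inner join can be used instead.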

EliminateDistinct

Removes DISTINCT from aggregate functions where it is not needed:

SELECT max(distinct id) as max_id FROM  t1

Analyzed and optimized plans:

== Analyzed Logical Plan ==
max_id: int
Aggregate [max(distinct id#1) AS max_id#0]
+- SubqueryAlias .t1
   +- RelationV2[id#1, sname#2]  t1

== Optimized Logical Plan ==
Project [MAX(`id`)#6 AS max(id#1)#3 AS max_id#0]
+- RelationV2[MAX(`id`)#6] t1

EliminateLimits

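This rule is applied by both the normal and the AQE optimizer, and optimizes Limit operators by:

  • eliminating the Limit/GlobalLimit operator if the child's max row count <= limit;
  • combining two adjacent Limit operators into one, merging their expressions into a single expression.
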
    var df = spark.sql(
      s"""
        |SELECT id, sname FROM $mysqlName.$mysqlDB.t1
        |""".stripMargin)
    df = df.limit(5).limit(10)

Logical plan:

== Analyzed Logical Plan ==
id: int, sname: string
GlobalLimit 10
+- LocalLimit 10
   +- GlobalLimit 5
      +- LocalLimit 5
         +- Project [id#0, sname#1]
            +- SubqueryAlias t1
               +- RelationV2[id#0, sname#1]  t1

Intermediate result:

GlobalLimit 5
+- LocalLimit 5
   +- RelationV2[id#0, sname#1]  t1
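
Both behaviors of the rule fit in a few lines on a toy plan model. The Scan and Limit types below are hypothetical (plain Scala, not Spark's operators), and a single Limit node stands in for the GlobalLimit/LocalLimit pair; maxRows is the statically known upper bound on the row count, if any.

```scala
object EliminateLimitsSketch {
  // Hypothetical, minimal plan model.
  sealed trait Plan { def maxRows: Option[Long] }
  final case class Scan(name: String, maxRows: Option[Long]) extends Plan
  final case class Limit(n: Long, child: Plan) extends Plan {
    def maxRows: Option[Long] = Some(child.maxRows.map(m => math.min(n, m)).getOrElse(n))
  }

  def optimize(plan: Plan): Plan = plan match {
    case Limit(n, child) =>
      optimize(child) match {
        // Two adjacent limits: only the smaller one matters.
        case Limit(m, grandChild) => Limit(math.min(n, m), grandChild)
        // The child can never produce more than n rows: the limit is a no-op.
        case c if c.maxRows.exists(_ <= n) => c
        case c => Limit(n, c)
      }
    case other => other
  }
}
```

For limit(5).limit(10) this produces a single limit of 5, as in the intermediate result above.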

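Others

ReplaceIntersectWithSemiJoin

  • Replaces the logical Intersect operator with a left-semi Join operator.

ReplaceExceptWithFilter

  • If one or both of the datasets in the logical Except operator are purely transformed using Filter,
  • this rule replaces the Except operator with a Filter operator by flipping the filter condition of the right child.

ReplaceExceptWithAntiJoin

  • Replaces the logical Except operator with a left-anti Join operator.

ReplaceDistinctWithAggregate

  • Replaces the logical Distinct operator with an Aggregate operator.

RewriteExceptAll

  • Replaces the logical Except operator using a combination of Union, Aggregate, and Generate operators.

RewriteIntersectAll

  • Replaces the logical Intersect operator using a combination of Union, Aggregate, and Generate operators.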

EliminateAggregateFilter

  • Removes useless FILTER clauses from aggregate expressions.
  • This rule should be applied before RewriteDistinctAggregates.

Pushdown rules

PushDownPredicates

This rule is composed of three rules:

object PushDownPredicates extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
    _.containsAnyPattern(FILTER, JOIN)) {
    CombineFilters.applyLocally
      .orElse(PushPredicateThroughNonJoin.applyLocally)
      .orElse(PushPredicateThroughJoin.applyLocally)
  }
}


PushPredicateThroughNonJoin

This rule in turn contains 6 cases, five of which are covered below:

Case 1 – Push predicate through Project

 SELECT id FROM
 (
 SELECT id FROM t1
 ) WHERE id < 100

First check whether the fields in the Project are deterministic; the condition can be pushed down only if they are.
Intermediate result after the rewrite:

Project [id#0]
+- Filter (id#0 < 100)
   +- RelationV2[id#0, sname#1] t1

In the following case the outer condition cannot be pushed down:

 SELECT id, r FROM
 (
 SELECT id, rand() * 10 AS r  FROM t1
     WHERE r > 10
 ) WHERE r < 100

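Pushing down would be equivalent to (r > 10 AND r < 100), but the field r is generated randomly and is therefore nondeterministic,
so filtering this way would give wrong results; hence no pushdown.


Case 2 – Push predicate through Aggregate
SQL:
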
 SELECT id, sname FROM
 (
 SELECT id, sname, count(1) AS c  FROM t1
     WHERE id > 10  GROUP BY id, sname
 ) WHERE c = 1 AND id < 999

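Execution logic:

  • As before, first check whether the fields are deterministic; only deterministic ones can be pushed down.
  • There must be a GROUP BY: an outer Filter field can be pushed down only if it appears in the GROUP BY.
  • Get the mapping between aliases and real fields in the Aggregate.
  • Split the Filter conditions into deterministic ones (the candidates) and nondeterministic ones.
  • Compare the candidates against the Aggregate's grouping fields; only those in the grouping list qualify. For example, id in id < 999 qualifies, while c does not.
  • Regroup the conditions into those that can and cannot be pushed down.
  • Move the pushable conditions below the Aggregate as a Filter, which completes the pushdown.

Logical plan:
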
== Analyzed Logical Plan ==
id: int, sname: string
Project [id#1, sname#2]
+- Filter ((c#0L = cast(1 as bigint)) AND (id#1 < 999))
   +- SubqueryAlias __auto_generated_subquery_name
      +- Aggregate [id#1, sname#2], [id#1, sname#2, count(1) AS c#0L]
         +- Filter (id#1 > 10)
            +- SubqueryAlias t1
               +- RelationV2[id#1, sname#2]  t1

Intermediate result of the optimization:

Filter (c#0L = cast(1 as bigint))
+- Aggregate [id#1, sname#2], [id#1, sname#2, count(1) AS c#0L]
   +- Filter (id#1 < 999)
      +- Filter (id#1 > 10)
         +- RelationV2[id#1, sname#2] t1
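
The splitting step described above can be sketched as follows. This is a simplified model in plain Scala with a hypothetical Conjunct type that records only a condition's text, the columns it references (after alias resolution), and whether it is deterministic. Taking the leading run of deterministic conjuncts as candidates is an assumption made here so that nothing is moved across a nondeterministic condition.

```scala
object PushThroughAggregateSketch {
  // Hypothetical description of one AND-ed condition of the outer Filter.
  final case class Conjunct(sql: String, references: Set[String], deterministic: Boolean = true)

  // Returns (conditions to push below the Aggregate, conditions that stay above).
  def split(conjuncts: Seq[Conjunct], groupingColumns: Set[String]): (Seq[Conjunct], Seq[Conjunct]) = {
    // Candidates: the deterministic conjuncts before the first nondeterministic one.
    val (candidates, nonDeterministic) = conjuncts.span(_.deterministic)
    // A candidate can be pushed only if it references grouping columns exclusively.
    val (pushDown, rest) = candidates.partition(_.references.subsetOf(groupingColumns))
    (pushDown, rest ++ nonDeterministic)
  }
}
```

For the query above, with grouping columns id and sname, the condition id < 999 is pushed below the Aggregate while c = 1 stays on top, matching the intermediate plan.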

Case 3 – Push predicate through Window
SQL:

 SELECT id, sname FROM
 (
  SELECT id, sname,
    row_number() over(partition by id order by sname desc ) AS rn
  FROM $mysqlName.$mysqlDB.t2
 ) tmp WHERE id < 999 AND sname = 'aa'

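Execution:

  • As with Aggregate, check whether the fields are deterministic.
  • Find the conditions that can be pushed down, namely those on fields used in the window's PARTITION BY.
  • Regroup the ones that cannot be pushed down, and move the pushable ones below the Window operator.

Logical plan:
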
== Analyzed Logical Plan ==
id: int, sname: string
Project [id#1, sname#2]
+- Filter ((id#1 < 999) AND (sname#2 = aa))
   +- SubqueryAlias tmp
      +- Project [id#1, sname#2, rn#0]
         +- Project [id#1, sname#2, rn#0, rn#0]
            +- Window [row_number() windowspecdefinition(id#1, sname#2 DESC NULLS LAST, specifiedwindowframe(RowFrame,
			unboundedpreceding$(), currentrow$())) AS rn#0], [id#1], [sname#2 DESC NULLS LAST]
               +- Project [id#1, sname#2]
                  +- SubqueryAlias t2
                     +- RelationV2[id#1, sname#2] t2

Intermediate result after optimization:

Filter (sname#2 = aa)
+- Window [row_number() windowspecdefinition(id#1, sname#2 DESC NULLS LAST, specifiedwindowframe(RowFrame,
 unboundedpreceding$(), currentrow$())) AS rn#0], [id#1], [sname#2 DESC NULLS LAST]
   +- Filter (id#1 < 999)
      +- RelationV2[id#1, sname#2]  t2

Case 4 – Push predicate through Union
SQL:

SELECT tmpc FROM (
    SELECT  id AS tmpc  FROM t1
UNION ALL
    SELECT  id AS tmpc  FROM t2
) tmp WHERE tmpc >1 AND rand()>0.1

Logical plan:

== Analyzed Logical Plan ==
tmpc: int
Project [tmpc#0]
+- Filter ((tmpc#0 > 1) AND (rand(5842271494822007178) > cast(0.1 as double)))
   +- SubqueryAlias tmp
      +- Union false, false
         :- Project [id#2 AS tmpc#0]
         :  +- SubqueryAlias t1
         :     +- RelationV2[id#2, sname#3]  t1
         +- Project [id#4 AS tmpc#1]
            +- SubqueryAlias t2
               +- RelationV2[id#4, sname#5]  t2

Intermediate result after optimization: the outermost tmpc > 1 has been pushed below the UNION, while the nondeterministic rand() predicate has not:

Filter (rand(5842271494822007178) > cast(0.1 as double))
+- Union false, false
   :- Filter (tmpc#0 > 1)
   :  +- Project [id#2 AS tmpc#0]
   :     +- RelationV2[id#2, sname#3]  t1
   +- Filter (tmpc#1 > 1)
      +- Project [id#4 AS tmpc#1]
         +- RelationV2[id#4, sname#5]  t2

Case 5 – Push predicate through all other supported operators
Some other operators that predicates can be pushed through:

  def canPushThrough(p: UnaryNode): Boolean = p match {
    // Note that some operators (e.g. project, aggregate, union) are being handled separately
    // (earlier in this rule).
    case _: AppendColumns => true
    case _: Distinct => true
    case _: Generate => true
    case _: Pivot => true
    case _: RepartitionByExpression => true
    case _: Repartition => true
    case _: RebalancePartitions => true
    case _: ScriptTransformation => true
    case _: Sort => true
    case _: BatchEvalPython => true
    case _: ArrowEvalPython => true
    case _: Expand => true
    case _ => false
  }

SQL:

SELECT order_id FROM
(
    SELECT * FROM item
	pivot (
	    count(id)
		FOR item_name IN (
		    'item1' item_1
		)
	)
) WHERE order_id > 10


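PushPredicateThroughJoin

case-1, Filter + inner join: push the filter conditions down to both sides of the join
Steps:

  • Extract the left-side conditions, the right-side conditions, and the common conditions, then reattach them to the left child and the right child respectively.
  • That is, place a Filter on top of each of the left and right nodes, then assemble the two new children (both now Filters) into a new JOIN.
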
SELECT a.sname, b.sname FROM t1 AS a  JOIN t2 AS b
ON a.id = b.id
WHERE a.id < 1 AND b.id > 99

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Filter ((id#0 < 1) AND (id#2 > 99))
   +- Join Inner, (id#0 = id#2)
      :- SubqueryAlias a
      :  +- SubqueryAlias t1
      :     +- RelationV2[id#0, sname#1]  t1
      +- SubqueryAlias b
         +- SubqueryAlias t2
            +- RelationV2[id#2, sname#3]  t2

Intermediate result of the optimization:

Join Inner, (id#0 = id#2)
:- Filter (id#0 < 1)
:  +- RelationV2[id#0, sname#1]  t1
+- Filter (id#2 > 99)
   +- RelationV2[id#2, sname#3]  t2

case-2, Filter + left join: push the WHERE conditions on the left table down to the left table
Similar to case-1, except that only the left node's conditions are pushed down.

SELECT a.sname, b.sname FROM t1 AS a  LEFT OUTER JOIN t2 AS b   
ON a.id = b.id
WHERE a.id < 1

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Filter (id#0 < 1)
   +- Join LeftOuter, (id#0 = id#2)
      :- SubqueryAlias a
      :  +- SubqueryAlias t1
      :     +- RelationV2[id#0, sname#1]  t1
      +- SubqueryAlias b
         +- SubqueryAlias t2
            +- RelationV2[id#2, sname#3]  t2

Intermediate result:

Join LeftOuter, (id#0 = id#2)
:- Filter (id#0 < 1)
:  +- RelationV2[id#0, sname#1]  t1
+- RelationV2[id#2, sname#3]  t2

case-3, Filter + right join: push the WHERE conditions on the right table down to the right table

SELECT a.sname, b.sname FROM t1 AS a  RIGHT OUTER JOIN t2 AS b
ON a.id = b.id
WHERE b.id > 99

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Filter (id#2 > 99)
   +- Join RightOuter, (id#0 = id#2)
      :- SubqueryAlias a
      :  +- SubqueryAlias t1
      :     +- RelationV2[id#0, sname#1]  t1
      +- SubqueryAlias b
         +- SubqueryAlias t2
            +- RelationV2[id#2, sname#3]  t2

Intermediate result:

Join RightOuter, (id#0 = id#2)
:- RelationV2[id#0, sname#1]  t1
+- Filter (id#2 > 99)
   +- RelationV2[id#2, sname#3]  t2

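case-4, full join: in many cases this is first converted to a left, right, or inner join
and then pushed down further.
For a full join with a WHERE clause, the conversion and the pushdown are done jointly by

  • EliminateOuterJoin
  • PushPredicateThroughJoin


case-5, pushing down the ON conditions of a join; only the INNER case is shown here, and the other cases are similar
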
SELECT a.sname, b.sname FROM t1 AS a  JOIN t2 AS b
ON a.id = b.id
 AND a.id > 99 AND b.sname = 'hehe'

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
Project [sname#1, sname#3]
+- Join Inner, (((id#0 = id#2) AND (id#0 > 99)) AND (sname#3 = hehe))
   :- SubqueryAlias a
   :  +- SubqueryAlias t1
   :     +- RelationV2[id#0, sname#1]  t1
   +- SubqueryAlias b
      +- SubqueryAlias t2
         +- RelationV2[id#2, sname#3]  t2

Intermediate result after optimization:

Join Inner, (id#0 = id#2)
:- Filter (id#0 > 99)
:  +- RelationV2[id#0, sname#1]  t1
+- Filter (sname#3 = hehe)
   +- RelationV2[id#2, sname#3]  t2

LimitPushDown

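A limit can be pushed down in these cases:

  • the child is a Union
  • the child is a JOIN, in several variants:
    • RightOuter with a non-empty join condition: the limit is pushed to the right side of the join
    • LeftOuter with a non-empty join condition: the limit is pushed to the left side of the join
    • InnerLike with an empty join condition: the limit is pushed to both sides
    • LeftSemi or LeftAnti with an empty join condition: the limit is pushed to the left side of the join
  • the child is a Project, and
    • the grandchild is a JOIN
    • the grandchild is BatchEvalPython
    • the grandchild is ArrowEvalPython
  • limit 1 and the child is an Aggregate
  • limit 1 and the child is a Project
  • the child is an Offset
  • the child is BatchEvalPython
  • the child is ArrowEvalPython

SQL:
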
SELECT a.sname, b.sname FROM t1 AS a
   LEFT OUTER JOIN
   t2 AS b
ON a.id = b.id  LIMIT 1

Logical plan:

== Analyzed Logical Plan ==
sname: string, sname: string
GlobalLimit 1
+- LocalLimit 1
   +- Project [sname#1, sname#3]
      +- Join LeftOuter, (id#0 = id#2)
         :- SubqueryAlias a
         :  +- SubqueryAlias t1
         :     +- RelationV2[id#0, sname#1] t1
         +- SubqueryAlias b
            +- SubqueryAlias t2
               +- RelationV2[id#2, sname#3]  t2

After optimization:

== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [sname#1, sname#3]
      +- Join LeftOuter, (id#0 = id#2)
         :- LocalLimit 1
         :  +- RelationV2[id#0, sname#1] t1
         +- RelationV2[id#2, sname#3] t2

ColumnPruning

Attempts to eliminate the reading of unneeded columns from the query plan.

SELECT  id, m_id FROM
(
    SELECT id, max(id) as m_id,  max(parent_id) as m_pid
    FROM $mysqlName.$mysqlDB.zz_3 GROUP BY id
)

Logical plan:

== Analyzed Logical Plan ==
id: int, m_id: int
Project [id#2, m_id#0]
+- SubqueryAlias __auto_generated_subquery_name
   +- Aggregate [id#2], [id#2, max(id#2) AS m_id#0, max(parent_id#3) AS m_pid#1]
      +- SubqueryAlias zz_3
         +- RelationV2[id#2, parent_id#3, sname#4]  zz_3

Intermediate result:

Project [id#2, m_id#0]
+- Aggregate [id#2], [id#2, max(id#2) AS m_id#0]
   +- RelationV2[id#2, parent_id#3, sname#4]  zz_3

References