Spark Planner

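After optimization, the Spark Planner generates one or more candidate physical plans.
A cost model is then meant to pick the cheapest candidate, but for now Spark simply takes the first candidate plan and does no real cost-based optimization (CBO) at this step.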
https://malinxiao.files.wordpress.com/2021/12/image-358.png
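
The difference between "take the first candidate" and "take the cheapest candidate" can be sketched with a toy model. This is only an illustration: `Candidate`, its `cost` field, and the numbers are all invented here and are not part of Spark.

```scala
object FirstCandidateSketch {
  // Hypothetical stand-in for a physical plan; `cost` is an invented field.
  final case class Candidate(name: String, cost: Double)

  // The planner hands back its candidates lazily, as an Iterator.
  def candidates(): Iterator[Candidate] = Iterator(
    Candidate("SortMergeJoin", 10.0),
    Candidate("BroadcastHashJoin", 3.0)
  )

  // What the text describes today: take the first candidate, ignoring cost.
  def chooseFirst(): Candidate = candidates().next()

  // What a cost model would do instead: pick the cheapest candidate.
  def chooseCheapest(): Candidate = candidates().minBy(_.cost)
}
```

With these made-up costs the two choices differ, which is exactly the gap a cost model would close.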

SparkPlanner extends SparkStrategies, and both are subclasses of QueryPlanner. QueryPlanner itself is platform independent,
so it can serve any platform, not only Spark.
https://malinxiao.files.wordpress.com/2021/12/image-359.png
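
To see why a planner that is generic in its physical plan type is not tied to Spark, here is a minimal sketch. `Planner`, `Strategy`, `SqlPlanner`, and the toy logical plan are hypothetical simplifications, not Catalyst's actual classes; the "platform" in this example just produces SQL strings.

```scala
object GenericPlannerSketch {
  // Toy logical plan (hypothetical, not Catalyst's LogicalPlan).
  sealed trait Logical
  final case class Scan(table: String) extends Logical
  final case class Select(columns: Seq[String], child: Logical) extends Logical

  // A strategy returns zero or more candidates for the plans it understands.
  abstract class Strategy[P] {
    def apply(plan: Logical): Seq[P]
  }

  // The planner is generic in P, so it carries no platform-specific knowledge.
  abstract class Planner[P] {
    def strategies: Seq[Strategy[P]]
    def plan(logical: Logical): Iterator[P] =
      strategies.iterator.flatMap(s => s(logical))
  }

  // A non-Spark "platform": physical plans are plain SQL strings.
  object SqlPlanner extends Planner[String] {
    private object ToSql extends Strategy[String] {
      def apply(plan: Logical): Seq[String] = plan match {
        case Scan(t) => Seq("SELECT * FROM " + t)
        case Select(cols, Scan(t)) =>
          Seq("SELECT " + cols.mkString(", ") + " FROM " + t)
        case _ => Nil // this strategy does not know how to plan the node
      }
    }
    val strategies: Seq[Strategy[String]] = Seq(ToSql)
  }
}
```

Only the concrete planner and its strategies know what a physical plan looks like; the generic part just runs every strategy and collects the candidates.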

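The inheritance hierarchy of SparkPlan is shown below. It is also part of the TreeNode hierarchy, and each SparkPlan node recursively contains zero or more child SparkPlan nodes.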
https://malinxiao.files.wordpress.com/2021/12/image-96.png

SparkPlan has three main families of subclasses, plus a few others:

  • LeafExecNode
  • UnaryExecNode
  • BinaryExecNode
  • Others, including CodegenSupport, DummySparkPlan, and MyPlan
    https://malinxiao.files.wordpress.com/2021/12/image-360.png
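
The three families differ only in how many children a node has. The sketch below mirrors that idea with hypothetical miniature types (`Plan`, `LeafNode`, `UnaryNode`, `BinaryNode`, and the example operators are invented for illustration and are not the real Spark classes), and shows the recursive tree structure mentioned earlier.

```scala
object ExecNodeSketch {
  // Hypothetical miniature of the SparkPlan tree, not the real Spark classes.
  sealed trait Plan {
    def name: String
    def children: Seq[Plan]
    // Recursively render the tree, one node per line, indented by depth.
    def treeString(depth: Int = 0): String =
      ("  " * depth) + name + "\n" +
        children.map(_.treeString(depth + 1)).mkString
  }

  // Leaf: no children (for example a scan).
  trait LeafNode extends Plan { final def children: Seq[Plan] = Nil }

  // Unary: exactly one child (for example a filter).
  trait UnaryNode extends Plan {
    def child: Plan
    final def children: Seq[Plan] = Seq(child)
  }

  // Binary: exactly two children (for example a join).
  trait BinaryNode extends Plan {
    def left: Plan
    def right: Plan
    final def children: Seq[Plan] = Seq(left, right)
  }

  final case class Scan(table: String) extends LeafNode {
    def name: String = "Scan(" + table + ")"
  }
  final case class Filter(cond: String, child: Plan) extends UnaryNode {
    def name: String = "Filter(" + cond + ")"
  }
  final case class Join(left: Plan, right: Plan) extends BinaryNode {
    def name: String = "Join"
  }
}
```

For example, `Join(Filter("x > 1", Scan("a")), Scan("b"))` builds a three-level tree whose root is a binary node.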

Conversion
Converting a LogicalPlan to a SparkPlan includes one-to-one mappings
https://malinxiao.files.wordpress.com/2021/12/image-361.png

The BasicOperators strategy in SparkStrategies

      case logical.Sort(sortExprs, global, child) =>
        execution.SortExec(sortExprs, global, planLater(child)) :: Nil
      case logical.Project(projectList, child) =>
        execution.ProjectExec(projectList, planLater(child)) :: Nil
      case logical.Filter(condition, child) =>
        execution.FilterExec(condition, planLater(child)) :: Nil
      case f: logical.TypedFilter =>
        execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
      case e @ logical.Expand(_, _, child) =>
        execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
      case logical.Sample(lb, ub, withReplacement, seed, child) =>
        execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
      case logical.LocalRelation(output, data, _) =>
        LocalTableScanExec(output, data) :: Nil
      case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
      // We should match the combination of limit and offset first, to get the optimal physical
      // plan, instead of planning limit and offset separately.
      case LimitAndOffset(limit, offset, child) =>
        GlobalLimitExec(limit, planLater(child), offset) :: Nil
      case OffsetAndLimit(offset, limit, child) =>
        // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
        GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil
      case logical.LocalLimit(IntegerLiteral(limit), child) =>
        execution.LocalLimitExec(limit, planLater(child)) :: Nil
      case logical.GlobalLimit(IntegerLiteral(limit), child) =>
        execution.GlobalLimitExec(limit, planLater(child)) :: Nil
      case logical.Offset(IntegerLiteral(offset), child) =>
        GlobalLimitExec(child = planLater(child), offset = offset) :: Nil
      case union: logical.Union =>
        execution.UnionExec(union.children.map(planLater)) :: Nil
      case g @ logical.Generate(generator, _, outer, _, _, child) =>
        execution.GenerateExec(
          generator, g.requiredChildOutput, outer,
          g.qualifiedGeneratorOutput, planLater(child)) :: Nil
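
The one-to-one pattern in the excerpt above can be reduced to a small self-contained sketch. All types here are hypothetical toys. One deliberate simplification: this toy `planLater` plans the child right away by recursion, whereas in Spark `planLater` leaves a placeholder that the planner fills in afterwards.

```scala
object OneToOneSketch {
  // Toy logical operators (hypothetical).
  sealed trait Logical
  final case class Relation(name: String) extends Logical
  final case class Filter(cond: String, child: Logical) extends Logical
  final case class Project(cols: Seq[String], child: Logical) extends Logical

  // Toy physical operators, one per logical operator (hypothetical).
  sealed trait Physical
  final case class ScanExec(name: String) extends Physical
  final case class FilterExec(cond: String, child: Physical) extends Physical
  final case class ProjectExec(cols: Seq[String], child: Physical) extends Physical

  // Simplification: plan the child immediately instead of deferring it.
  def planLater(child: Logical): Physical = plan(child).head

  // Each logical node maps to exactly one physical node, wrapped in a
  // single-element list just like the `:: Nil` results in the excerpt.
  def plan(logical: Logical): List[Physical] = logical match {
    case Relation(n)          => ScanExec(n) :: Nil
    case Filter(c, child)     => FilterExec(c, planLater(child)) :: Nil
    case Project(cols, child) => ProjectExec(cols, planLater(child)) :: Nil
  }
}
```

The resulting physical tree has the same shape as the logical tree, which is what makes the mapping one-to-one.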

The other kind is a one-to-many conversion
https://malinxiao.files.wordpress.com/2021/12/image-362.png
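
Assuming "one-to-many" here means that a single logical operator can be planned as several different physical operators, a join is the classic case. The sketch below is a toy: the types, the `sizeInBytes` field, and the `broadcastThreshold` value are invented for illustration and do not reproduce Spark's actual join selection logic.

```scala
object OneToManySketch {
  // Toy inputs (hypothetical): a table with an estimated size.
  final case class Table(name: String, sizeInBytes: Long)
  final case class Join(left: Table, right: Table)

  // Two possible physical implementations of the same logical join.
  sealed trait JoinExec
  final case class BroadcastHashJoinExec(build: Table, stream: Table) extends JoinExec
  final case class SortMergeJoinExec(left: Table, right: Table) extends JoinExec

  // Hypothetical threshold below which the smaller side is broadcast.
  val broadcastThreshold: Long = 10L * 1024 * 1024

  // One logical Join yields one or two physical candidates,
  // with the preferred candidate first.
  def plan(join: Join): Seq[JoinExec] = {
    val (smaller, larger) =
      if (join.left.sizeInBytes <= join.right.sizeInBytes) (join.left, join.right)
      else (join.right, join.left)
    val broadcast: Seq[JoinExec] =
      if (smaller.sizeInBytes <= broadcastThreshold)
        Seq(BroadcastHashJoinExec(smaller, larger))
      else Nil
    broadcast :+ SortMergeJoinExec(join.left, join.right)
  }
}
```

Because the preferred candidate is listed first, a planner that only takes the first candidate still ends up with a sensible choice in this toy.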

References