Spark性能调优

原理部分

RDD

RDD 具有 4 大属性

属性名 成员类型 属性含义 RDD特性 刻画方向
partitions 变量 RDD的所有数据分片实体 分布式 横向
partitioner 方法 划分数据分片的规则 分布式 横向
dependencies 变量 生成该RDD所一类的父RDD 容错性 纵向
compute 方法 生成该RDD 的计算接口 容错性 纵向

DAG的执行

DAG Direct Acyclic Graph

  • 通过无环图来表示RDD之间的各种依赖关系
  • 从开发者的视角出发,DAG 的构建是通过在分布式数据集上不停地调用算子来完成的
  • Stages 的划分:以 Actions 算子为起点,从后向前回溯 DAG,以 Shuffle 操作为边界去划分 Stages

DAG的生成

  • 应用开发实际上就是灵活运用算子实现业务逻辑的过程
  • 开发者在分布式数据集如 RDD、 DataFrame 或 Dataset 之上调用算子、封装计算逻辑
  • 这个过程会衍生新的子 RDD。与此同时,子 RDD 会把 dependencies 属性赋值到父 RDD
  • 把 compute 属性赋值到算子封装的计算逻辑
  • 以此类推,在子 RDD 之上,开发者还会继续调用其他算子,衍生出新的 RDD
  • 如此往复便有了 DAG

从开发者构建 DAG,到 DAG 转化的分布式任务在分布式环境中执行,其间会经历如下 4 个阶段

  • 回溯 DAG 并划分 Stages
  • 在 Stages 中创建分布式任务
  • 分布式任务的分发
  • 分布式任务的执行

map-reduce 的工作过程

spark 的内存计算,将 Stages 内部的所有操作,比如 清洗、过滤、加工 一次性糅合成一个函数
糅合后的函数一次性计算出结果,不会产生中间结果,这样就加速的运算
由于计算的融合只发生在 Stages 内部,而 Shuffle 是切割 Stages 的边界
因此一旦发生 Shuffle,内存计算的代码融合就会中断

任务调度

Spark 调度系统的工作流程包含如下 5 个步骤:

  • 将 DAG 拆分为不同的运行阶段 Stages
  • 创建分布式任务 Tasks 和任务组 TaskSet
  • 获取集群内可用的硬件资源情况
  • 按照调度规则决定优先调度哪些任务 / 组
  • 依序将分布式任务分发到执行器 Executor。

系统调度的流程步骤

调度系统流程步骤 调度系统核心组件
将DAG拆分为不同的运行阶段Stages DAGScheduler
创建分布式任务Tasks和任务组TaskSet DAGScheduler
获取集群内可用硬件资源情况 SchedulerBackend
按照调度规则决定优先调度哪些任务/组 TaskScheduler
依序将分布式任务分发到执行器Executor SchedulerBackend

DAGScheduler

  • 把用户的 DAG 拆分为 Stages
  • 在 Stages内创建计算任务 Tasks,这些任务囊括了用户通过组合不同算子实现的数据转换逻辑
  • 执行器 Executors 接收到 Tasks,会将其中封装的计算函数应用于分布式数据分片,去执行分布式的计算过程
  • 在分发任务之前,调度系统得先判断哪些节点的计算资源空闲,然后再把任务分发过去

SchedulerBackend

  • 提供了 Standalone、YARN、mesos、k8s 的对应调度实现类
  • SchedulerBackend 内部使用了 ExecutorDataMap,记录每个计算节点的资源状态
  • kye是 executor字符串
  • value是ExecutorData 结构,封装了RPC地址,主机地址、可用CPU,满配CPU等
  • 对内,用ExecutorData 对 Executor进行资源画像
  • 对外,以 WorkerOffer 为粒度提供计算资源

TaskScheduler

  • 要调度的计算任务有了,就是 DAGScheduler 通过 Stages 创建的 Tasks
  • 用于调度任务的计算资源也有了,即 SchedulerBackend 提供的一个又一个 WorkerOffer
  • DAGScheduler 就是需求端,SchedulerBackend 就是供给端
  • TaskScheduler 的职责是,基于既定的规则与策略达成供需双方的匹配与撮合
  • TaskScheduler 的调度策略分为两个层次,一个是不同 Stages 之间的调度优先级,一个是 Stages 内不同任务之间的调度优先级
  • 对于这种 Stages 之间的任务调度,TaskScheduler 提供了 2 种调度模式,分别是 FIFO(先到先得)和 FAIR(公平调度)
  • 对于 Stages 内的任务,当 TaskScheduler 接收到来自 SchedulerBackend 的 WorkerOffer 后,TaskScheduler 会优先挑选那些满足本地性级别要求的任务进行分发
  • 本地性级别有 4 种:Process local(一个executor进程内) < Node local < Rack local < Any
  • 分别是:进程本地性、节点本地性、机架本地性、跨机架本地性

DAGScheduler 划分 Stages,创建 Task时,会为每个任务执行本地性级别,记录该任务有意向的计算节点地址,甚至是 executor 的进程ID
任务自带调度意愿,它通过本地性级别告诉 TaskScheduler 自己更乐意被调度到哪里去
Spark 调度系统的原则是尽可能地让数据呆在原地、保持不动,同时尽可能地把承载计算任务的代码分发到离数据最近的地方,从而最大限度地降低分布式系统中的网络开销

TaskScheduler 调度步骤

  • TaskScheduler 根据本地性级别遴选出待计算任务之后,先对这些任务进行序列化
  • 交给 SchedulerBackend,SchedulerBackend 根据 ExecutorData 中记录的 RPC 地址和主机地址
  • 再将序列化的任务通过网络分发到目的主机的 Executor 中去
  • Executor 接收到任务之后,把任务交由内置的线程池,线程池中的多线程则并发地在不同数据分片之上执行任务中封装的数据处理函数,从而实现分布式计算

存储系统

主要用于存储 3 方面的数据

  • RDD 缓存,一些计算成本和访问频率较高的 RDD,可以以缓存的形式物化到内存或磁盘中。这样一来,既可以避免 DAG 频繁回溯的计算开销,也能有效提升端到端的执行性能
  • Shuffle 中间文件,Shuffle 中间文件的位置信息,都是由 Spark 存储系统保存并维护的,没有存储系统,Shuffle 是玩不转的
  • 广播变量,利用存储系统,广播变量可以在 Executors 进程范畴内保存全量数据,让任务以 Process local 的本地性级别,来共享广播变量中携带的全量数据

RDD 缓存指的是将 RDD 以缓存的形式物化到内存或磁盘的过程
Shuffle 中间文件实际上就是 Shuffle Map 阶段的输出结果,这些结果会以文件的形式暂存于本地磁盘
在 Shuffle Reduce 阶段,Reducer 通过网络拉取这些中间文件用于聚合计算
广播变量往往用于在集群范围内分发访问频率较高的小数据
利用存储系统,广播变量可以在 Executors 进程范畴内保存全量数据

存储系统相关的组件

  • BlockManager
  • BlockManagerMaster
  • MemoryStore
  • DiskStore
  • DiskBlockManager
  • BlockManager 是其中最为重要的组件,它在 Executors 端负责统一管理和协调数据的本地存取与跨节点传输
  • BlockManager定期跟BlockManagerMaster交互汇报本地数据元信息
  • 还会定期拉取全局数据存储状态,不同Executor的BlockManager之间,也会以Server/Client模式跨节点推送和拉取数据块

MemoryStore

  • MemoryStore 同时支持存储对象值和字节数组这两种不同的数据形式
  • 并且统一采用 MemoryEntry 数据抽象对它们进行封装
  • 实现类:DeserializedMemoryEntry、 SerializedMemoryEntry
  • RDD 的数据分片与存储系统的 Block 一一对应,也就是说一个 RDD 数据分片会被物化成一个内存或磁盘上的 Block。

缓存 RDD 的过程,就是将 RDD 计算数据的迭代器(Iterator)进行物化的过程

  • 通过调用 putIteratorAsValues 或是 putIteratorAsBytes 方法,把 RDD 迭代器展开为数据值,然后把这些数据值暂存到一个叫做 ValuesHolder 的数据结构里,Unroll
  • 为了节省内存开销,我们可以在存储数据值的 ValuesHolder 上直接调用 toArray 或是 toByteBuffer 操作,把 ValuesHolder 转换为 MemoryEntry 数据结构,从 Unroll memory 到 Storage memory 的 Transfer(转移)
  • 这些包含 RDD 数据值的 MemoryEntry 和与之对应的 BlockId,会被一起存入 Key 为 BlockId、Value 是 MemoryEntry 引用的链式哈希字典中

DiskStore

  • putBytes 方法把字节序列存入磁盘文件
  • getBytes 方法将文件内容转换为数据块
  • 使用了 DiskBlockManager 这个给力的帮手
  • DiskBlockManager 的主要职责就是,记录逻辑数据块 Block 与磁盘文件系统中物理文件的对应关系,每个 Block 都对应一个磁盘文件

DiskBlockManager 在初始化的时候,首先根据配置项 spark.local.dir 在磁盘的相应位置创建文件目录
在 spark.local.dir 指定的所有目录下分别创建子目录,子目录的个数由配置项 spark.diskStore.subDirectories 控制,它默认是 64
所有这些目录均用于存储通过 DiskStore 进行物化的数据文件,如 RDD 缓存文件、Shuffle 中间结果文件等

Spark 默认采用 SortShuffleManager 来管理 Stages 间的数据分发,在 Shuffle write 过程中,有 3 类结果文件

  • temp_shuffle_XXX,暂存文件最后会被删除
  • shuffle_XXX.data,中间文件
  • shuffle_XXX.index,记录data文件内不同分区的偏移地址

DiskStore 与 DiskBlockManager 的交互过程

  • shuffle write阶段,Shuffle manager 通过 BlockManager 调用 DiskStore 的 putBytes 方法将数据块写入文件
  • 文件由 DiskBlockManager 创建,文件名就是 putBytes 方法中的 Block ID
  • 这些文件会以“temp_shuffle”或“shuffle”开头,保存在 spark.local.dir 目录下的子目录里
  • Shuffle read 阶段,Shuffle manager 再次通过 BlockManager 调用 DiskStore 的 getBytes 方法,读取 data 文件和 index 文件
  • 将文件内容转化为数据块,最终这些数据块会通过网络分发到 Reducer 端进行聚合计算

内存管理

分为两种

  • 堆内内存(On-heap Memory)
  • 堆外内存(Off-heap Memory)
  • 堆内内存的申请与释放统一由 JVM 代劳
  • 堆外内存则不同,Spark 通过调用 Unsafe 的 allocateMemory 和 freeMemory 方法直接在操作系统内存中申请、释放内存空间

堆外内存、堆内内存的划分

总结关系
动态分配内存

  • Execution Memory 和 Storage Memory 之间可以相互转化
  • Execution Memory 和 Storage Memory,如果对方的内存空间有空闲,双方就都可以抢占
  • 对于 RDD 缓存任务抢占的执行内存,当执行任务有内存需要时,RDD 缓存任务必须立即归还抢占的内存,涉及的 RDD 缓存数据要么落盘、要么清除
  • 对于分布式计算任务抢占的 Storage Memory 内存空间,即便 RDD 缓存任务有收回内存的需要,也要等到任务执行完毕才能释放

一个例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 定义 dict 字典,这个字典在 Driver 端生成,它在后续的 RDD 调用中会随着任务一起分发到 Executor 端
val dict: List[String] = List(“spark”, “scala”)
val words: RDD[String] = sparkContext.textFile(~/words.csv”)
// 用 dict 字典对 words 进行过滤,此时 dict 已分发到 Executor 端,Executor 将其存储在堆内存中,用于对 words 数据分片中的字符串进行过滤
// Dict 字典属于开发者自定义数据结构,因此,Executor 将其存储在 User Memory 区域
val keywords: RDD[String] = words.filter(word => dict.contains(word))
// 用 cache 和 count 对 keywords RDD 进行缓存
// 占用的是 Storage Memory 内存区域
keywords.cache
keywords.count
// 会引入shuffle,会消耗 Execution Memory 区域中的内存
keywords.map((_, 1)).reduceByKey(_ + _).collect

一个例子

spark.executor.memory = 20G
spark.memory.offHeap.size = 10G
spark.memory.fraction = 0.8
spark.memory.storageFraction = 0.6

保留内存       300M
用户内存:     20G * (1 - 0.8) = 4G
storage内存:  20G * (0.8 * 0.6) = 9.6G
execution内存:20G 0.8 * (1 - 0.6) = 6.4G
堆外
storage内存:  10G * 0.6 = 6G
execution内存:10G *(1 - 0.6) = 4G

通用性能调优

应用开发三原则

直接使用现有的技术

  • 钨丝计划,在数据结构方面,Tungsten 自定义了紧凑的二进制格式
  • ungsten 利用 Java Unsafe API 开辟堆外(Off Heap Memory)内存来管理对象
  • Tungsten 用全阶段代码生成(Whol Stage Code Generation)取代火山迭代模型

AQE Adaptive Query Execution 自适应查询执行

  • AQE 可以让 Spark 在运行时的不同阶段,结合实时的运行时状态,周期性地动态调整前面的逻辑计划,然后根据再优化的逻辑计划
  • 重新选定最优的物理计划,从而调整运行时后续阶段的执行方式
  • 分区自动合并
  • 数据倾斜
  • join 策略调整

打开 AQE

spark.sql.adaptive.enabled=TRUE

能省则省、能拖则拖

  • 尽量把能节省数据扫描量和数据处理量的操作往前推
  • 尽力消灭掉 Shuffle,省去数据落盘与分发的开销
  • 如果不能干掉 Shuffle,尽可能地把涉及 Shuffle 的操作拖到最后去执行

配置项速查手册

官网关于配置的信息
https://spark.apache.org/docs/latest/configuration.html

跟性能相关的

  • 硬件资源类包含的是与 CPU、内存、磁盘有关的配置项
  • Shuffle 类是专门针对 Shuffle 操作的
  • SparkSQL 的调优

堆内 vs 堆外

  • 对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存
  • 如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用 JVM 堆内内存会更加稳妥

shuffle

  • 在 Shuffle 过程中,对于不需要排序和聚合的操作
  • 可以通过控制 spark.shuffle.sort.bypassMergeThreshold 参数
  • 来避免 Shuffle 执行过程中引入的排序环节

AQE 分区合并

  • AQE 事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次
  • 假设,Shuffle 过后数据大小为 20GB,minPartitionNum 设置为 200
  • 反推过来,每个分区的尺寸就是 20GB / 200 = 100MB
  • 再假设,advisoryPartitionSizeInBytes 设置为 200MB,最终的目标分区尺寸就是取(100MB,200MB)之间的最小值

AQE 数据倾斜处理

  • 分区尺寸必须要大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数的设定值,才有可能被判定为倾斜分区
  • AQE 统计所有数据分区大小并排序,取中位数作为放大基数,尺寸大于中位数一定倍数的分区会被判定为倾斜分区
  • 中位数的放大倍数也是由参数 spark.sql.adaptive.skewJoin.skewedPartitionFactor 控制
  • 假设数据表 A 有 3 个分区,分区大小分别是 80MB、100MB 和 512MB
  • 这些分区按大小个排序后的中位数是 100MB,因为 skewedPartitionFactor 的默认值是 5 倍
  • 所以大于 100MB * 5 = 500MB 的分区才有可能被判定为倾斜分区
  • 还要看 skewedPartitionThresholdInBytes 配置项,这个参数的默认值是 256MB。对于那些满足中位数条件的分区,必须要大于 256MB,Spark 才会把这个分区最终判定为倾斜分区
  • 拆分的时候会用到advisoryPartitionSizeInBytes 参数,512M倾斜分区会变成 512/256,被拆分成两个
  • 每个分分区的尺寸都不大于 256M

AQE 的join策略

  • Join 策略调整是一种动态优化机制,对于刚才的两张大表,AQE 会在数据表完成过滤操作之后动态计算剩余数据量
  • 当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join
  • 运行时的数据量估算要比编译时准确得多,因此 AQE 的动态 Join 策略调整相比静态优化会更可靠、更稳定
  • 启用动态 Join 策略调整还有个前提,也就是要满足 nonEmptyPartitionRatioForBroadcastJoin 参数的限制,默认为 0.2,才能成功触发 Broadcast Join 降级
  • 大表过滤之前有 100 个分区,Filter 操作之后,有 85 个分区内的数据因为不满足过滤条件
  • 在过滤之后都变成了没有任何数据的空分区,另外的 15 个分区还保留着满足过滤条件的数据
  • 因为 15% 小于 0.2,所以这个例子中的大表会成功触发 Broadcast Join 降级
  • 如果你想要充分利用 Broadcast Join 的优势,可以考虑把这个参数适当调高

Shuffle 原理

shuffle 的过程,就类似天女散花的过程
本质就是 洗牌,一个map阶段,一个reduce阶段

map阶段

  • Map 阶段最终生产的数据会以中间文件的形式物化到磁盘中
  • 这些中间文件就存储在 spark.local.dir 设置的文件目录里
  • 中间文件包含两种类型:一类是后缀为 data 的数据文件,存储的内容是 Map 阶段生产的待分发数据
  • 另一类是后缀为 index 的索引文件,它记录的是数据文件中不同分区的偏移地址
  • 分区数量与 Reduce 阶段的并行度保持一致
  • Map 阶段每一个 Task 的执行流程都是一样的,每个 Task 最终都会生成一个数据文件和一个索引文件
  • 中间文件的数量与 Map 阶段的并行度保持一致
  • 有多少个 Task,Map 阶段就会生产相应数量的数据文件和索引文件

中间数据

  • 例子中,哪种花对应到哪个课桌是实现定义好的
  • Spark 中,每条数据记录应该分发到 哪个目标分区,是根据 key 的hash计算的
  • 目标分区计算好之后,Map Task 会把每条数据记录和它的目标分区,放到一个特殊的数据结构里,这个数据结构叫做“PartitionedPairBuffer”
  • 每条数据记录都会占用数组中相邻的两个元素空间,第一个元素是(目标分区,Key),第二个元素是 Value
  • 每个Task的内存是有限的,当超过额之后,需要溢出
    PartitionedPairBuffer 结构如下,假设只能放 4个元素


map阶段总结如下:

  • 对于分片中的数据记录,逐一计算其目标分区,并将其填充到 PartitionedPairBuffer;
  • PartitionedPairBuffer 填满后,如果分片中还有未处理的数据记录,就对 Buffer 中的数据记录按(目标分区 ID,Key)进行排序,将所有数据溢出到临时文件,同时清空缓存
  • 重复步骤 1、2,直到分片中所有的数据记录都被处理
  • 对所有临时文件和 PartitionedPairBuffer 归并排序,最终生成数据文件和索引文件。

reduce

  • groupByKey 使用的是 PartitionedPairBuffer,不会累加
  • reduceByKey 使用的是 PartitionedAppendOnlyMap,会累加
  • 需要的内存更少,产生中间溢出的可能性更小,效率更高
  • 要避免在聚合类的计算需求中,引入收集类的算子
  • Shuffle 在 Reduce 阶段是主动地从 Map 端的中间文件中拉取数据
  • 每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据,索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task
  • Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程


可以看到,stage 0 就对应了 map,stage 1 对应reduce,而stage1 也作为map,提供给stage 2

shuffle的性能问题

  • 它需要消耗所有的硬件资源,消耗大量内存,甚至溢出到磁盘做归并排序,呈指数级增长的跨节点数据分发
  • Shuffle 消耗的不同硬件资源之间很难达到平衡

广播变量

修改代码

1
2
3
4
5
val dict = List(“spark”, “tune”)
val bc = spark.sparkContext.broadcast(dict)
val words = spark.sparkContext.textFile(~/words.csv”)
val keywords = words.filter(word => bc.value.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

广播变量

  • 由 Driver 端以 Executors 为粒度分发
  • 每一个 Executors 接收到广播变量之后,将其交给 BlockManager 管理

广播分布式数据集

1
2
3
val userFile: String = “hdfs://ip:port/rootDir/userData”
val df: DataFrame = spark.read.parquet(userFile)
val bc_df: Broadcast[DataFrame] = spark.sparkContext.broadcast(df)
  • Driver 从所有的 Executors 拉取这些数据分区,然后在本地构建全量数据
  • 之后,与从普通变量创建广播变量的过程类似
  • Driver 把汇总好的全量数据分发给各个 Executors
  • Executors 将接收到的全量数据缓存到存储系统的 BlockManager 中

shuffle join 的过程

  • 对参与关联的 左右表分别进行 shuffle
  • Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模
  • 由于左右表的分区数是一致的,因此 Shuffle 过后,一定能够保证 userID 相同的交易记录和用户数据坐落在同一个 Executors 内
  • Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联操作

用广播解决 shuffle问题

  • Driver 从所有 Executors 收集 userDF 所属的所有数据分片
  • 在本地汇总用户数据,然后给每一个 Executors 都发送一份全量数据的拷贝
  • 交易表的数据分区待在原地、保持不动,就可以轻松地关联到一致的用户数据

广播优化
https://issues.apache.org/jira/browse/SPARK-17556

配置广播的阈值

# 默认为 10M,可以改成 2G
spark.sql.autoBroadcastJoinThreshold=10M
  • 使用广播阈值配置项让 Spark 优先选择 Broadcast Joins 的关键
  • 就是要确保至少有一张表的存储尺寸小于广播阈值
  • 这个是数据在磁盘上的大小,但是加载到内存可能会翻倍

动态的计算出表精确大小

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 把要预估大小的数据表缓存到内存,比如直接在 DataFrame 或是 Dataset 上调用 cache 方法
val df: DataFrame = _
df.cache.count
 
// 读取 Spark SQL 执行计划的统计数据
val plan = df.queryExecution.logical
val estimated: BigInt = spark
.sessionState
.executePlan(plan)
.optimizedPlan
.stats
.sizeInBytes

SQL 的广播提示

1
2
3
4
5
6
7
val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)
table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")
 
val query: String = “select /*+ broadcast(t2) */ * from t1 inner join t2 on t1.key = t2.key”
val queryResutls: DataFrame = spark.sql(query)

另一种方式

1
table1.join(table2.hint(“broadcast”), Seq(“key”), “inner”)

join hints 如果表名拼写错误则不会检查
使用 broadcast 函数来广播

1
2
import org.apache.spark.sql.functions.broadcast
table1.join(broadcast(table2), Seq(“key”), “inner”)

广播的限制

  • 以广播阈值配置为主,以强制广播为辅
  • 广播阈值的设置,更多的是把选择权交给 Spark SQL,尤其是在 AQE 的机制下,动态 Join 策略调整需要这样的设置在运行时做出选择
  • 强制广播更多的是开发者以专家经验去指导 Spark SQL 该如何选择运行时策略

广播的问题

  • 从性能上来讲,Driver 在创建广播变量的过程中,需要拉取分布式数据集所有的数据分片
  • 从功能上来讲,并不是所有的 Joins 类型都可以转换为 Broadcast Joins
  • Broadcast Joins 不支持全连接(Full Outer Joins)
  • 在所有的数据关联中,我们不能广播基表
  • 在左连接(Left Outer Join)中,我们只能广播右表;在右连接(Right Outer Join)中,我们只能广播左表
  • 在下面的代码中,即便我们强制用 broadcast 函数进行广播,Spark SQL 在运行时还是会选择 Shuffle Joins
1
2
3
import org.apache.spark.sql.functions.broadcast
broadcast (table1).join(table2, Seq(“key”), “left”)
table1.join(broadcast(table2), Seq(“key”), “right”)

不能广播 基表的原因?
left join 时,左表需要 知道右表的全局信息
左侧数据集的某个记录没有与右侧数据集的任何记录匹配,那么该记录将被保留,并在右侧的连接结果中以 null 值填充相应的右侧字段
如果广播左表,右表就没有全局信息了,只知道这个分片是否满足条件,但全局的不清楚
这样的话,数据就不准确了

高效的利用 CPU

性能调优的最终目的,是在所有参与计算的硬件资源之间寻求协同与平衡
Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比
并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散

并行度可以通过两个参数来设置

  • spark.default.parallelism,用于设置 RDD 的默认并行度
  • spark.sql.shuffle.partitions,在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度

并发度

  • Executor 的线程池大小由参数 spark.executor.cores 决定
  • 每个任务在执行期间需要消耗的线程数由 spark.task.cpus 配置项给定
  • 两者相除得到的商就是并发度,也就是同一时间内,一个 Executor 内部可以同时运行的最大任务数量
  • spark.task.cpus 默认数值为 1;并发度基本由 spark.executor.cores 参数敲定

可分配的执行内存总量会随着缓存任务和执行任务的此消彼长,而动态变化。但无论怎么变,可用的执行内存总量,都不会低于配置项设定的初始值

CPU 低效原因

  • 线程挂起
    • 动态变化的执行内存总量 M,上限是总内存 / 线程数,下限是 M / N / 2
    • 动态变化的并发度 N~
    • 分布式数据集的数据分布
    • 如果 400个线程分配了 800的数据集,但一开始来了200个线程,则一开始分配的是每个为4,后面再来200线程,就分配不到需要等待
    • 如果分布式数据集的并行度设置得当,因任务调度滞后而导致的线程挂起问题就会得到缓解
  • 调度开销
    • 数据过于分散会带来严重的副作用:调度开销骤增
    • 当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据量却少之又少
    • 就 CPU 消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与之分庭抗礼

在给定 Executor 线程池和执行内存大小的时候,我们可以参考上面的算法
去计算一个能够让数据分片平均大小在(M/N/2, M/N)之间的并行度
这往往是个不错的选择

提升要点:

  • 在一个 Executor 中,每个 CPU 线程能够申请到的内存比例是有上下限的,最高不超过 1/N,最低不少于 1/N/2,其中 N 代表线程池大小
  • 在给定线程池大小和执行内存的时候,并行度较低、数据分片较大容易导致 CPU 线程挂起,线程频繁挂起不利于提升 CPU 利用率,而并行度过高、数据过于分散会让调度开销更显著,也不利于提升 CPU 利用率
  • 在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N/2, M/N)区间

高效的利用 内存

User Memory

  • 计算自定义数据结构的总大小 #size
  • 计算 executor线程池大小,两者相乘 就是 user memory区域的内存消耗 #User
  • 非广播变量,每个task都会存一份,总消耗:executor并行度 #thread,也就是task数量 * #size

Storage Memory

  • 计算广播变量的总大小 #bc
  • 分布式数据集总大小 #cache
  • Executor的总数 #E
  • 总消耗为 #Storage = #bc + #cache / #E

Executor Momory

  • executor的线程池大小 #N
  • 数据分配大小,这个取决于数据集尺寸 #dataset 和并行度 #N
  • 每个executor 中执行内存的消耗计算公式为 #Execution = #thread * #dataset / #N

调整内存配置

  • spark.memory.fraction 可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)计算得到
  • spark.memory.storageFraction 的数值应该参考(#Storage)/(#Storage + #Execution
  • 对于 Executor 堆内内存总大小 spark.executor.memory 的设置,我们自然要参考 4 个内存区域的总消耗,也就是 300MB + #User + #Storage + #Execution
  • 要注意,利用这个公式计算的前提是,不同内存区域的占比与不同类型的数据消耗一致

Spark 的 Cache 机制主要有 3 个方面

  • 缓存的存储级别:它限定了数据缓存的存储介质,如内存、磁盘等
  • 缓存的计算过程:从 RDD 展开到分片以 Block 的形式,存储于内存或磁盘的过程
  • 缓存的销毁过程:缓存数据以主动或是被动的方式,被驱逐出内存或是磁盘的过程

存储级别包含 3个基本要素

  • 存储介质:内存还是磁盘,或是两者都有
  • 存储形式:对象值还是序列化的字节数组,带 SER 字样的表示以序列化方式存储,不带 SER 则表示采用对象值
  • 副本数量:存储级别名字最后的数字代表拷贝数量,没有数字默认为 1 份副本
  • 最常用的只有两个:MEMORY_ONLY 和 MEMORY_AND_DISK

缓存的计算过程

  • MEMORY_AND_DISK 模式下,Spark 会优先尝试把数据集全部缓存到内存,内存不足的情况下,再把剩余的数据落盘到本地
  • MEMORY_ONLY 则不管内存是否充足,而是一股脑地把数据往内存里塞,即便内存不够也不会落盘
  • RDD、DataFrame 他们的数据分片都是以 迭代器 Iterator 形式存储的
  • 要把数据缓存下来,我们先得把迭代器展开成实实在在的数据值,这一步叫做 Unroll
  • 下图,步骤1 展开的对象值暂时存储在一个叫做 ValuesHolder 的数据结构里
  • 然后转换为 MemoryEntry。转换的实现方式是 toArray,因此它不产生额外的内存开销,这一步转换叫做 Transfer
  • 最终,MemoryEntry 和与之对应的 BlockID,以 Key、Value 的形式存储到哈希字典(LinkedHashMap)中,步骤3

缓存的销毁过程

  • 使用 LRU 管理缓存
  • 当内存不足时,会对 LinkedHashMap 从头扫描到尾,同时记录 MemoryEntry 大小
  • 当遇到总大小满足标准时,停止扫描
  • 属于同一个 RDD 的 MemoryEntry 不会被选中

当内存很小时,使用 MEMORY_AND_DISK 时,最坏的情况可能会退化为 MapReduce,不停的换入换出

cache 的使用原则

  • 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,就坚决不使用 Cache
  • 如果引用次数大于 1,且运行成本占比超过 30%,应当考虑启用 Cache
  • 运行成本占比。它指的是计算某个分布式数据集所消耗的总时间与作业执行时间的比值

比如一个应用,端到端执行 1小时,其中有一个 DataFreame 被引用 2次
从读取到计算,到生成 DataFrame,需要 12分钟,这个DataFrame运行成本为:12 * 2 / 60 = 40%
使用noop来触发计算,它只触发计算,而不涉及罗盘与数据存储,它的作业执行时间就是DataFrame的运行时间

1
2
3
4
//利用noop精确计算DataFrame运行时间
df.write
.format(“noop”)
.save()

注意事项

  • first、take、show、count 这四个中,只有 count 才会触发缓存的完全物化,而 first、take 和 show 这 3 个算子只会把涉及的数据物化
  • 因此 Cache 应该遵循最小公共子集原则,也就是说,开发者应该仅仅缓存后续操作必需的那些数据列
  • Cache Manager 要求两个查询的 Analyzed Logical Plan 必须完全一致,才能对 DataFrame 的缓存进行复用
  • 为了避免因为 Analyzed Logical Plan 不一致造成的 Cache miss,把我们想要缓存的数据赋值给一个变量,凡是在这个变量之上的分析操作,都会完全复用缓存数据
  • 异步模式:调用 unpersist() 或是 unpersist(False)
  • 同步模式:调用 unpersist(True)

OOM 问题,首先要定位

  • 发生 OOM 的 LOC(Line Of Code),也就是代码位置在哪?
  • OOM 发生在 Driver 端,还是在 Executor 端?
  • 如果是发生在 Executor 端,OOM 到底发生在哪一片内存区域?

Driver 端的 OOM

  • 创建小规模的分布式数据集:使用 parallelize、createDataFrame 等 API 创建数据集
  • 收集计算结果:通过 take、show、collect 等算子把结果收集到 Driver 端

总大小超过 Driver的结果

1
java.lang.OutOfMemoryError: Not enough memory to build and broadcast

预估数据大小

1
2
3
4
5
6
7
8
9
val df: DataFrame = _
df.cache.count
val plan = df.queryExecution.logical
val estimated: BigInt = spark
.sessionState
.executePlan(plan)
.optimizedPlan
.stats
.sizeInBytes

Executor端 OOM

  • Reserved Memory 大小固定为 300MB,因为它是硬编码到源码中的
  • Storage Memory 来说,即便数据集不能完全缓存到 MemoryStore,Spark 也不会抛 OOM 异常,额外的数据要么落盘(MEMORY_AND_DISK)、要么直接放弃(MEMORY_ONLY
  • 主要去 Execution Memory 和 User Memory 去找毛病

User Memory报错

1
2
3
java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
 
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

调整思路跟 Driver Memory类似,预估再调整

Executor Memory

  • 数据量并不是决定 OOM 与否的关键因素,数据分布与 Execution Memory 的运行时规划是否匹配才是
  • 一旦分布式任务的内存请求超出 1/N 这个上限,Execution Memory 就会出现 OOM 问题
  • N 是task数量,也就是线程数量,并发度
  • 主要问题,数据倾斜和数据膨胀

数据倾斜

  • task1、task2、task3,分别处理 100M、100M、300M
  • 总内存 360M,最终 task3导致了 OOM
  • 但实质上是 Task3 的内存请求超出 1/N 上限。
  • 消除数据倾斜,让所有的数据分片尺寸都不大于 100MB
  • 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB

数据膨胀

  • 本身数据集分片是 100M,但是加载到内存变成 300M
  • 每个 task 120M 内存,于是出现OOM
  • 把数据打散,提高数据分片数量、降低数据粒度,让膨胀之后的数据量降到 100MB 左右
  • 加大内存配置,结合 Executor 线程池调整,提高 1/N 上限到 300MB

高效的利用 磁盘

磁盘的作用

  • 溢出临时文件
  • 存储 Shuffle 中间文件
  • 缓存分布式数据集,带有 DISK 字样的存储模式,都会把内存中放不下的数据缓存到磁盘

性能上的作用

  • 失败重试中的磁盘复用,不再是从DAG源头重来,而是与触发点最近的新 Shuffle的中间文件重来
  • 磁盘复用的收益之一就是缩短失败重试的路径,在保障作业稳定性的同时提升执行性能
  • 磁盘复用的另一种形式:ReuseExchange 机制
  • 它指的是相同或是相似的物理计划可以共享 Shuffle 计算的中间结果

未复用中间结果的逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//版本1:分别计算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
 
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)
 
val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
 
val resultDF: DataFrame = dfPV.Union(dfUV)
 
// Result样例
| userId | metrics | value |
| user0  | PV      | 25 |
| user0  | UV      | 12 |

复用后的逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
//版本2:分别计算PV、UV,然后合并
// Data schema (userId: String, accessTime: Timestamp, page: String)
 
// 关键一行 
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath).repartition($"userId")
 
val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
 
val resultDF: DataFrame = dfPV.Union(dfUV)
 
// Result样例
| userId | metrics | value |
| user0  | PV      | 25 |
| user0  | UV      | 12 |


优化后

  • 数据源只需要扫描一遍,而且作为“性能瓶颈担当”的 Shuffle 也只发生了一次

触发条件至少有 2 个

  • 多个查询所依赖的分区规则要与 Shuffle 中间数据的分区规则保持一致
  • 多个查询所涉及的字段(Attributes)要保持一致

高效的利用 网络

不管是什么文件格式,也不管是哪种存储系统,访问数据源是否会引入网络开销
取决于任务与数据的本地性关系,也就是任务的本地性级别

  • PROCESS_LOCAL:任务与数据同在一个 JVM 进程中
  • NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个 JVM 进程中
  • RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上
  • ANY:任务与数据是跨机架、甚至是跨 DC(Data Center,数据中心)的关系

根据 NODE_LOCAL 的定义,在这个级别下,调度的目标节点至少在磁盘上存有 Spark 计算任务所需的数据分片
在集群部署上,Spark 集群与外部存储系统在物理上是紧紧耦合在一起的
如果 Spark 集群与存储集群在物理上是分开的,那么任务的本地性级别只能退化到 RACK_LOCAL,甚至是 ANY

对于 Spark 加 HDFS 和 Spark 加 MongoDB 来说,是否会引入网络开销完全取决于它们的部署模式
物理上紧耦合,在 NODE_LOCAL 级别下,Spark 用磁盘 I/O 替代网络开销获取数据;物理上分离,网络开销就无法避免

数据处理

  • 应当遵循“能省则省”的开发原则,在适当的场景用 Broadcast Joins 来避免 Shuffle 引入的网络开销
  • 确实没法避免 Shuffle,我们可以在计算中多使用 Map 端聚合,减少需要在网络中分发的数据量
  • 如果应用对于高可用的要求不高,那我们应该尽量避免副本数量大于 1 的存储模式,避免副本跨节点拷贝带来的额外开销

数据传输

  • Kryo Serializer 相比 Java serializer,在处理效率和存储效率两个方面都会胜出数倍
  • 对于一些自定义的数据结构来说,如果你没有明确把这些类型向 Kryo Serializer 注册的话,虽然它依然会帮你做序列化的工作
  • 但它序列化的每一条数据记录都会带一个类名字,这个类名字是通过反射机制得到的,会非常长。在上亿的样本中,存储开销自然相当可观
  • spark.kryo.registrationRequired 推荐设置为 TRUE,当遇到未注册的类型会报错

一个例子

1
2
3
4
5
6
7
//向Kryo Serializer注册类型
val conf = new SparkConf().setMaster(“”).setAppName(“”)
conf.registerKryoClasses(Array(
classOf[Array[String]],
classOf[HashMap[String, String]],
classOf[MyClass]
))
配置项 含义 默认值 推荐设置
spark.serializer 指定使用哪一种序列化器 JavaSerializer KryoSerializer
spark.kryo.registrationRequired 是否强制注册自定义类型 FALSE TRUE

SQL 性能调优

RDD 和 DataFrame

Spark 3.0 发布的时候

  • Spark SQL 占比 46%
  • Structured Streaming 4%
  • MLlib 6%
  • PySpark 7%
  • Test & Docs 12%
  • Spark Core 16%
  • Other 9%

Spark SQL 取代 Spark Core,成为新一代的引擎内核
所有其他子框架如 Mllib、Streaming 和 Graph,都可以共享 Spark SQL 的性能优化,都能从 Spark 社区对于 Spark SQL 的投入中受益

RDD 的问题

  • 太灵活,没有携带数据类型
  • 下图高亮的是 高阶算子,开发者需要以 Lambda 函数的形式自行提供具体的计算逻辑
  • 在 RDD 的开发模式下,Spark Core 只知道“做什么”,而不知道“怎么做”
  • 对于 Spark Core 来说,优化空间受限最主要的影响,莫过于让应用的执行性能变得低下

RDD Programming Guide
https://spark.apache.org/docs/latest/rdd-programming-guide.html

DataFrame

  • DataFrame 就是携带数据模式(Data Schema)的结构化分布式数据集
  • RDD 是不带 Schema 的分布式数据集
  • 从数据表示(Data Representation)的角度来看,是否携带 Schema 是它们唯一的区别
  • RDD 算子多是高阶函数,这些算子允许开发者灵活地实现业务逻辑,表达能力极强。
  • DataFrame 的表达能力却很弱。一来,它定义了一套 DSL(Domain Specific Language)算子,如 select、filter、agg、groupBy 等等
  • DataFrame API 最大的意义在于,它为 Spark 引擎的内核优化打开了全新的空间
  • Spark SQL 的核心组件有二,其一是 Catalyst 优化器,其二是 Tungsten

Catalyst

  • 第一步优化,就是结合 DataFrame 的 Schema 信息,确认计划中的表名、字段名、字段类型与实际数据是否一致
  • 利用启发式的规则和执行策略,Catalyst 最终把逻辑计划转换为可执行的物理计划
  • Catalyst 的优化空间来源 DataFrame 的开发模式

Tungsten

  • 使用定制化的数据结构 Unsafe Row 来存储数据
  • Unsafe Row 的优点是存储效率高、GC 效率高。Tungsten 之所以能够设计这样的数据结构,仰仗的也是 DataFrame 携带的 Schema
  • Tungsten 是用二进制字节序列来存储每一条用户数据的,因此在存储效率上完胜 Java Object

优势

  • 基于 DataFrame 简单的标量算子和明确的 Schema 定义,借助 Catalyst 优化器和 Tungsten
  • Spark SQL 有能力在运行时构建起一套端到端的优化机制
  • 这套机制运用启发式的规则与策略,以及运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能
  • 不论你使用哪种开发语言,开发者都能共享 Spark SQL 带来的性能福利
  • 所有子框架的源码实现都已从 RDD 切换到 DataFrame
  • 和 PySpark 一样,像 Streaming、Graph、Mllib 这些子框架实际上都是通过 DataFrame API 运行在 Spark SQL 之上,它们自然可以共享 Spark SQL 引入的种种优化机制

Catalyst


Catalyst 逻辑优化阶段分为两个环节

  • 逻辑计划解析和逻辑计划优化
  • 在逻辑计划解析中,Catalyst 把“Unresolved Logical Plan”转换为“Analyzed Logical Plan”
  • 在逻辑计划优化中,Catalyst 基于一些既定的启发式规则(Heuristics Based Rules),把“Analyzed Logical Plan”转换为“Optimized Logical Plan”
  • 在逻辑计划解析环节,Catalyst 就是要结合 DataFrame 的 Schema 信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致
  • 对于同样一种计算逻辑,实现方式可以有多种,按照不同的顺序对算子做排列组合,我们就可以演化出不同的实现方式
  • 最好的方式是,我们遵循“能省则省、能拖则拖”的开发原则,去选择所有实现方式中最优的那个
  • Spark 3.0 版本中,Catalyst 总共有 81 条优化规则(Rules),这 81 条规则会分成 27 组(Batches),其中有些规则会被收纳到多个分组里。因此,如果不考虑规则的重复性,27 组算下来总共会有 129 个优化规则
  • 常见的:谓词下推(Predicate Pushdown)、列剪裁(Column Pruning)、常量替换 (Constant Folding)

逻辑计划(Logical Plan)、物理计划(Physical Plan)都继承自 QueryPlan
QueryPlan 的父类是 TreeNode,TreeNode 就是语法树中对于节点的抽象
TreeNode 有一个名叫 children 的字段,类型是 Seq[TreeNode],利用 TreeNode 类型,Catalyst 可以很容易地构建一个树结构
除了 children 字段,TreeNode 还定义了很多高阶函数,其中最值得关注的是一个叫做 transformDown 的方法

一个例子

1
2
3
4
5
6
7
8
//Expression的转换
import org.apache.spark.sql.catalyst.expressions._
val myExpr: Expression = Multiply(Subtract(Literal(6), Literal(4)), Subtract(Literal(1), Literal(9)))
val transformed: Expression = myExpr transformDown {
  case BinaryOperator(l, r) => Add(l, r)
  case IntegerLiteral(i) if i > 5 => Literal(1)
  case IntegerLiteral(i) if i < 5 => Literal(0)
}

利用 Cache Manager 做进一步的优化

  • 这里的 Cache 指的就是我们常说的分布式数据缓存
  • 可以调用 DataFrame 的.cache 或.persist
  • 或是在 SQL 语句中使用“cache table”关键字
  • 字典的 Key 是逻辑计划,Value 是对应的 Cache 元信息

Catalyst 的物理优化阶段(Physical Planning

  • 在优化 Spark Plan 的过程中,Catalyst 基于既定的优化策略(Strategies),把逻辑计划中的关系操作符一一映射成物理操作符,生成 Spark Plan
  • 在生成 Physical Plan 过程中,Catalyst 再基于事先定义的 Preparation Rules,对 Spark Plan 做进一步的完善、生成可执行的 Physical Plan
  • 所有优化策略在转换方式上都大同小异,都是使用基于模式匹配的偏函数(Partial Functions),把逻辑计划中的操作符平行映射为 Spark Plan 中的物理算子
  • BasicOperators 策略直接把 Project、Filter、Sort 等逻辑操作符平行地映射为物理操作符

Catalyst 在运行时总共支持 5 种 Join 策略

  • Broadcast Hash Join(BHJ)
  • Shuffle Sort Merge Join(SMJ)
  • Shuffle Hash Join(SHJ)
  • Broadcast Nested Loop Join(BNLJ)
  • Shuffle Cartesian Product Join(CPJ)

它们是来自 2 种数据分发方式(广播和 Shuffle)与 3 种 Join 实现机制(Hash Joins、Sort Merge Joins 和 Nested Loop Joins)的排列组合

Catalyst 总会尝试优先选择执行效率最高的策略

  • 第一类是“条件型”信息,用来判决 5 大 Join 策略的先决条件
    • Join 类型,也就是是否等值、连接形式等,这种信息的来源是查询语句本身
    • 内表尺寸,这些信息的来源就比较广泛了,可以是 Hive 表之上的 ANALYZE TABLE 语句,也可以是 Spark 对于 Parquet、ORC、CSV 等源文件的尺寸预估,甚至是来自 AQE 的动态统计信息
  • 第二类是“指令型”信息,也就是开发者提供的 Join Hints

执行语句

1
2
3
4
val result = txDF.select("price", "volume", "userId")
.join(users.hint("shuffle_hash"), Seq("userId"), "inner")
.groupBy(col("name"), col("age")).agg(sum(col("price") * 
col("volume")).alias("revenue"))

Preparation Rules

  • 对于执行计划中的每一个操作符节点,都有 4 个属性用来分别描述数据输入和输出的分布状态
  • Shuffle Sort Merge Join 的计算需要两个先决条件:Shuffle 和排序
  • 而 Spark Plan 中并没有明确指定以哪个字段为基准进行 Shuffle,以及按照哪个字段去做排序
  • Preparation Rules 的规则坚守最后一班岗,负责生成 Physical Plan

EnsureRequirements 规则

  • Project 的 outputPartitioning 属性和 outputOrdering 属性分别是 Unknow 和 None
  • 因此,它们输出的数据没有按照任何列进行 Shuffle 或是排序
  • 但是,SortMergeJoin 对于输入数据的要求很明确:按照 userId 分成 200 个分区且排好序,而这两个 Project 子节点的输出显然并没有满足父节点 SortMergeJoin 的要求
  • EnsureRequirements 规则就要介入了,它通过添加必要的操作符,如 Shuffle 和排序,来保证 SortMergeJoin 节点对于输入数据的要求一定要得到满足

添加节点

  • 在两个 Project 节点之后,EnsureRequirements 规则分别添加了 Exchange 和 Sort 节点
  • Exchange 节点代表 Shuffle 操作,用来满足 SortMergeJoin 对于数据分布的要求
  • Sort 表示排序,用于满足 SortMergeJoin 对于数据有序的要求
  • 这个时候,Spark 可以通过调用 Physical Plan 的 doExecute 方法,把结构化查询的计算结果,转换成 RDD[InternalRow]
  • 这里的 InternalRow,就是 Tungsten 设计的定制化二进制数据结构
  • 通过调用 RDD[InternalRow]之上的 Action 算子,Spark 就可以触发 Physical Plan 从头至尾依序执行

Physical Plan 的再次变化

  • EnsureRequirements 规则在两个分支的顶端分别添加了 Exchange 和 Sort 操作
  • Physical Plan 中多了很多星号“*”,这些星号的后面还带着括号和数字
  • 这种星号“*”标记表示的就是 WSCG,后面的数字代表 Stage 编号
  • 括号中数字相同的操作,最终都会被捏合成一份“手写代码”

Tungsten

Tungsten 又叫钨丝计划,它主要围绕内核引擎做了两方面的改进

  • 数据结构设计
  • 全阶段代码生成(WSCG,Whole Stage Code Generation)

Unsafe Row:二进制数据结构 传统方式存储
一个 String 的 mike,需要48个字节,12个字节对象头、8字节的hash编码、8字节的字段值存储
20字节的的其他开销,实际只需要 4字节,所以开销太大
一亿条记录,可能需要 6亿个对象存储,GC压力很大
而使用数组存储,压力就小很多,前面是定长字段,然后是变长字段的 offset,它指向变长字段(放在数组后面),变长字段由 length + value 组成

基于内存页的内存管理

  • 为了统一管理 Off Heap 和 On Heap 内存空间,Tungsten 定义了统一的 128 位内存地址,简称 Tungsten 地址
  • 前 64 位预留给 Java Object,后 64 位是偏移地址 Offset
  • 对于 On Heap 空间的 Tungsten 地址来说,前 64 位存储的是 JVM 堆内对象的引用或者说指针,后 64 位 Offset 存储的是数据在该对象内的偏移地址
  • Off Heap 的前 64 位存储的是 null 值,后 64 位则用于在堆外空间中直接寻址操作系统的内存空间

页表

  • Tungsten 使用一种叫做页表(Page Table)的数据结构,来记录从 Object 引用到 JVM 对象地址的映射
  • JDK的hash-map 存储开销和 GC 负担比较大
  • 在数据访问的过程中,标准库实现的 HashMap 容易降低 CPU 缓存命中率,进而降低 CPU 利用率。链表这种数据结构的特点是,对写入友好,但访问低效
  • Tungsten 放弃了链表的实现方式,使用数组加内存页的方式来实现 HashMap
  • 数组中存储的元素是 Hash code 和 Tungsten 内存地址,也就是 Object 引用外加 Offset 的 128 位地址
  • Tungsten HashMap 的存储单元是内存页,内存页本质上是 Java Object,一个内存页可以存储多个数据条目
  • 相比标准库中的 HashMap,使用内存页大幅缩减了存储所需的对象数量
  • 原先 100W记录,JVM对象需要几百W个,而 Tungsten HashMap只需要几十个
  • 内存页本质上是 JVM 对象,其内部使用连续空间来存储数据,内存页加偏移量可以精准地定位到每一个数据元素
  • 在需要扫描 HashMap 全量数据的时候,得益于内存页中连续存储的方式,内存的访问方式从原来的随机访问变成了顺序读取(Sequential Access)

WSCG,Whole Stage Code Generation

  • WSCG 指的是基于同一 Stage 内操作符之间的调用关系,生成一份“手写代码”,真正把所有计算融合为一个统一的函数
  • 一个是内存数据的随机存取,另一个是虚函数调用(next)
  • 本质上,WSCG 机制的工作过程就是基于一份“性能较差的代码”,在运行时动态地(On The Fly)重构出一份“性能更好的代码”

手写代码的生成过程分为两个步骤

  • 从父节点到子节点,递归调用 doProduce,生成代码框架
  • 从子节点到父节点,递归调用 doConsume,向框架填充每一个操作符的运算逻辑

过程

  • 在 Stage 顶端节点也就是 Project 之上,添加 WholeStageCodeGen 节点
  • WholeStageCodeGen 节点通过调用 doExecute 来触发整个代码生成过程的计算
  • doExecute 会递归调用子节点的 doProduce 函数,直到遇到 Shuffle Boundary 为止
  • Shuffle Boundary 指的是 Shuffle 边界,要么是数据源,要么是上一个 Stage 的输出
  • 在叶子节点(也就是 Scan)调用的 doProduce 函数会先把手写代码的框架生成出来,如图中右侧蓝色部分的代码
  • Scan 中的 doProduce 会反向递归调用每个父节点的 doConsume 函数
  • 不同操作符在执行 doConsume 函数的过程中,会把关系表达式转化成 Java 代码,然后把这份代码像做“完形填空”一样,嵌入到刚刚的代码框架里
  • 比如图中橘黄色的 doConsume 生成的 if 语句,其中包含了判断地区是否为北京的条件,以及紫色的 doConsume 生成了获取必需字段 userId 的 Java 代码
  • Tungsten 利用 CollapseCodegenStages 规则,经过两层递归调用把 Catalyst 输出的 Spark Plan 加工成了一份“手写代码”,并把这份手写代码会交付给 DAGScheduler
  • DAGScheduler 再去协调 TaskScheduler 和 SchedulerBackend,完成分布式任务调度

AQE优化

CBO的问题

  • 适应面太窄,只支持注册到Hive Metastore的数据表
  • 统计信息收集很慢,需要手动调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息
  • CBO 结合各类统计信息制定执行计划,一旦执行计划交付运行,CBO 的使命就算完成了,如果在运行时数据分布发生动态变化,CBO 先前制定的执行计划并不会跟着调整、适配

AQE(Adaptive Query Execution,自适应查询执行)

  • AQE 是 Spark SQL 的一种动态优化机制
  • 在运行时,每当 Shuffle Map 阶段执行完毕
  • AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划
  • 来完成对原始查询语句的运行时优化
  • AQE 优化机制触发的时机是 Shuffle Map 阶段执行完毕。也就是说,AQE 优化的频次与执行计划中 Shuffle 的次数一致
  • 如果你的查询语句不会引入 Shuffle 操作,那么 Spark SQL 是不会触发 AQE 的
  • AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件
  • 结合 Spark SQL 端到端优化流程图我们可以看到,AQE 从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划

AQE 既定的规则和策略主要有 4 个,分为 1 个逻辑优化规则和 3 个物理优化策略

AQE 的三大特性

  • Join 策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join
  • 自动分区合并:在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区
  • 自动倾斜处理:结合配置项,AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个 Reduce Task 的工作负载

Join 策略调整

  • DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Joins 降级为 Broadcast Joins
  • 这个规则仅适用于 Shuffle Sort Merge Join 这种关联机制,其他机制如 Shuffle Hash Join、Shuffle Nested Loop Join 都不支持
  • 对于参与 Join 的两张表来说,在它们分别完成 Shuffle Map 阶段的计算之后,DemoteBroadcastHashJoin 会判断中间文件是否满足如下条件
  • 中间文件尺寸总和小于广播阈值 spark.sql.autoBroadcastJoinThreshold
  • 空文件占比小于配置项 spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin
  • 只要有任意一张表的统计信息满足这两个条件,Shuffle Sort Merge Join 就会降级为 Broadcast Hash Join
  • 物理计划存在的原因:不论大表还是小表都要完成 Shuffle Map 阶段的计算,并且把中间文件落盘,AQE 才能做出决策
  • AQE 必须等待两张表都完成 Shuffle Map 的计算,然后统计中间文件,才能判断降级条件是否成立,以及用哪张表做广播变量
  • 采取 OptimizeLocalShuffleReader 策略可以省去 Shuffle 常规步骤中的网络分发
  • Reduce Task 可以就地读取本地节点(Local)的中间文件,完成与广播小表的关联操作
  • OptimizeLocalShuffleReader 物理策略的生效与否由一个配置项决定。这个配置项是 spark.sql.adaptive.localShuffleReader.enabled
  • OptimizeLocalShuffleReader 策略避免了 Reduce 阶段数据在网络中的全量分发,仅凭这一点,大多数的应用都能获益匪浅

自动分区合并

  • 在 Reduce 阶段,当 Reduce Task 从全网把数据分片拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起
  • spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸
  • spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值
  • 在 Shuffle Map 阶段完成之后,AQE 优化机制被触发,CoalesceShufflePartitions 策略“无条件”地被添加到新的物理计划中
  • 读取配置项、计算目标分区大小、依序合并相邻分区这些计算逻辑,在 Tungsten WSCG 的作用下融合进“手写代码”于 Reduce 阶段执行

自动倾斜处理

  • 在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区
  • spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
  • spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度

倾斜处理的问题1

  • 假设有个 Shuffle 操作,它的 Map 阶段有 3 个分区,Reduce 阶段有 4 个分区。4 个分区中的两个都是倾斜的大分区
  • 4 个分区中的两个都是倾斜的大分区,而且这两个倾斜的大分区刚好都分发到了 Executor 0
  • 尽管两个大分区被拆分,但横向来看,整个作业的主要负载还是落在了 Executor 0 的身上

倾斜问题2

  • 在数据关联的场景中,对于参与 Join 的两张表
  • 如果表 1 存在数据倾斜,表 2 不倾斜,那在关联的过程中,AQE 除了对表 1 做拆分之外
  • 还需要对表 2 对应的数据分区做复制,来保证关联关系不被破坏

两张表都存在数据倾斜

  • 为了不破坏逻辑上的关联关系,表 1、表 2 拆分出来的分区还要各自复制出一份
  • 左表拆出 M 个分区,右表拆出 N 各分区
  • 那么每张表最终都需要保持 M x N 份分区数据,才能保证关联逻辑的一致性
  • 当 M 和 N 逐渐变大时,AQE 处理数据倾斜所需的计算开销将会面临失控的风险
  • 当应用场景中的数据倾斜比较简单,比如虽然有倾斜但数据分布相对均匀,或是关联计算中只有一边倾斜,我们完全可以依赖 AQE 的自动倾斜处理机制
  • 但是,当我们的场景中数据倾斜变得复杂,比如数据中不同 Key 的分布悬殊,或是参与关联的两表都存在大量的倾斜
  • 我们就需要衡量 AQE 的自动化机制与手工处理倾斜之间的利害得失

DPP

DPP(Dynamic Partition Pruning,动态分区剪裁)是 Spark 3.0 版本中第二个引人注目的特性
充分利用过滤之后的维度表,大幅削减事实表的数据扫描量,从整体上提升关联计算的执行性能

分区裁剪

  • 如果过滤谓词中包含分区键,那么 Spark SQL 对分区表做扫描的时候
  • 是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁

工作过程

  • 过滤条件 users.type = ‘Head User’会帮助维度表过滤一部分数据
  • 保留下来的 ID 值,仅仅是维度表 ID 全集的一个子集
  • 使用广播变量封装过滤之后的维度表数据
  • 在维度表做完过滤之后,Spark SQL 在其上构建哈希表(Hash Table),这个哈希表的 Key 就是用于关联的 Join Key

要求

  • 事实表必须是分区表,而且分区字段(可以是多个)必须包含 Join Key
  • DPP 仅支持等值 Joins,不支持大于、小于这种不等值关联关系
  • 维度表过滤之后的数据集要小于广播阈值

Join Hints

Join 的实现方式详解

  • 嵌套循环连接(NLJ,Nested Loop Join ),大表作为外表,小表作为内表
  • 排序归并连接(SMJ,Shuffle Sort Merge Join)
  • 哈希连接(HJ,Hash Join)

SMJ 过程

  • 外表 Join Key 等于内表 Join Key,满足关联条件,把两边的数据记录拼接并输出,然后把外表的游标滑动到下一条记录
  • 外表 Join Key 小于内表 Join Key,不满足关联条件,把外表的游标滑动到下一条记录
  • 外表 Join Key 大于内表 Join Key,不满足关联条件,把内表的游标滑动到下一条记录

HJ 过程

  • 在 Build 阶段,基于内表,算法使用既定的哈希函数构建哈希表
  • 哈希表中的 Key 是 Join Key 应用(Apply)哈希函数之后的哈希值,表中的 Value 同时包含了原始的 Join Key 和 Payload
  • Probe 阶段,算法遍历每一条数据记录,先是使用同样的哈希函数,以动态的方式(On The Fly)计算 Join Key 的哈希值
  • 用计算得到的哈希值去查询刚刚在 Build 阶段创建好的哈希表
  • 如果 Join Key 一致,就把两边的记录进行拼接并输出,从而完成数据关联

数据在网络中的分发主要有两种方式

  • Shuffle,如果采用 Shuffle 的分发方式来完成数据关联
  • 那么外表和内表都需要按照 Join Key 在集群中做全量的数据分发
  • 两个数据表中 Join Key 相同的数据记录才能分配到同一个 Executor 进程,从而完成关联计算
  • 广播,Spark 只需要把内表(基表)封装到广播变量,然后在全网进行分发
  • 由于广播变量中包含了内表的全量数据,因此体量较大的外表只要“待在原地、保持不动”,就能轻松地完成关联计算

join策略

没有 Broadcast + Sort Merge Join 这种策略

  • 当数据能以广播的形式在网络中进行分发时,说明被分发的数据,也就是基表的数据足够小,完全可以放到内存中去
  • 这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的
  • Spark 自然就没有必要再去用 SMJ 这种前置开销比较大的方式去完成数据关联

等值数据关联中,选择顺序

  • BHJ,不能是全连接
  • SMJ,支持所有连接类型,全连接,anti join等
  • SHJ,支持类型同上

SHJ 实际效率比 SMJ 更高,但是可能会出现 OOM
SMJ 的实现方式更加稳定,更不容易 OOM
另外:外表大小至少是内表的 3 倍,内表数据分片的平均大小要小于广播变量阈值
spark.sql.join.preferSortMergeJoin = true 时,压根不考虑 SHJ
只有关闭时,Spark才会尝试

不等值 join

  • t1 inner join t2 on t1.date > t2.beginDate and t1.date <= t2.endDate
  • 不等值只能用 NJL实现,可选的策略只有 BNLJ、CPJ
  • 如果内表可以放到广播变量,如果不满足,只能选择 CPJ

在满足前提条件的情况下,如等值条件、连接类型、表大小等等,Spark 会优先尊重开发者的意愿,去选取开发者通过 Join Hints 指定的 Join 策略

  • 通过 SQL 结构化查询语句
  • 使用 DataFrame 的 DSL 语言

等值 join 的选择策略

  • 优先选择 BHJ
  • 其次是 SMJ,用来兜底
  • SHJ 性能浩宇 SMJ,但可能会出现OOM

不等值 join 的选择策略

  • 只能用 NLJ 算法
  • 优先选择 BNLJ
  • 否则,只能选择笨重的 CPJ 来兜底

大表join小表

BHJ 处理大表 Join 小表时的前提条件是,广播变量能够容纳小表的全量数据

Join Key 远大于 Payload

  • 首先,两张表的 Schema 完全一致。其次,无论是在数量、还是尺寸上,两张表的 Join Keys 都远大于 Payload
  • 基于现有的 Join Keys 去生成一个全新的数据列,它可以叫“Hash Key”。生成的方法分两步
  • 把所有 Join Keys 拼接在一起,把性别、年龄、一直到小时拼接成一个字符串,如图中步骤 1、3 所示
  • 使用哈希算法(如 MD5 或 SHA256)对拼接后的字符串做哈希运算,得到哈希值即为“Hash Key”,如上图步骤 2、4 所示
  • 当内表缩减到足以放进广播变量的时候,我们就可以把 SMJ 转化为 BHJ,从而把 SMJ 中的 Shuffle 环节彻底省掉
  • 优化的关键在于,先用 Hash Key 取代 Join Keys,再清除内表冗余数据。Hash Key 实际上是 Join Keys 拼接之后的哈希值。既然存在哈希运算,我们就必须要考虑哈希冲突的问题
  • 消除哈希冲突隐患的方法其实很多,比如“二次哈希”

过滤条件的 Selectivity 较高

  • 案例来源于电子商务场景,在星型(Start Schema)数仓中,我们有两张表,一张是订单表 orders,另一张是用户表 users。订单表是事实表(Fact),而用户表是维度表(Dimension)
  • 统计所有头部用户贡献的营业额,并按照营业额倒序排序
  • 这是一个典型的星型查询,也就是事实表与维度表关联,且维表带过滤条件。维表上的过滤条件是 users.type = ‘Head User’,即只选取头部用户
  • 头部用户的占比很低,这个过滤条件的选择性(Selectivity)很高,它可以帮助你过滤掉大部分的维表数据
  • AQE 允许 Spark SQL 在运行时动态地调整 Join 策略,把最初制定的 SMJ 策略转化为 BHJ 策略
  • 还可以利用 DPP 机制来减少事实表的扫描量,进一步减少 I/O 开销、提升性能
  • 事实表必须是分区表,从5个小时,经过AQE优化到30分钟,再经过DPP 优化到 15分钟

小表数据分布均匀

  • 当参与 Join 的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ 往往比 SMJ 的执行效率更高
  • 由于维表的查询条件不复存在,AQE 的join调整策略、DPP都失效了
  • 不妨使用 Join Hints 来强制 Spark SQL 去选择 SHJ 策略进行关联计算
  • 从 7个小时,缩减到 5个小时
  • SHJ 要想成功地完成计算、不抛 OOM 异常,需要保证小表的每个数据分片都能放进内存
  • 要求小表的数据分布必须是均匀的,如果不均匀,可能会抛出OOM

大表 join 大表

  • “分而治之”的核心思想是通过均匀拆分内表的方式 ,把一个复杂而又庞大的 Shuffle Join 转化为多个 Broadcast Joins
  • 消除原有 Shuffle Join 中两张大表所引入的海量数据分发,大幅削减磁盘与网络开销的同时,从整体上提升作业端到端的执行性能
  • 内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量
  • 拆分的关键在于拆分列的选取,性别基数太低,身份证号基数太高,日期比较合适
  • 完全可以只扫描那些与内表子表相关的外表数据,并不需要每次都扫描外表的全量数据
  • 外表的分区键包含 join key,每个内表可以通过 DPP机制,帮助与之关联的外表减少扫描的数量

如果内表所有分片都能放到内存中
可以使用 SHJ
需要设置好并行度,并发度,内存之间的关系

数据倾斜
分为三种情况

序号 倾斜类型 外表倾斜 内表倾斜
1 单表倾斜 R
2 单表倾斜 R
3 双表倾斜 R R

AQE 的数据倾斜,是已 task 为粒度做的,如果 executor 内倾斜很严重,还是解决不了
也就是 单个task虽然拆分了,但拆分后的多task 还是落在一个 executor 上,计算量没有降低

执行方式

  • 以 Join Key 是否倾斜为依据来拆解子任务
  • 对于外表中所有的 Join Keys,按照是否存在倾斜把它们分为两组
  • 一组是存在倾斜问题的 Join Keys,另一组是分布均匀的 Join Keys
  • 内表也按照同样方式拆分
  • 对于数据均匀分布的内外两个拆分表,使用 SHJ 的方式,或者继续拆分为 BHJ
  • 用多个广播小表 join 均匀拆分的外表
  • 自后将均匀和不均匀的结果 union 即可

倾斜数据处理

  • 第一阶段,是加盐、Shuffle、关联、聚合
  • 给倾斜的 join key 增加后缀,这样就均匀的打散到不同 executor上了
  • 加盐的粒度,一般为 #N, 也就是executor 总数
  • 外表加盐,对于任意一个join key,增加 1 到 #N 之间的一个随机后缀
  • 内表为复制加盐,对于任意 join key,将原始数据复制 #N - 1 份,得到 #N个副本
  • 对于每个副本,为其 join key追加 1 到 #N 之间的固定后缀,让它与打散后的外表数据保持一致
  • 内外表分别加盐之后,数据倾斜问题就被消除了
  • 之后使用常规方法继续优化,如将SMJ 转为 SHJ,或者拆分为多个 BHJ
  • “两阶段 Shuffle” 的第一阶段执行完毕,得到了初步的聚合结果,这些结果是以打散的 Join Keys 为粒度进行计算得到的

第二阶段

  • 包含“去盐化、Shuffle、聚合”这 3 个步骤
  • 首先,我们把每一个 Join Key 的后缀去掉,这一步叫做“去盐化”
  • 然后,我们按照原来的 Join Key 再做一遍 Shuffle 和聚合计算,这一步计算得到的结果,就是“分而治之”当中倾斜部分的计算结果
  • 将这部分结果与“分而治之”当中均匀部分的计算结果合并,我们就能完成存在倾斜问题的“大表 Join 大表”的计算场景

参考