Spark SQL论文
Spark SQL 论文
Spark SQL: Relational Data Processing in Spark
背景
Spark SQL 整合了关系型处理、函数编程API,可以用SQL表示复杂的查询,它增加了两个不同之处:
- 将申明API DataFrame 和 关系处理紧密的整合在一起了
- 提供了一个高度优化,可扩展的框架 Catalyst
MapReduce可以提供很强的能力,但是太底层了,于是一系列申明式的,带优化的系统出现了
Hive、Pig、Dremel、Shark等等
但这些系统的不足以处理很多大数据场景
- 很多用户需要执行ETL,他们的各种数据源是结构、半结构的
- 用户想要执行更复杂的分析如 机器学习、图处理
有些系统则将 关系查询 和复杂的程序算法组合到一起,但是结合的并不好
于是用户只能再这两种范式上 二选一了
本文提供了一个新的组件:Spark SQL,它构建于早期的Shark之上,但不会强制用户对关系模型、程序API之间二选一
而是混合了这两种模型的特性
- 提供了DataFreame API,可以在外部数据源、Spark内建的RDD之上执行关系操作,类似于R,但它是延迟执行的,所以可以
- 为支持各种数据源和算法,提供了Catalyst,可以增加数据源、优化规则、数据类型用于处理某个领域,如机器学习
DataFrame的各种操作会通过Catalyst实现各种优化
- 用scala的模式匹配来表示的
- 提供分析、计划、运行时代码生成
- 可扩展新的数据源、包括结构、半结构
- 用户定义的函数、用户定义的类型(机器学习)
- 函数式的语言非常适合构建编译器
- 目标是将更多的功能整合到Spark SQL中,机器学习、图、流处理等
尽管 Shark 可以实现关系型的优化,但仍然面临三个问题
- 只能用于查询hive catalog中的外部数据源,对Spark中的关系查询没法使用
- 只能通过SQL字符串的方式调用,在模块化的程序中并不方便
- Hive优化是定制化的很难扩展新的特性,如机器学习中的数据类型,或新的数据源
为了在RDD中扩展支持关系处理,Spark SQL需要实现如下目标
- 支持关系处理,使用Spark程序、使用友好的API扩展数据源
- 提供高性能的管理DBMS技术
- 支持新的数据源,如半结构化的,以及扩展的数据源,支持联邦查询
- 使扩展能支持更高级的分析算法,如图处理、机器学习
编程接口
DataFrame 类似关系数据库中的表,包含了schema,可以做更多的操作和优化
每个 DataFrame 对象都表示为一个 逻辑集合,并且是延迟计算的,可以计算一个数据集
比如下面这段:
|
|
user、young 都是DataFrame,他们类似一个AST,而不是scala中的函数,他们都是一个逻辑计划
叫 DataFrame 的原因是,它有点类似于 python、R 中的结构化数据依赖库
对于下面这个操作,计算 女士雇员的数量
|
|
employees 是一个 DataFrame, employees(“ieptId”) 是 deptID列的表达式
表达式对象有各种操作,可以转为其他表达式
这些操作都构建为一个AST,并由Catalyst优化
逻辑计划是立刻生成的,所以如果用户 输入的SQL有误,比如表不存在,字段类型不合法就会立刻报错
DataFrame反射得到对象的类型,下面的usersRDD,就可以确定出name和age的类型
|
|
上述最后两段,读取hive表,和刚刚在内存中创建的对象做join
支持 cache(),可以做列式的压缩,如 字典编码,length长度编码等
很多数据库也支持 UDF,如PG,但是他们需要先用写某个实现,注册进去,然后再执行SQL查询
也就是创建UDF的过程,和查询的过程是分开的,调试不方便
DataFrame 允许创建和查询放在一起:
|
|
Catalyst 优化
设计 catalyst的两个目标
- 可以轻松的将新特性、优化技术加入到Spark SQL中,尤其是各种半结构数据和高级分析
- 可以让开发者定义他们的规则,支持规则下推,支持RBO、CBO,可以增加数据源、数据类型
函数式语言的设计部分是为了构建编译器,scala非常适合这一点
之前想要开发一个 优化会很麻烦,使用 scala则很简单
x + (1 + 2) 对应的表达式为:
|
|
它会被转换为一个 AST
之后对这个 AST应用一个规则
|
|
于是就被转换为 x + 3
可以将很多规则,用于这些AST树之上,编写任意的scala代码,实现模式匹配功能
模式匹配是很多函数语言的特性,可以提取出值的内嵌结构数据类型
通过模式匹配,可以将 树A 转为树B
模式匹配只包含部分功能,所以它只匹配一棵树的 子集
模式匹配只会匹配到那些能匹配上的,对那些不匹配的则会跳过,整个过程是自动的,递归下降的
当转换 达到了一个临界点时,就停止转换
这个临界点 相当于不动点法,从根出发遍历一次后,没有发生变化,那么就停止运行
对一颗树一般执行下面四个阶段
- 分析一个逻辑计划,并解析器引用
- 逻辑计划优化
- 物理执行计划
- 代码生成,编译部分查询 到 java字节码
第三阶段可能会生成多个计划,通过比较其 代码选择一个最合适的,其他的都是基于规则的
分析
- Spark SQL开始于一个关系计算,如通过SQL转换为AST、或者DataFrame 对象构建而成
- 一开始都是包含了未解析的逻辑计划
- 查询 SELECT col FROM sales,会对表和列做绑定
- 从catalog中查询关系表达式中的名字
- 将名字做映射
- 根据唯一ID,决定属性是否映射到相同的值,如 col = col
- 强制类型转换,如 col + 1,无法判断是否正确,只有解析完类型后才能确定
逻辑优化
- 执行一些经典的RBO规则,如常量折叠、谓词下推、列裁剪、null传播、bool表达式简化等
- catalyst使用的是模式匹配方式,增加一个规则比较简单
增加一个优化规则,处理聚合函数中的 decimal类型
|
|
物理执行计划
- 逻辑计划会生成一个或者多个物理计划,然后通过CBO的方式选择一个代价最低的
- 这篇论文所述,当时只将CBO用于join选择的场景
- 物理阶段也有RBO的优化,如map操作的 管道投影、filter等
代码生成
- catalyst依靠 quasiquotes 降低代码生成AST,然后让编译器生成字节码
- 解释执行效率很低,需要遍历语法书,会有大量的分支跳转、虚函数调用
部分代码的编译生成方式:
|
|
如上,一颗语法树:Add(Literal(1), Attribute("x"))
,会变成 1 + row.get("x")
数据类型
- 实现自定义的数据类型,需要实现
createRelation
函数 - 获取一系列key、value参数,以及返回BaseRelation对象
- 成功加载后,就会返回一个schema和一个预估的字节数
- TableScan返回一个表的所有列
- PrunedScan只返回固定的列
- PrunedFilteredScan,只返回固定的列,并带有下推,数据源根据filter只返回一些行
自定义类型
- 用于机器学习、图计算等高级场景
- 可以将自定义的对象,转换为Catalyst Row
一个自定义类型的例子
|
|
高级分析特性
除了上述描述的,Catalyst框架还支持三个重要功能
- 半结构化数据自动推断
- 在机器学习场景下,提供更高级别的API
- 联邦查询
半结构化数据推断
- 大数据场景下的JSON使用频率很高,但有时候使用起来很麻烦,因为可能会增加字段
- 有些人使用jackson库去做映射,将JSON转为Java对象,有的自己实现的很底层的转换
- spark sql可以自动推断数据类型
- 推断算法类似 推断XML类型,但不会执行任意深度的递归调用
- 会尝试推断出各种类型,如果int32可以放下就是integer,不然就是long,再长就是decimal,万能类型string等等
机器学习场景的例子,用管道的方式提取特征,执行逻辑回归
|
|
联邦查询
- 需要join不同数据源,比如提取用户的一些特性信息,这些信息存储在不同的地方
- 正常来说是禁止的,或者查询成本很高
- spark sql 可以用谓词下推的方式,将sql直接发送到mysql端
例子
|
|
之后会将mysql的部分提取出来,下推到mysql端执行
|
|
性能评估、研究、总结
主要评估两种类型
- SQL查询
- spark程序性能
比较
- SparkSQL
- Shark
- Impala
使用的是 AMPLab的大数据测试集
- scan
- aggregation
- join
- UDF-based map-reduce job
scan来看,shark最差,spark sql能接近impala的性能
聚合操作,shark就更差了
join操作,某些场景下spark sql甚至比impala还略快一些
最后UDF,impala没有,这次shark跟spark sql差不多
通过python API实现一段逻辑,运行耗时大概是 scala API的12倍
而data-frame又比scala块一倍
最后是 sql+spark vs data-frame,让他们完成两个阶段的工作
统计单词的频率,sql+spark这种操作需要保存中间结果,所以就慢了很多
真实场景的案例研究
- 一个是近似查询处理
- 基因学
一个是近似查询处理 一个广义的在线聚合操作
- 可以支持任意内嵌的聚合查询
- 通过查看总数据集的一部分执行结果,让用户看到了执行视图的处理过程
- 通过达到指定的精确测量后,停止查询
- 为实现这个功能,将一个关系拆分为一批采样
- 在查询计划中调用 转换,将原始的查询替换为多个查询,每个查询操作连续的采用数据
- 直接替换整个数据集并不能充分的计算出正确的结果
- 标准聚合操作必须同时考虑当前的采样,以及前一批结果的有状态操作
- 根据近似答案可过滤掉的的元组必须被替换某些版本,这些版本是当前评估错误的版本
- 每个转换规则都可以用Catalyst规则表示,用来修改树的算子
- 不急于采样数据的片段会被忽略,
基因学
- 一个通用的操作是基于数字的偏移量,检查重叠的区域
- 这个问题可以表示为join操作,带有不相等的谓词
两个数据集,a和b,其schema为:(start LONG, end LONG)
范围的SQL查询如下:
|
|
很多系统都会使用低效的 nested loop join
算法
而一个特定的优化,可以使用区间树来优化
相关工作
- 程序模型
- Shark也有关系查询和高级分析,但SparkSQL提供了更好的编程风格API
- 同时包含了关系风格和程序模型,可以直接运行在RDD纸上,支持Hive之外的各种数据源
- 受到了DryadLINQ的启动,其查询风格类似LINQ
- Hive和Pig只提供关系查询,编程风格交给UDF完
- 而ASTERIX、Stratosphere提供了半结构化数据,但提供了API让用户方便调用UDF
- SparkSQL通过用原生的代码,直接查询UDF中定义的数据,让SparkSQL跟Spark程序紧密整合
- 也用开发者可以混合关系模型、程序模型API,用相同的语言
- 通过Catalyst优化,提供了代码生成,机器学习数据类型、schema推断等功能
- 数据框架API在单机、集群上都使用同样的API
- 可扩展的优化
- 类似EXODUS、Cascades这样的优化框架,提供简单的可扩展能力
- 之前想要提供类似功能,需要实现一个DSL语言,提供优化编译器将其转换为可运行代码
- SparkSQL的改进是使用了函数语言的特性,大大降低了学习和维护成本
- 大量使用了quasiquotes,这可能是最简单的代码生成方式
- 高级分析
- 提供高级分析算法为大规模集群使用
- 包括迭代算法、图分析等
- 暴露的分析功能和MADlib共享,不过MADlib仅限于PG的UDF,而SparkSQL可以是成熟的Spark程序