MapReduce论文
背景
MapReduce可以运行到大规模集群上,用户只需要编写map
和reduce
函数
- map函数,将生成一系列的key/value中间结果
- reduce按照相同的key读取这些中间结果做合并
程序员无需知道任何并行计算、分布式相关的内容,便可以有效利用集群资源
MapReduce也可以高度可扩展的,可以在几千台机器上运行上百个MapReduce任务
介绍
- 过去几年,作者们处理了各种web爬虫文档、web请求日志、倒排索引、图结构文档、每台机器的爬虫摘要
- 这些计算都很简单,但需要几千台机器来处理
- 而为了应对失败,则需要增加很多复杂的逻辑
- 为此实现了一套抽象框架,可以执行简单的计算逻辑,而隐藏了复杂的并行细节、容错、数据分布等问题
- 这个设计来自 Lisp和其他函数语言的 map 和 reduce 特性
- map用来处理每一个逻辑的行记录然后输出key/value中间结果,reduce读取相同key做合并
- 这种函数模型只需要简单重试就可以处理容错问题
- 这种计算框架可以很容易在普通机器上运行
程序模型
这里需要用户手动编写两个函数,map
和reduce
map
- 输入为 key/value,输出一些列中间的 key/value对
- map-reduce函数将所有的中间做分组,将key相同的value放在一起
- 之后将他们发送给reduce函数
reduce
- 也是由用户实现,读取中间结果key,以及key关联的一系列value
- 合并这些value产生更小的结果集,一般每个reduce只产生 1个或0个结果
- 中间结果由Iterator的方式提供给用户
- 这可以让我们处理非常大的数据集
一段 map 和 reduce 的伪代码如下:
|
|
之后用户指定输入、输出的文件名,以及可选的参数,将自定义的代码和MapReduce库整合到一起
用户提供的 函数也是有类型的
|
|
输入的 key和value跟输出的key/value是不同的范围
而中间key/value跟输出的key/value属于相同范围
更多的例子
- 分布式grep:map函数匹配指定模式然后输出,reduce识别中间结果并最终输出
- URL访问频率:map函数输出 (URL,1),reduce将所有的value相同的URL做统计并输出(URL,total count)
- 倒排web连接图:map函数输出(target,source),这里的target是souce文件中指向的各种目标url,reduce输出(target,list(source))
- 每个主机的条目向量:是一系列(word,frequency)对,map函数对每个文档输出(hostname,term vector)对,reduce传递给定主机的所有文档向量,并加起来丢弃不常用的,生成最终的(hostname,term vector)
- 倒排索引:map解析每个文档输出(wrod,doc_ID)对,reduce接收所有word对应的pair,排序doc_id,输出(word,list(doc_id)),这个就是倒排索引
- 分布式排序:map函数根据一条记录,提取出(key,record)对,reduce输出也是这样的,但依赖于分区,排序逻辑
实现
map-reduce的实现有很多不同
- 一种是依赖于共享内存的小型机器
- 依赖于NUMA的多处理器机器
- 更大规模的联机机器集合
谷歌的环境是大量普通机器组成的
- x86机器+linux环境,2-4G内存
- 100M、1G的网络,但是平均使用的带宽要更少
- 集群有几百上千个机器组成,因此失败是常态
- 存储由廉价的磁盘组成,数据存储在内部开发的分布式文件系统之上,并使用副本来保存高可靠
- 用户提交job到调度系统,每个job由一系列task组成,调度器会将其映射到集群的一组机器上
执行概览
- 输入文件会自动拆分为M个子集,map函数并行的在多个机器上执行
- reduce也是并行的,通过分区函数,将中间结果划分为R个区域
- hash(key) mod R,分区数量R和分区函数都是可以被指定的
执行过程:
- 用户程序中的MapReduce函数首先将输入文件split,一般为16M-64M为单位做拆分,然后在多台机器上同时启动程序
- 其中一个程序副本是master,其余是master分配到了work上,有M个map任务和R各reduce任务
- master挑选空闲的work分配map和reduce
- worker上的map任务首先读取一个split文件,然后解析key/value,并将每个key/value交给用户编写的map程序处理
- 之后map程序会产生中间结果key/value,将他们缓存到内存中
- 之后key/value对会被写到本地磁盘,被分区函数划分到R个不同的分区中
- 本地磁盘上的key/value对位置被master转发给reduce程序
- 之后reduce程序通过RPC调用,从远程的work机器上读取中间缓存结果
- 当reduce读取完所有数据后,就根据key对中间结果排序,这样所有相同的key都会被分组到一起
- 排序是必须的,因为不同的key会被映射到相同的reduce任务中,如果内存不够则触发外排序
- reduce执行iterator,将每个key和对应的list(value)传递给用户编写的reduce函数
- reduce函数的输出会append到这个reduce分区的最终文件中
- 当所有的map和reduce任务完成后master唤醒用户程序,用户程序中的mapreduce函数就返回了
当做 mapreduce 任务执行完后,会输出 R个文件,每个都是reduce程序输出的
一般来说不用将这 R个文件再做合并,它们会由另一个MapReduce程序来读取
master数据结构
- 存储每个map和reduce的状态(进行中、完成、空闲)
- 识别机器的身份
- master将map完成的任务交给reduce,所以需要知道这些数据的位置和大小
- 这些信息被不断的传给reduce
容错
- master会定期ping worker机器,如果一段时间没反应就worker挂了
- 这台机器上的所有完成的、未完成的map任务都将变成空闲状态,然后在其他机器上重新执行
- 已完成的map任务将数据写到了本地磁盘,所以这些数据无法访问,只能重新执行map
- 而reduce输出的数据是放在全局的文件系统中的,因此无需再执行
- mapreduce有很好的容错性,即使集群内有几十台机器因为维护等原因不可用,master分发不可达worker上的工作,直到最终完成
master失败
- master会定期写checkpoint,挂了之后会从新的checkpoint读取数据
- master只有一个,一旦挂了mapreduce任务就会终止
- 终止的任务会通知到客户端,客户端检查到后就继续重试
失败时的语义
- 当用户的map和reduce操作是确定性的,那么重新执行就会得到同样的结果
- map完成后,这个机器会发送消息给master,报告这R个文件的名字和位置
- 当reduce任务完成后,会自动重命名临时文件到最终文件
- 当多个重命名同时操作时,底层的文件系统会保证只包含一次
- 如果语义是不确定的,就只能保证弱的语义了
本地性
- 利用GFS来读取数据,GFS本身有3副本保障
- master根据位置信息,首先将map发送到对应的机器(此机器本身就包含了输入数据)
- 否则,就将map发送到位置靠近的机器,也就是同一个交换机下的机器(此机器也有输入数据)
任务粒度
- 将map任务划分为M个,将reduce任务划分为R个
- M和R应该远大于集群机器数量,一个机器可以同时执行多个任务,这样可以达到动态平衡
- M和R的数量是有上限的,因为master需要用O(M+R)来决定调动,并用O(M*R)在内存中保持状态
- M中每个独立的数据大小为16M-64M,R是集群总量的 x 倍,这个x是比较小的数
- M大概为20W,R为5K,机器数量为2K个
备份任务
- 导致mapreduce任务执行时间超长的原因是,出现了 掉队者
- 比如有坏道的磁盘会频繁经历纠正的错误,导致30M/s读取变成1M/s
- 调度不当,导致任务抢资源,导致其他任务执行的更慢
- 机器的bug禁用了缓存导致速度慢了100倍
- 解决策略是当mapreduce操作快完成时增加一个备份task
- 当主任务或者备份task之一完成时,整个任务都完成了
- 通过调整资源,这种备份机制就不会太占整体资源,大概百分之几,但可以大幅降低总执行时间
- 有测试表明,当备份机制关闭后,排序算法慢了44%
细节
分区函数
- 通过指定的分区函数,执行 hash(key) mod R,可以将数据均匀打散
- 如果想将相同主机下的URL放在一个文件中,可以用:hash(hostname(url_key)) mod R 来实现
combiner函数
- 对于词频统计程序来说,map端可能会出现大量重复的中间key,比如<the,1>这样的
- 一个优化思路是,将这些大量的<the,1>直接在map端做合并,合并完了之后再发给reduce端
- combiner跟reduce很类似,唯一的区别是reduce输出到最终文件,而combiner输出到中间结果
- combiner对于某些场景下,可以大幅提速
输入输出类型
- 支持 text这种类型, key是文件的偏移量,value是一行内容
- 文本类型输入是按key排序的key/value对
- 而指定的split就可以确保本文类型的拆分范围,一般是按行拆分
- 用户也可以自己定义输入类型和输出类型
- 输入类型不一定要从文件中读取,也可以从数据库,内存中读取,输出也是一样
副作用
- map端和reduce端都会产生文件,等文件写完后做rename,这种是原子和冥等的
- 单个task只有一个输出,这样只会对一个文件做rename
- 单个task产生多个文件不支持,因为需要原子的两阶段提交
跳过错误记录
- map/reduce程序可能有bug,导致执行失败
- 而如果代码来自第三方库失败,没有源码则很难解决
- 对此可以忽略掉过这些错误
- map/reduce程序会安装一个signal用于捕获错误,当信号被触发后就发送UDP包通知master
- master发现某个记录频繁出错后,就让其跳过这些记录
本地执行
- 由于map/reduce真实环境上前台机器,而且是动态分配的,很难调试
- 于是出现了一个本地版的map/reduce实现
- 可以顺序执行一系列的map/reduce,并可以限制某些map/reduce的执行
- 用户可以通过flag打开这些调试命令,并使用类似 gdb 方式调试
状态信息
- master有一个HTTP服务,可以展示一些内部的状态信息
- 如多个任务成功、失败、读取/写入字节数、写入中间状态字节数、执行速率等
- 对每个任务都有一个标准输出、错误文件
- 对于任务诊断时,这些信息非常有用,还可以帮助用户预估程序所需的机器规模,完成时间等
计数器
- 提供了一个计数器,用于计算各种事件,比如统计处理单词总数、索引的德语文档数量等
- map/reduce会独立的统计这些信息,然后通过ping的方式交给master
- master的监控页面会实时看到这些数据,当map/reduce任务执行完后,master就将结果返回给用户
- 有些是内置的,比如处理的输入key/键值对的数量,产生的输出文件数量等
- 用户可以利用这个功能,来检测输入文件数量是否 == 输出文件数量
计数器代码如下:
|
|
性能
这里使用两个测试场景
- 对1TB的数据做grep,这表示从一个大数据集中提取一小部分数据
- 对1TB数据做排序,这表示对一类数据做shuffle然后转换成另一类数据
- 这两种类型都是比较典型的真实场景的例子
- 机器配置是2Gcpu、4G内存、160G磁盘、100M网络,两层树状网络,顶层100-200G带宽
- 1800台机器,任意两个机器之间延迟小于1ms,4G内存中大约1-1.5G被其他任务保留
Grep
- 搜索10亿条记录,比较少见的三字符模式,该模式出现在92337条记录中
- 输入为M=15000,64M分片,输出为R=1
- figure 2展示了这个计算时间,Y轴是数据规模,随着机器的增加速度加快,峰值为1764个work,30G/s
- 当所有的map执行完后大概在80秒后将为0
- 整个计算任务大概花费150秒,包括1分钟的启动开销
- 开销是将程序分发到所有机器上,延迟交互GFS打开1000个文件,并做数据本地优化
Figure 2: Data transfer rate over time
排序
- 大约是10亿条记录,1TB数据量,使用TeraSort benchmark做测试
- 排序程序总共不到50行代码,3行map函数从一行文本中提取10个字节的排序key
- 将排序好的key和文本行作为key/value作为中间结果
- 使用内置的 Identity函数作为reduce的操作符,将中间结果作为传递过去
- 最终的排序输出写入一组双向复制的GFS文件中,也就总共 2TB
- 之后按照64M作为分片拆分,M=15000,R=4000,使用分区函数将其分给reduce程序
- 增加了一个pre-pass的map/reduce操作,收集采用的key,这个key是分布式计算排序时的分裂点
- figure3(a)展示了整个处理过程,最上方的是处理输入数据,其峰值大约是13G/s
- 这个处理速度比例比 Grep 程序要小,因为map程序要花费一半的时间用来做I/O操作(写中间结果)
- 中间的是map将结果传递给reduce(通过网络),第一个峰值是reduce,第二个是reduce的shuffle
- 下方图是sort,因为正在对结果做排序,第一个shuffle周期结束和写周期开始之间存在延迟
- 因为本地性优化,输入数据比列高于shuffle比列
- shuffle比列高于sort后的写入速率,因为存在两副原因,要保证高可用和可靠性
- 如果底层的系统使用了纠删码,则写入数据会减少
备份任务的效果
- figure3(b)展示了关闭备份task后的效果
- 总体跟(a)差不多,但有很长的尾巴,此时没有任何写入活动
- 大概有5个straggler任务把整个map/reduce任务拖长了,增加了44%的时间
机器失效
- 在figure3(c)展示了排序算法,执行中有200个任务被人为的kill了(一共有1746个worker)
- 被杀掉后的任务立刻重启执行了(只有任务被杀,机器还是好的)
- 这里显示了负的输入比列,因为之前完成的task被杀了,任务需要重新执行
- 整个计算时间比正常时间只增加了 5%
figure4展示了最近一段时间map/reduce job的变化情况,目前整个集群有上千个job了
大大加快了程序开发和原型开发的速度,也为不会分布式的程序员提供了便利
每个任务结束后,map/reduce库记录了作业使用的计算资源统计信息,如table 1所示
经验和总结
目前谷歌内部试用map/reduce的场景:
- 大规模的机器学习
- 谷歌新闻和 Froogle 产品
- 提取数据的使用报告和流行分析,谷歌 zeitgeist
- 为实验性的产品和新产品 提取网页的属性,从大量语料中提取地理位置用于本地化搜索
- 大规模图计算
大规模索引
- map/reduce的一个重要用户是索引系统,爬虫将20T的数据抓取到GFS上,再用map/reduce分析
- 索引程序顺序执行 5 - 10个map/reduce任务
- 索引代码很简单,底层的分布式处理都被隐藏了,执行的map/reduce从3800行C++代码优化到700行
- map/reduce库资源充足,可以将不相关的计算逻辑隔离
- 之前计算索引大概需要几个月,现在只需要几天
- 通过简单的增加机器,就可以提高map/reduce集群处理索引的性能
相关工作
- 很多系统通过受限的编程模型、受限的并行化计算来实现自动化
- 比如计算N个元素的前缀,可以在N个机器上计算,来实现N个元素数组的前缀
- map/reduce跟其他分布式系统相比,提供了更大规模以及容错性
- 之前的分布式系统规模有限,而且把容错的任务丢给了用户
- 本地性优化,是将计算下推到靠近本地磁盘的处理单元中,避免了网络传输
- 我们的备用task有点类似Charlotte 系统的迫切调度机制
- 但迫切调度的问题是一旦计算失败整个任务就失败了,在map/reduce中修复了这个问题
- map/reduce实现依赖于一个内部集群管理系统,该系统负责在大量共享机器上分配和运行任务
- 这个内部管理系统类似 Condor
- map/reduce的排序类似于 NOW-Sort
- River提供了一个编程模型,可以通过分布式队列发送数据来相互通信,容错也挺好
- map/reduce跟River类似,但提供了一个受限的编程模型,这样底层就可以更好的去调度
- BAD-FS提供了一种跨广域网的编程模型,它跟map/reduce一样都提供本地性优化、冗余任务来实现容错
- TACC 通过重新执行来实现容错
结论
map/reduce在谷歌内部大规模使用,其成功的原因包括:
- 使用简单,隐藏了大量的底层分布式特性
- map/reduce可以解决各种问题,如:web检索、排序、数据检索、机器学习等等
- map/reduce可以扩展到几千台机器
我也们从这项工作中学习到了一些东西:
- 限制编程模型可以使并行计算和分布式计算变得容易,也使容错变得简单
- 网络带宽有限,很多的优化都是基于优化网络的,比如本地性优化,以及将中间结果写到本地磁盘
- 冗余的执行任务,可以对应机器执行过慢的影响,也可以应对失败和数据丢失