原文
MapReduce: Simplified Data Processing on Large Clusters

背景

MapReduce可以运行到大规模集群上,用户只需要编写mapreduce函数

  • map函数,将生成一系列的key/value中间结果
  • reduce按照相同的key读取这些中间结果做合并

程序员无需知道任何并行计算、分布式相关的内容,便可以有效利用集群资源
MapReduce也可以高度可扩展的,可以在几千台机器上运行上百个MapReduce任务

介绍

  • 过去几年,作者们处理了各种web爬虫文档、web请求日志、倒排索引、图结构文档、每台机器的爬虫摘要
  • 这些计算都很简单,但需要几千台机器来处理
  • 而为了应对失败,则需要增加很多复杂的逻辑
  • 为此实现了一套抽象框架,可以执行简单的计算逻辑,而隐藏了复杂的并行细节、容错、数据分布等问题
  • 这个设计来自 Lisp和其他函数语言的 map 和 reduce 特性
  • map用来处理每一个逻辑的行记录然后输出key/value中间结果,reduce读取相同key做合并
  • 这种函数模型只需要简单重试就可以处理容错问题
  • 这种计算框架可以很容易在普通机器上运行

程序模型

这里需要用户手动编写两个函数,mapreduce map

  • 输入为 key/value,输出一些列中间的 key/value对
  • map-reduce函数将所有的中间做分组,将key相同的value放在一起
  • 之后将他们发送给reduce函数

reduce

  • 也是由用户实现,读取中间结果key,以及key关联的一系列value
  • 合并这些value产生更小的结果集,一般每个reduce只产生 1个或0个结果
  • 中间结果由Iterator的方式提供给用户
  • 这可以让我们处理非常大的数据集

一段 map 和 reduce 的伪代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
map(String key, String value):
	// key: document name
	// value: document contents
	for each word w in value:
		EmitIntermediate(w, "1");

reduce(String key, Iterator values):
	// key: a word
	// values: a list of counts
	int result = 0;
	for each v in values:
		result += ParseInt(v);
	Emit(AsString(result));

之后用户指定输入、输出的文件名,以及可选的参数,将自定义的代码和MapReduce库整合到一起
用户提供的 函数也是有类型的

1
2
map (k1,v1)            →     list(k2,v2)
reduce(k2,list(v2))  →      list(v2)

输入的 key和value跟输出的key/value是不同的范围
而中间key/value跟输出的key/value属于相同范围

更多的例子

  • 分布式grep:map函数匹配指定模式然后输出,reduce识别中间结果并最终输出
  • URL访问频率:map函数输出 (URL,1),reduce将所有的value相同的URL做统计并输出(URL,total count)
  • 倒排web连接图:map函数输出(target,source),这里的target是souce文件中指向的各种目标url,reduce输出(target,list(source))
  • 每个主机的条目向量:是一系列(word,frequency)对,map函数对每个文档输出(hostname,term vector)对,reduce传递给定主机的所有文档向量,并加起来丢弃不常用的,生成最终的(hostname,term vector)
  • 倒排索引:map解析每个文档输出(wrod,doc_ID)对,reduce接收所有word对应的pair,排序doc_id,输出(word,list(doc_id)),这个就是倒排索引
  • 分布式排序:map函数根据一条记录,提取出(key,record)对,reduce输出也是这样的,但依赖于分区,排序逻辑

实现

map-reduce的实现有很多不同

  • 一种是依赖于共享内存的小型机器
  • 依赖于NUMA的多处理器机器
  • 更大规模的联机机器集合

谷歌的环境是大量普通机器组成的

  • x86机器+linux环境,2-4G内存
  • 100M、1G的网络,但是平均使用的带宽要更少
  • 集群有几百上千个机器组成,因此失败是常态
  • 存储由廉价的磁盘组成,数据存储在内部开发的分布式文件系统之上,并使用副本来保存高可靠
  • 用户提交job到调度系统,每个job由一系列task组成,调度器会将其映射到集群的一组机器上

执行概览

  • 输入文件会自动拆分为M个子集,map函数并行的在多个机器上执行
  • reduce也是并行的,通过分区函数,将中间结果划分为R个区域
  • hash(key) mod R,分区数量R和分区函数都是可以被指定的

当用户调用 MapReduce函数时,就会触发下面动作

执行过程:

  • 用户程序中的MapReduce函数首先将输入文件split,一般为16M-64M为单位做拆分,然后在多台机器上同时启动程序
  • 其中一个程序副本是master,其余是master分配到了work上,有M个map任务和R各reduce任务
  • master挑选空闲的work分配map和reduce
  • worker上的map任务首先读取一个split文件,然后解析key/value,并将每个key/value交给用户编写的map程序处理
  • 之后map程序会产生中间结果key/value,将他们缓存到内存中
  • 之后key/value对会被写到本地磁盘,被分区函数划分到R个不同的分区中
  • 本地磁盘上的key/value对位置被master转发给reduce程序
  • 之后reduce程序通过RPC调用,从远程的work机器上读取中间缓存结果
  • 当reduce读取完所有数据后,就根据key对中间结果排序,这样所有相同的key都会被分组到一起
  • 排序是必须的,因为不同的key会被映射到相同的reduce任务中,如果内存不够则触发外排序
  • reduce执行iterator,将每个key和对应的list(value)传递给用户编写的reduce函数
  • reduce函数的输出会append到这个reduce分区的最终文件中
  • 当所有的map和reduce任务完成后master唤醒用户程序,用户程序中的mapreduce函数就返回了

当做 mapreduce 任务执行完后,会输出 R个文件,每个都是reduce程序输出的
一般来说不用将这 R个文件再做合并,它们会由另一个MapReduce程序来读取

master数据结构

  • 存储每个map和reduce的状态(进行中、完成、空闲)
  • 识别机器的身份
  • master将map完成的任务交给reduce,所以需要知道这些数据的位置和大小
  • 这些信息被不断的传给reduce

容错

  • master会定期ping worker机器,如果一段时间没反应就worker挂了
  • 这台机器上的所有完成的、未完成的map任务都将变成空闲状态,然后在其他机器上重新执行
  • 已完成的map任务将数据写到了本地磁盘,所以这些数据无法访问,只能重新执行map
  • 而reduce输出的数据是放在全局的文件系统中的,因此无需再执行
  • mapreduce有很好的容错性,即使集群内有几十台机器因为维护等原因不可用,master分发不可达worker上的工作,直到最终完成

master失败

  • master会定期写checkpoint,挂了之后会从新的checkpoint读取数据
  • master只有一个,一旦挂了mapreduce任务就会终止
  • 终止的任务会通知到客户端,客户端检查到后就继续重试

失败时的语义

  • 当用户的map和reduce操作是确定性的,那么重新执行就会得到同样的结果
  • map完成后,这个机器会发送消息给master,报告这R个文件的名字和位置
  • 当reduce任务完成后,会自动重命名临时文件到最终文件
  • 当多个重命名同时操作时,底层的文件系统会保证只包含一次
  • 如果语义是不确定的,就只能保证弱的语义了

本地性

  • 利用GFS来读取数据,GFS本身有3副本保障
  • master根据位置信息,首先将map发送到对应的机器(此机器本身就包含了输入数据)
  • 否则,就将map发送到位置靠近的机器,也就是同一个交换机下的机器(此机器也有输入数据)

任务粒度

  • 将map任务划分为M个,将reduce任务划分为R个
  • M和R应该远大于集群机器数量,一个机器可以同时执行多个任务,这样可以达到动态平衡
  • M和R的数量是有上限的,因为master需要用O(M+R)来决定调动,并用O(M*R)在内存中保持状态
  • M中每个独立的数据大小为16M-64M,R是集群总量的 x 倍,这个x是比较小的数
  • M大概为20W,R为5K,机器数量为2K个

备份任务

  • 导致mapreduce任务执行时间超长的原因是,出现了 掉队者
  • 比如有坏道的磁盘会频繁经历纠正的错误,导致30M/s读取变成1M/s
  • 调度不当,导致任务抢资源,导致其他任务执行的更慢
  • 机器的bug禁用了缓存导致速度慢了100倍
  • 解决策略是当mapreduce操作快完成时增加一个备份task
  • 当主任务或者备份task之一完成时,整个任务都完成了
  • 通过调整资源,这种备份机制就不会太占整体资源,大概百分之几,但可以大幅降低总执行时间
  • 有测试表明,当备份机制关闭后,排序算法慢了44%

细节

分区函数

  • 通过指定的分区函数,执行 hash(key) mod R,可以将数据均匀打散
  • 如果想将相同主机下的URL放在一个文件中,可以用:hash(hostname(url_key)) mod R 来实现

combiner函数

  • 对于词频统计程序来说,map端可能会出现大量重复的中间key,比如<the,1>这样的
  • 一个优化思路是,将这些大量的<the,1>直接在map端做合并,合并完了之后再发给reduce端
  • combiner跟reduce很类似,唯一的区别是reduce输出到最终文件,而combiner输出到中间结果
  • combiner对于某些场景下,可以大幅提速

输入输出类型

  • 支持 text这种类型, key是文件的偏移量,value是一行内容
  • 文本类型输入是按key排序的key/value对
  • 而指定的split就可以确保本文类型的拆分范围,一般是按行拆分
  • 用户也可以自己定义输入类型和输出类型
  • 输入类型不一定要从文件中读取,也可以从数据库,内存中读取,输出也是一样

副作用

  • map端和reduce端都会产生文件,等文件写完后做rename,这种是原子和冥等的
  • 单个task只有一个输出,这样只会对一个文件做rename
  • 单个task产生多个文件不支持,因为需要原子的两阶段提交

跳过错误记录

  • map/reduce程序可能有bug,导致执行失败
  • 而如果代码来自第三方库失败,没有源码则很难解决
  • 对此可以忽略掉过这些错误
  • map/reduce程序会安装一个signal用于捕获错误,当信号被触发后就发送UDP包通知master
  • master发现某个记录频繁出错后,就让其跳过这些记录

本地执行

  • 由于map/reduce真实环境上前台机器,而且是动态分配的,很难调试
  • 于是出现了一个本地版的map/reduce实现
  • 可以顺序执行一系列的map/reduce,并可以限制某些map/reduce的执行
  • 用户可以通过flag打开这些调试命令,并使用类似 gdb 方式调试

状态信息

  • master有一个HTTP服务,可以展示一些内部的状态信息
  • 如多个任务成功、失败、读取/写入字节数、写入中间状态字节数、执行速率等
  • 对每个任务都有一个标准输出、错误文件
  • 对于任务诊断时,这些信息非常有用,还可以帮助用户预估程序所需的机器规模,完成时间等

计数器

  • 提供了一个计数器,用于计算各种事件,比如统计处理单词总数、索引的德语文档数量等
  • map/reduce会独立的统计这些信息,然后通过ping的方式交给master
  • master的监控页面会实时看到这些数据,当map/reduce任务执行完后,master就将结果返回给用户
  • 有些是内置的,比如处理的输入key/键值对的数量,产生的输出文件数量等
  • 用户可以利用这个功能,来检测输入文件数量是否 == 输出文件数量

计数器代码如下:

1
2
3
4
5
6
7
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
    for each word w in contents:
        if (IsCapitalized(w)):
            uppercase->Increment();
        EmitIntermediate(w, "1");

性能

这里使用两个测试场景

  • 对1TB的数据做grep,这表示从一个大数据集中提取一小部分数据
  • 对1TB数据做排序,这表示对一类数据做shuffle然后转换成另一类数据
  • 这两种类型都是比较典型的真实场景的例子
  • 机器配置是2Gcpu、4G内存、160G磁盘、100M网络,两层树状网络,顶层100-200G带宽
  • 1800台机器,任意两个机器之间延迟小于1ms,4G内存中大约1-1.5G被其他任务保留

Grep

  • 搜索10亿条记录,比较少见的三字符模式,该模式出现在92337条记录中
  • 输入为M=15000,64M分片,输出为R=1
  • figure 2展示了这个计算时间,Y轴是数据规模,随着机器的增加速度加快,峰值为1764个work,30G/s
  • 当所有的map执行完后大概在80秒后将为0
  • 整个计算任务大概花费150秒,包括1分钟的启动开销
  • 开销是将程序分发到所有机器上,延迟交互GFS打开1000个文件,并做数据本地优化


Figure 2: Data transfer rate over time

排序

  • 大约是10亿条记录,1TB数据量,使用TeraSort benchmark做测试
  • 排序程序总共不到50行代码,3行map函数从一行文本中提取10个字节的排序key
  • 将排序好的key和文本行作为key/value作为中间结果
  • 使用内置的 Identity函数作为reduce的操作符,将中间结果作为传递过去
  • 最终的排序输出写入一组双向复制的GFS文件中,也就总共 2TB
  • 之后按照64M作为分片拆分,M=15000,R=4000,使用分区函数将其分给reduce程序
  • 增加了一个pre-pass的map/reduce操作,收集采用的key,这个key是分布式计算排序时的分裂点
  • figure3(a)展示了整个处理过程,最上方的是处理输入数据,其峰值大约是13G/s
  • 这个处理速度比例比 Grep 程序要小,因为map程序要花费一半的时间用来做I/O操作(写中间结果)
  • 中间的是map将结果传递给reduce(通过网络),第一个峰值是reduce,第二个是reduce的shuffle
  • 下方图是sort,因为正在对结果做排序,第一个shuffle周期结束和写周期开始之间存在延迟
  • 因为本地性优化,输入数据比列高于shuffle比列
  • shuffle比列高于sort后的写入速率,因为存在两副原因,要保证高可用和可靠性
  • 如果底层的系统使用了纠删码,则写入数据会减少

备份任务的效果

  • figure3(b)展示了关闭备份task后的效果
  • 总体跟(a)差不多,但有很长的尾巴,此时没有任何写入活动
  • 大概有5个straggler任务把整个map/reduce任务拖长了,增加了44%的时间

机器失效

  • 在figure3(c)展示了排序算法,执行中有200个任务被人为的kill了(一共有1746个worker)
  • 被杀掉后的任务立刻重启执行了(只有任务被杀,机器还是好的)
  • 这里显示了负的输入比列,因为之前完成的task被杀了,任务需要重新执行
  • 整个计算时间比正常时间只增加了 5%

figure4展示了最近一段时间map/reduce job的变化情况,目前整个集群有上千个job了
大大加快了程序开发和原型开发的速度,也为不会分布式的程序员提供了便利

每个任务结束后,map/reduce库记录了作业使用的计算资源统计信息,如table 1所示

经验和总结

目前谷歌内部试用map/reduce的场景:

  • 大规模的机器学习
  • 谷歌新闻和 Froogle 产品
  • 提取数据的使用报告和流行分析,谷歌 zeitgeist
  • 为实验性的产品和新产品 提取网页的属性,从大量语料中提取地理位置用于本地化搜索
  • 大规模图计算

大规模索引

  • map/reduce的一个重要用户是索引系统,爬虫将20T的数据抓取到GFS上,再用map/reduce分析
  • 索引程序顺序执行 5 - 10个map/reduce任务
  • 索引代码很简单,底层的分布式处理都被隐藏了,执行的map/reduce从3800行C++代码优化到700行
  • map/reduce库资源充足,可以将不相关的计算逻辑隔离
  • 之前计算索引大概需要几个月,现在只需要几天
  • 通过简单的增加机器,就可以提高map/reduce集群处理索引的性能

相关工作

  • 很多系统通过受限的编程模型、受限的并行化计算来实现自动化
  • 比如计算N个元素的前缀,可以在N个机器上计算,来实现N个元素数组的前缀
  • map/reduce跟其他分布式系统相比,提供了更大规模以及容错性
  • 之前的分布式系统规模有限,而且把容错的任务丢给了用户
  • 本地性优化,是将计算下推到靠近本地磁盘的处理单元中,避免了网络传输
  • 我们的备用task有点类似Charlotte 系统的迫切调度机制
  • 但迫切调度的问题是一旦计算失败整个任务就失败了,在map/reduce中修复了这个问题
  • map/reduce实现依赖于一个内部集群管理系统,该系统负责在大量共享机器上分配和运行任务
  • 这个内部管理系统类似 Condor
  • map/reduce的排序类似于 NOW-Sort
  • River提供了一个编程模型,可以通过分布式队列发送数据来相互通信,容错也挺好
  • map/reduce跟River类似,但提供了一个受限的编程模型,这样底层就可以更好的去调度
  • BAD-FS提供了一种跨广域网的编程模型,它跟map/reduce一样都提供本地性优化、冗余任务来实现容错
  • TACC 通过重新执行来实现容错

结论
map/reduce在谷歌内部大规模使用,其成功的原因包括:

  • 使用简单,隐藏了大量的底层分布式特性
  • map/reduce可以解决各种问题,如:web检索、排序、数据检索、机器学习等等
  • map/reduce可以扩展到几千台机器

我也们从这项工作中学习到了一些东西:

  • 限制编程模型可以使并行计算和分布式计算变得容易,也使容错变得简单
  • 网络带宽有限,很多的优化都是基于优化网络的,比如本地性优化,以及将中间结果写到本地磁盘
  • 冗余的执行任务,可以对应机器执行过慢的影响,也可以应对失败和数据丢失