docker
MapReduce:在大型集群上简化数据处理
摘要
MapReduce是一个用于处理和生成大型数据集的编程模型和相关实现。
用户指定一个map函数来处理每个键/值对以生成一组中间键/值对,并指定一个reduce函数来合并与同一中间键关联的所有中间值。正如论文所示,在这个模型中可以表达许多世界任务。
以这种函数式风格编写的程序可以自动并行化,并在一个大型的普通计算机集群上执行。运行时系统负责处理输入数据的分区细节、在一组机器上调度程序执行、处理机器故障以及管理所需的机器间通信。这使得没有任何并行和分布式系统经验的程序员可以轻松地利用大型分布式系统的资源。
我们的MapReduce实现运行在大规模的廉价机器集群上,并且具有高度可扩展性:一个典型的MapReduce计算可以在数千台机器上处理万亿字节级别的数据。程序员发现该系统易于使用:已经实现了数百个MapReduce程序,每天超过数千个MapReduce作业在谷歌的集群中执行。
1. 介绍
在过去的五年中,作者和其他许多人在谷歌已经完成了数百个特殊用途计算,它们用来处理大量原始数据,比如爬取的文档、Web请求日志等,以计算各种派生数据,例如倒排索引、Web文档图形结构的各种表示形式、每个主机爬取页面数量的摘要、给定日期内最频繁查询集等。大多数这样的计算在概念上都很简单。然而,输入数据通常很大,为了在合理的时间内完成计算,必须将计算分布到数百或数千台机器上。如何并行化计算、分发数据和处理故障等,使得原本简单的计算包含着大量复杂的代码去解决上述问题。
为了应对这种复杂性,我们设计了一种新的抽象,它允许我们表达试图执行的简单计算,但隐藏了并行化、容错、数据分发和负载均衡等混乱细节。我们的抽象灵感来自于Lisp和许多其他函数式语言中存在的map和reduce原语。我们意识到,我们大部分的计算都涉及到对输入中的每条逻辑“记录”应用map操作来计算一组中间的键值对,然后对共享同一个键的所有值应用reduce操作来适当地组合派生数据。
我们使用用户指定的Map和Reduce操作的函数式模型,允许我们轻松地将大型计算并行化,并使用重新执行作为容错的主要机制。
派生数据:Derived data(派生数据)指的是由原始数据通过一系列数据加工、转换和计算生成的新数据集合,通常用于更深入地分析和理解原始数据,或者用于生成更有用的信息。
这项工作的主要贡献是一个简单而强大的接口,它能够自动并行化和分发大规模计算,并且该接口的实现,可以在大型普通PC集群上实现高性能。
第2节描述了基本的编程模型并给出了几个例子。第3节描述了针对我们基于集群的计算环境定制的MapReduce接口的实现。第4节描述了我们发现有用的编程模型的几个改进。第5节会针对不同的任务对我们的实现进行性能测试。第6节探讨了在谷歌内部使用MapReduce,包括以它为基础重写生产环境下索引系统的经验。第7节讨论了一些相关和未来的工作。
2. 编程模型
该计算过程接收一组输入键值对,并生成一组输出键值对。MapReduce库的用户将计算表示为两个函数:Map和Reduce。
Map由用户编写,接收一个输入对并生成一组中间键值对。MapReduce将所有与相同的中间键I相关联的中间值分组在一起,并将它们传递给Reduce函数。
Reduce函数也是由用户编写的,它接受一个中间键I和该键对应的一组值。Reduce将这些值合并在一起,以形成一个可能更小的一组值。通常每次调用Reduce只产生零个或一个输出值。通过迭代器向用户的Reduce函数提供中间结果,从而可以处理无法放入内存中的过大的值列表。
通过迭代器向Reduce函数提供中间结果如何理解?
“Iterator” 是一个编程术语,指的是一种对象,该对象可以用于遍历某种数据结构中的元素。通常情况下,使用迭代器可以访问集合、列表、数组等数据结构中的元素,而无需知道它们的实际存储方式。迭代器通常会提供一些方法,如 next() 或者 iter(),这些方法可以让你逐个地访问数据结构中的元素。通过迭代器,你可以在不了解数据结构内部细节的情况下,以一种统一的方式来遍历数据。
迭代器在编程中被广泛应用,常见的编程语言如 Python、Java 和 C++ 中都有迭代器的概念和实现。
2.1 例子
考虑在大量文档中计算每个单词出现次数的问题。用户将编写类似下面的伪代码:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数输出每个单词及其出现次数的计数(在这个简单的例子中只有'1’)。Reduce函数将输出的特定单词所有计数相加。
此外,用户编写代码来填写MapReduce规范对象,其中包含输入和输出文件的名称以及可选的调优参数。然后,用户调用MapReduce函数,并将规范对象传递给它。用户的代码与MapReduce库(用C++实现)链接在一起。附录A包含此示例的完整程序文本。
2.2 类型
尽管前面的伪代码是根据字符串输入和输出编写的,但从概念上讲,用户提供的map和reduce函数有关联的类型:
map (k1, v1) ——> list(k2,v2)
reduce (k2, list(v2)) ——> list(v2)
也就是说,输入的键和值与输出的键和值来自不同的域。此外,中间键和值与输出的键和值来自同一个域。
我们的 C++ 实现将字符串传递给用户定义的函数,并由用户代码负责在字符串和适当类型之间进行转换。
2.3 更多示例
以下是一些简单有趣的程序示例,它们可以轻松地表示为MapReduce计算。
分布式 Grep:Map函数如果匹配了提供的模式,则会输出一行。Reduce函数是一个恒等函数,只将提供的中间数据复制到输出。
grep是一种在Unix或Linux操作系统上使用的命令行工具,用于搜索文本文件中的指定字符串。它的名称来源于“Global Regular Expression Print”的缩写,意思是“全局正则表达式打印”。
使用grep,您可以在一个或多个文件中搜索一个字符串,也可以使用正则表达式来更灵活地匹配字符串。grep将输出包含匹配字符串的行,可以使用一些选项来控制输出的格式和行为。
例如,要在文件example.txt中搜索字符串“hello”,可以使用以下命令:
grep "hello" example.txt
如果您想搜索多个文件,可以使用通配符,例如:
grep "hello" *.txt
这将在所有扩展名为txt的文件中搜索字符串“hello”。
URL访问频率计数:map函数处理Web页面请求的日志并输出⟨URL,1⟩
。reduce函数将相同URL的所有值相加,并输出⟨URL,total count⟩
。
反向网络链接图:map函数在名为source的页面中为每个指向target URL的链接输出⟨target,source⟩
。reduce函数将与给定目标URL相关联的所有源URL列表连接起来并输出:⟨target, list(source)⟩
主机的词向量:一个词向量总结了文档或一组文档中出现的最重要单词,作为⟨单词,频率⟩
列表。Map函数为每个输入文档(从文档的URL中提取主机名)输出一个⟨主机名,术语向量⟩
。reduce函数传递给定主机的所有按文件分割的术语向量。它将这些术语向量相加,在丢弃不常见的项后输出最终⟨主机名,术语向量⟩
。
倒排索引:Map函数解析每个文档,并输出一系列⟨单词,文档ID⟩
对。Reduce函数接受给定单词的所有对,排序相应的文档ID并输出一个⟨单词,list(文档ID)⟩
对。所有输出对的集合形成了一个简单的倒排索引。很容易扩展此计算以跟踪单词位置。
分布式排序:map 函数从每个记录中提取键,并输出⟨key, record⟩
对。reduce 函数输出所有对,不作任何改变。该计算依赖于第 4.1 节描述的分区机制和第 4.2 节描述的排序属性。
3. 实现
MapReduce接口有很多不同的实现。正确的选择取决于环境。例如,一种实现可能适用于小型共享内存机器,另一种可能适用于大型NUMA多处理器,而另一种可能适用于更大的联网机器集合。
本节描述了一个针对谷歌中广泛使用的计算环境的实现: 通过交换式以太网[4]将大量的普通PC连接在一起的大规模集群。在我们的环境中:
(1) 机器通常是双核x86处理器,运行Linux系统,每台机器配备2-4 GB的内存。
(2)使用了商用网络硬件——通常在机器级别上是100兆比特/秒或1千兆比特/秒,但平均带宽要更低。
(3)一个集群由数百或数千台机器组成,因此机器故障很常见。
(4)存储由廉价的IDE硬盘直接连接到各个计算机上提供。我们自行开发了一个分布式文件系统[8]来管理这些硬盘上存储的数据。该文件系统使用复制技术,在不可靠的硬件基础之上提供可用性和可靠性。
(5)用户向调度系统提交作业。每个作业由一组任务组成,并由调度程序映射到集群中的一组可用机器上。
3.1 执行流
Map调用自动将输入数据分成M个分片来在多台机器上进行分布式处理。不同的机器可以并行地处理这些输入部分。 Reduce调用则通过使用一个划分函数(例如,hash(key) mod R)将中间键空间划分为R个片段来进行分布式处理。用户指定划分数(R)和划分函数。
图1展示了我们实现的MapReduce操作的整体流程。当用户程序调用MapReduce函数时,将会发生以下一系列动作(图1中的数字标签对应下面列表中的数字):
- 用户程序中的MapReduce库首先将输入文件分成M个部分,每个片段通常为16MB到64MB(用户可以通过一个可选参数来控制)。然后,它在一组机器上启动该程序的多个副本。
- 这个程序的其中一个副本是特别的——Master。其余的是工作进程(Worker),由Master分配任务。需要分配M个Map任务和R个Reduce任务。Master挑选空闲的Worker,并给每个Worker分配一个Map任务或一个Reduce任务。
- 被分配Map任务的Worker读取对应输入分片的内容。它从输入数据中解析出键值对,并将每个键值对传递给用户定义的Map函数。Map函数产生的中间键值对缓存在内存中。
- 这些缓冲对周期性地写入本地磁盘,通过分区函数将它们划分为R个区域。这些缓冲对在本地磁盘上的位置将传回给Master, Master负责将这些位置转发给Reduce工作进程。
- 当一个Reduce Worker从Master收到这些位置的通知时,它会使用远程过程调用从Map Worker的本地磁盘读取缓冲数据。当Reduce Worker读取完所有的中间数据后,它会根据中间键对数据进行排序,以便将相同键的数据都聚集在一起。排序是必要的,因为通常会有许多不同的键映射到同一个Reduce任务。如果中间数据量太大,无法放入内存,则使用外部排序。
- Reduce Worker遍历排序后的中间数据,对于每个唯一的中间键,它将键和相应的中间值集合传递给用户的Reduce函数。 Reduce函数的输出被追加到此Reduce分区的最终输出文件中。
- 当所有的Map任务和Reduce任务都完成后,Master唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码继续执行。
成功完成后,MapReduce执行的输出可在R任务的输出文件中获得(每个Reduce任务一个输出文件,文件名由用户指定)。通常,用户不需要将这些R任务输出文件组合成一个文件——他们通常将这些文件作为输入传递给另一个MapReduce调用,或者从另一个能够处理分成多个文件的输入的分布式应用程序中使用它们。
3.2 Master数据结构
Master维护了多个数据结构。对于每个Map任务和Reduce任务,它都存储了状态(空闲、进行中或已完成),以及工作机器的标识(对于非空闲任务)。
Master是将中间文件区域的位置从Map任务传播到Reduce任务的通道。因此,对于每个完成的Map任务,Master存储了由该Map任务产生的R中间文件区域的位置和大小。随着Map任务完成,这些位置和大小信息会得到更新。这些信息被逐步地推送给正在执行Reduce任务的Worker。
3.3 容错
MapReduce库的设计目标是使用成百上千台机器处理海量数据,因此它必须能够优雅地容忍机器故障。
Worker节点故障
Master进程周期性地ping每个Woker进程。如果在一定的时间内没有收到某个Worker的响应,则Master将该Worker标记为故障。由该Woker进程完成的任务Map任务都会被重置回其初始的空闲状态(只要该Woker节点故障,则在这个Woker节点上执行的Map任务都需要重新进行计算,无论该任务是否完成),因此可以将该任务分配给其他工作进程。类似地,任何正在故障的Woker节点上执行的Map任务或Reduce任务也会被重置为空闲状态,从而可以被重新调度。
已完成的Map任务在发生故障时会重新执行,因为它们的输出存储在故障机器的本地磁盘上,因此无法访问。已完成的Reduce任务不需要重新执行,因为它们的输出存储在全局文件系统中。
当一个Map任务首先被Worker A*执行,然后由于A*故障而稍后被Worker ***B***执行,所有执行Reduce任务的Worker都会被通知重新执行。任何还没有从Worker A读取数据的Reduce任务都会从Worker B读取数据。
MapReduce能够抵御大规模的工作节点故障。例如,在一次MapReduce操作期间,正在运行的集群上进行网络维护导致同时80台机器在几分钟内无法访问。MapReduce Master只需要重新执行不可达节点所做的工作,并继续推进,最终完成MapReduce操作。
Master节点故障
对Master节点维护的数据结构执行周期性检查程序是很容易的。如果Master死亡,可以从上一个检查点状态开始启动一个新的副本。然而,考虑到只有一个Master,它不太可能发生故障(相比数千个工作节点法的故障的可能性,仅有一个的Master节点发生故障的概率要小很多);因此,在我们当前的实现中,如果Master故障就会中止MapReduce的计算。客户端可以检查这种情况,并在需要时重试MapReduce操作。
故障情况下的语义
当用户提供的Map和Reduce操作符是其输入值的确定函数时,我们的分布式实现产生的输出与整个程序无错误地顺序执行产生的输出相同。
我们依靠Map和Reduce任务输出的原子提交来实现这个属性。每个正在进行的任务都将其输出写入私有临时文件。一个Reduce任务生成一个这样的文件,而一个Map任务生成R个这样的文件(每个Reduce任务一个)。当一个Map任务完成时,Worker会向Master发送一条消息,并在消息中包含R个临时文件的名称。如果Master收到一个已经完成的Map任务的完成消息,它会忽略这个消息。否则,它会在Master数据结构中记录这R个文件名。
当一个Reduce任务完成时,Reduce Worker会自动将临时输出文件重命名为最终输出文件。如果在多台机器上执行相同的Reduce任务,则会对同一个最终输出文件执行多个重命名调用。我们依赖于底层文件系统提供的原子重命名操作,以确保最终的文件系统状态仅包含一个Reduce任务执行产生的数据。
我们绝大多数的Map和Reduce操作都是确定的,而且我们的语义在这种情况下等同于顺序执行,这使得程序员可以很容易地推断程序的行为。当Map and/or Reduce操作不确定时,我们提供了较弱但仍然合理的语义。在不确定操作存在的情况下,特定Reduce任务R*1*的输出等价于顺序执行该不确定程序产生的R1的输出。然而,不同的Reduce任务R2的输出可能对应于该不确定程序的不同顺序执行所产生的R**2输出。
考虑Map任务M和Reduce任务R1和R2。设e(R**i)是提交的Ri的执行(只有一次这样的执行)。出现较弱的语义是因为e(R**1)可能读已经取了一次执行M产生的输出,而e(R**2)可能读取由不同M产生的输出。
3.4 局部性
在我们的计算环境中,网络带宽是一个相对稀缺的资源。我们利用输入数据(由GFS [8]管理)存储在组成集群的机器的本地磁盘这一事实来节省网络带宽。GFS将每个文件划分为64MB的块,并在不同的机器上存储每个块的若干副本(通常为3个副本)。MapReduce Master将输入文件的位置信息考虑在内,并尝试将Map任务调度到包含相应输入数据副本的机器上。如果做不到这一点,它会尝试将Map任务安排在该任务输入数据的副本附近(例如,在与包含数据的机器在同一个网络交换机上的工作机器上)。当在集群中有相当一部分工作节点上运行大型MapReduce操作时,大多数输入数据都是在本地读取的,不会消耗网络带宽。
利用空间局部性提高性能,就近取材,节省带宽提高传输效率。
3.5 任务粒度
如上所述,我们将Map阶段分为M个部分,Reduce阶段细分为R个部分。理想情况下,M和R应该比工作机器的数量大得多。让每个工作节点执行许多不同的任务,可以改善动态负载平衡,还可以在一个工作节点失败时加快恢复速度:它已经完成的许多Map任务可以分配到所有其他工作节点上。
由于Master必须做出O(M + R)的调度决策,并如上所述保持内存中的O(M∗R)状态,因此在我们的实现中,对M和R的最大值进行了限制。(然而,内存使用量的常数因子很小:状态中占据O(M∗R)一块包含每个map任务/reduce任务对应大约1字节的数据。)
此外,因为每个Reduce任务的输出最终都保存在一个单独的输出文件中,因此用户通常会对R进行限制。在实践中,我们倾向于选择M值,使得每个任务大约有16MB到64MB的输入数据(这样上面描述的局部性优化是最有效的),并且我们将R设置为预期使用的工作机器数量的小倍数。我们经常使用M = 200000和R = 5000,在2000台工作机器上执行MapReduce计算。
3.6 备份任务
导致MapReduce操作总时间延长的一个常见原因是“掉队者”(straggler):一台机器在计算中完成最后几个Map或Reduce任务之一需要花费异常长的时间。掉队的人出现的原因有很多。例如,有坏磁盘的机器可能会经历频繁的可纠正错误,这将使其读取性能从30MB/s降至1MB/s。集群调度系统可能已经在该机器上安排了其他任务,由于对CPU、内存、本地磁盘或网络带宽的竞争,导致执行MapReduce代码的速度变慢。我们最近遇到的一个问题是机器初始化代码中的一个bug,它导致处理器缓存被禁用:受影响的机器上的计算速度下降了100倍以上。
我们有一个通用的机制来缓解掉队者的问题。当一个MapReduce操作接近完成时,Master会调度剩余正在进行中的任务的备份执行。每当主执行或备份执行完成时,任务都会标记为已完成。我们对这种机制进行了调整,使其通常会将操作使用的计算资源增加不超过几个百分点。我们发现,这大大减少了完成大型MapReduce操作的时间。举个例子,当禁用备份任务机制时,5.3节中描述的sort程序需要多花44%的时间才能完成。
4. 改进
虽然简单地编写Map和Reduce函数提供的基本功能已经足够满足大多数需求,但我们发现了一些有用的扩展。本节将对此进行描述。
4.1 分区函数
MapReduce的用户指定他们所需的Reduce任务/输出文件数量(R)。使用中间键上的分区函数将数据分配到这些任务中。提供了一个默认的分区函数,它使用哈希(例如,“hash(key) mod R”)来进行分区。这往往会导致相当平衡的分区。然而,在某些情况下,按照密钥的其他功能对数据进行划分是有用的。例如,有时输出键是URL,并且我们希望所有单个主机条目都在同一个输出文件中结束。为支持此类情况,MapReduce库的用户可以提供特殊的划分函数。例如,使用“hash(Hostname(urlkey)) mod R”作为划分函数会使来自同一主机的所有URL最终出现在同一个输出文件中。
4.2 顺序保证
我们保证在给定的分区内,中间键/值对按照递增的键顺序进行处理。这种排序保证使得生成每个分区的有序输出文件变得容易,当输出文件格式需要支持通过键进行高效随机访问查找时非常有用,或者用户发现将数据排序后使用更加方便。
4.3 Combiner函数
在某些情况下,每个Map任务产生的中间键会有明显的重复,并且用户指定的Reduce函数是可交换的和可结合的。2.1节中的单词计数就是一个很好的例子。由于词频往往遵循Zipf分布,每个Map任务将产生数百或数千条<the, 1>
形式的记录。所有这些计数将通过网络发送到单个Reduce任务,然后通过Reduce函数将它们相加得到一个数字。我们允许用户指定一个可选的Combiner函数,在数据通过网络发送之前对其进行部分合并。
Combiner函数在每台执行Map任务的机器上执行。通常使用相同的代码来实现Combiner和Reduce函数。Reduce函数和Combiner函数的唯一区别在于MapReduce库如何处理函数的输出。Reduce函数的输出会写入最终的输出文件。Combiner函数的输出被写入一个中间文件,然后发送给Reduce任务。
部分合并可以显著提高某些类型的MapReduce操作的速度。附录A给出了一个使用Combiner的例子。
4.4 输入和输出的类型
MapReduce库提供了对多种不同格式的输入数据的支持。例如,“文本”模式输入将每一行视为一个键/值对:键是文件中的偏移量,而值是该行内容。另一种常见的支持格式是存储按键排序的键/值对序列。每个输入类型的实现都知道如何将自己分割成有意义的范围以便作为单独Map任务进行处理(例如,文本模式下的范围拆分确保仅在行边界处发生范围拆分)。用户可以通过提供简单读取器接口实现来添加新输入类型的支持,但大多数用户只使用少量预定义输入类型之一。
读取器不一定需要从文件中提供数据。例如,很容易定义一个从数据库或内存映射数据结构中读取记录的读取器。
类似地,我们支持一组输出类型以生成不同格式的数据,并且用户代码很容易添加新输出类型的支持。
4.5 副作用
在某些情况下,MapReduce的用户会发现用map和/或reduce操作生成辅助文件作为额外的输出非常方便。我们依赖于应用程序编写者来使这些副作用具有原子性和幂等性。通常,应用程序会写入一个临时文件,并在完全生成该文件后原子重命名该文件。
我们不支持由单个任务产生的多个输出文件的原子两阶段提交。因此,产生具有跨文件一致性要求的多个输出文件的任务应该是确定的。这种限制在实践中从来就不是问题。
4.6 跳过错误记录
有时,用户代码中的bug会导致Map或Reduce函数在处理某些记录时崩溃。这些bug会导致MapReduce操作无法完成。通常的做法是修复bug,但有时这是不可行的;也许这个bug是在一个源代码不可用的第三方库中。此外,有时忽略一些记录是可以接受的,例如在对大型数据集进行统计分析时。我们提供了一种可选的执行模式,MapReduce库可以检测到哪些记录会导致确定性的崩溃,并跳过这些记录以继续执行。
每个工作进程都安装一个信号处理程序,用于捕获分段违规和总线错误。在调用用户Map或Reduce操作之前,MapReduce库会将参数的序号存储在一个全局变量中。如果用户代码生成了一个信号,信号处理程序会发送一个“last gasp”UDP包给MapReduce Master。当Master发现某条记录有多个失败时,就表示当它下一次重新执行对应的Map或Reduce任务时,应该跳过这条记录。
4.7 本地执行
在Map或Reduce函数中调试问题可能会很棘手,因为实际的计算发生在一个分布式系统中,通常在数千台机器上进行,并由Master节点动态地做出工作分配决策。为了便于调试、性能分析和小规模测试,我们开发了一种替代的MapReduce库实现,它可以在本地机器上顺序执行所有MapReduce操作的工作。提供给用户控件以便将计算限制于特定的Map任务中。用户使用一个的特殊标志调用其程序,然后可以轻松使用任何有用的调试或测试工具(例如gdb)。
4.8 状态信息
Master运行一个内部HTTP服务器,并导出一组状态页面供用户使用。状态页面显示计算的进度,例如已经完成了多少任务,有多少正在进行中,输入的字节数,中间数据的字节数,输出的字节数,处理速率等。页面还包含到每个任务生成的标准错误和标准输出文件的链接。用户可以使用这些数据来预测计算需要多长时间,以及是否应该向计算中添加更多的资源。这些页面还可以用来确定何时计算比预期慢得多。
此外,顶层状态页显示了哪些Worker失败了,以及它们失败时正在处理哪些Map和Reduce任务。当试图诊断用户代码中的错误时,这些信息很有用。
4.9 计数器
MapReduce库提供了一个计数器功能,用于计算各种事件的发生次数。例如,用户代码可能想要计算处理的单词总数或索引的德语文档数量等。为了使用这个功能,用户代码创建一个命名的counter对象,然后在Map和/或Reduce函数中适当地增加计数器。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w int contents:
if (IsCapitalized(w)):
uppercase -> Increment();
EmitIntermediate(w, "1");
各个工作节点的计数器值会周期性地传播到主节点(通过ping响应传递)。Master汇总成功的Map和Reduce任务的计数器值,并在MapReduce操作完成时将它们返回给用户代码。当前计数器的值也显示在Master状态页面上,以便人们可以观察实时计算的进展。当汇总计数器值时,Master消除了重复执行同一个Map或Reduce任务的影响,以避免重复计算。(重复执行可能是由于使用了备份任务,或者由于失败而重新执行任务。)
一些计数器值由MapReduce库自动维护,例如处理的输入键值对的数量和生成的输出键值对的数量等。
用户发现计数器功能对于检查MapReduce操作的行为非常有用。例如,在某些MapReduce操作中,用户代码可能希望确保生成的输出数据对的数量正好等于处理的输入数据对的数量,或者希望处理的德语文档占处理的文档总数的比例在可容忍的范围内。
5. 性能
在本节中,我们测量了在大型集群上运行的两个计算中MapReduce的性能。一个计算搜索约1TB的数据以寻找特定模式。另一个计算对约1TB的数据进行排序。
这两类程序组成了MapReduce用户编写的实际程序的一个大子集,一类程序将数据从一种表示形式转换为另一种表示形式,而另一类程序从大型数据集中提取少量有趣的数据。
5.1 集群配置
所有的程序都在一个由大约1800台机器组成的集群上执行。每台机器都有两个2GHz的Intel Xeon处理器,启用超线程技术,4GB内存,两个160GB的IDE磁盘和一个千兆以太网连接。这些机器被排列在一个两级的树状交换网络中,在根处可获得大约100-200Gbps的总带宽。所有的机器都在同一个托管设施中,因此任何机器之间的往返时间都少于一毫秒。
在4GB的内存中,大约有1-1.5GB分配给了集群上运行的其他任务。这些程序是在周末下午执行的,那时CPU、磁盘和网络基本都是空闲的。
5.2 Grep
Grep程序扫描1010个100字节的记录,寻找一个相对少见的三字符模式(该模式出现在92,337个记录中)。输入被分成大约64MB大小的块(M = 15000),整个输出放置在一个文件中(R = 1)。
图2显示了计算随时间的进展情况。Y轴显示扫描输入数据的速度。随着分配给MapReduce计算的机器越来越多,计算速率逐渐提高,当分配了1764个工作节点时,峰值超过30GB/s。当Map任务完成时,速率开始下降,并在计算大约80秒后降为零。整个计算从开始到结束大约需要150秒。这包括大约一分钟的启动开销。包括将程序传播到所有的工作节点花费的开销,以及包括与GFS交互打开1000个输入文件并获取本地化优化所需的信息的延迟。
5.3 Sort
Sort程序对1010条100字节的记录(大约1TB的数据)进行排序。该程序以TeraSort基准测试[10]建模而成。
排序程序由少于50行用户代码组成。3行Map函数从文本行中提取一个10字节的排序键,并将键和原始文本行作为中间键值对输出。我们使用内置的恒等函数作为Reduce操作符。这个函数传递中间的键值对作为输出的键值对。最终的排序输出被写入一组双路复制的GFS文件(即,2TB的数据被写入程序的输出)。
和之前一样,输入数据被分成64MB的块(M = 15000)。我们将排序后的输出分成4000个文件(R = 4000)。分区函数使用键的初始字节将其分割为R个部分之一。
我们的分区函数针对此基准测试具有内置的键分布知识。在一般的排序程序中,我们将添加一个预处理MapReduce操作,该操作将收集一些键的样本,并使用样本键的分布来计算最终排序传递的切分点。图3(a)显示了排序程序正常执行过程中的进度情况。左上角图表显示输入读取速率。速率峰值约为13GB/s,并且由于所有Map任务都在200秒之前完成,因此衰减得很快。注意,与Grep相比,输入速率较低。这是因为排序Map任务大约花费了一半时间和I/O带宽将中间输出写入本地磁盘。而grep对应的中间输出大小可以忽略不计。
中左的图表显示了数据通过网络从Map任务发送到Reduce任务的速率。这种洗牌(shuffle)在第一个Map任务完成时就开始了。图中的第一个峰值是第一批约1700个Reduce任务(整个MapReduce分配了约1700台机器,每台机器一次最多执行一个Reduce任务)。计算进行到大约300秒后,第一批Reduce任务中的一些完成了,我们开始为剩下的Reduce任务打乱数据。所有的洗牌都是在计算进行到大约600秒时完成的。
左下角的图表显示了Reduce任务将排序后的数据写入最终输出文件的速度。在第一个洗牌周期结束和写入周期开始之间有一个延迟,因为机器正在忙着对中间数据进行排序。在一段时间内,写操作会以2 ~ 4 GB/s的速率持续进行。所有的写操作都在计算过程的850秒左右完成。算上启动开销,整个计算耗时891秒。这与TeraSort基准测试[18]当前报告的最佳结果1057秒相似。
有几点需要注意:由于我们的局部性优化,输入速率高于洗牌速率和输出速率——大多数数据从本地磁盘读取,并绕过相对带宽受限的网络。shuffle速率比输出速率高,因为输出阶段写入两个排序数据的副本(为了可靠性和可用性的原因,我们创建了两个输出副本)。我们写两个副本,因为这是底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用纠删码[14]而不是复制,则用于写入数据的网络带宽需求将会减少。
5.4 备份任务的影响
在图3(b)中,我们展示了禁用备份任务的排序程序执行。执行流程与图3(a)所示类似,只是有一个非常长的尾部,在那里几乎没有任何写入活动发生。在960秒后,除了5个Reduce任务外,所有任务都已完成。然而这最后几个掉队的任务要晚300秒才能完成。整个计算需要1283秒,比过去的时间增加了44%。
5.5 机器故障
在图3(c)中,我们展示了sort程序的执行过程,在执行几分钟后,我们故意杀死了1746个工作进程中的200个。由于只是进程被杀死,机器仍然正常运行,底层的集群调度器立即在这些机器上重启新的工作进程。
由于先前完成的一些Map任务消失了(因为相应的Map工作进程被杀死),需要重新执行这些Map任务,因此工作进程死亡会显示为一个负的输入速率。这种Map任务的重新执行相对较快。整个计算在933秒内完成,包括启动开销(仅比正常执行时间增加了5%)。
6. 经验
我们在2003年2月编写了MapReduce库的第一个版本,并在2003年8月对其进行了重大改进,包括本地局部优化、跨工作节点执行任务的动态负载均衡等。从那时起,我们就惊喜地发现MapReduce可以广泛地应用于我们所处理的各种问题。它已在谷歌的广泛领域中使用,包括:
- 大规模机器学习问题
- 对于Google新闻和Froogle产品的分类归并问题
- 提取用于生成热门查询报告的数据(例如谷歌时代精神)
- 为新的实验和产品提取网页属性(例如从大量网页中提取地理位置用于本地化搜索),以及大规模图计算。
图4展示了我们主要源代码管理系统中独立的MapReduce程序数量随时间显著增长的情况,从2003年初的0个到截至2004年9月底近900个不同实例。MapReduce之所以如此成功,是因为它使得编写简单程序并在半小时内高效地在一千台机器上运行成为可能,大大加快了开发和原型设计周期。此外,它允许没有分布式和/或并行系统经验的程序员轻松利用大量资源。每次作业结束时,MapReduce库会记录有关作业使用的计算资源的统计信息。在表1中,我们展示了谷歌于2004年8月运行的部分MapReduce作业的一些统计数据。
6.1 大规模索引
到目前为止,MapReduce最重要的用途之一是完全重写了生产索引系统,为谷歌网络搜索服务生成数据结构。索引系统将爬行系统检索到的大量文档作为输入,存储为一组GFS文件。这些文件的原始内容超过20TB的数据。索引过程以5到10个MapReduce操作的顺序运行。使用MapReduce(而不是以前版本的索引系统中的临时分布式遍历)有以下几个好处:
- 索引的代码更简单、更小、更容易理解,因为处理容错、分布和并行的代码隐藏在MapReduce库中。例如,在使用MapReduce表达时,一个阶段的计算从大约3800行c++代码减少到大约700行。
- MapReduce库的性能足够好,我们可以将概念上不相关的计算分开进行,而不是将它们混在一起以避免额外的数据传递。这使得改变索引过程变得容易。例如,在旧索引系统中需要几个月才能完成的一个更改,在新系统中只需要几天就可以实现。
- 索引过程变得更加容易操作,因为大多数由机器故障、机器速度慢和网络故障引起的问题都可以由MapReduce库自动处理,而无需操作人员干预。此外,通过向索引集群添加新机器,可以很容易地提高索引过程的性能。
7. 相关工作
许多系统提供了受限的编程模型,并利用这些限制来实现计算的自动并行化。例如,可以使用并行前缀计算在N个处理器上以log N时间计算N个元素数组的所有前缀的关联函数[6,9,13]。MapReduce可以看作是基于我们在大规模实际计算中获得的经验模型的简化和提炼。更重要的是,我们提供了一个可扩展到数千个处理器的容错实现。相比之下,大多数并行处理系统只在较小规模上实现,并将处理机器故障的细节留给程序员来处理。
批量同步编程[17]和一些MPI原语[11]提供了更高级的抽象,使程序员更容易编写并行程序。这些系统与MapReduce的一个关键区别是MapReduce利用受限的编程模型来自动并行化用户程序并提供透明的容错。
我们的局部性优化从活动磁盘[12,15]等技术中获得灵感,在这些技术中,计算被推到靠近本地磁盘的处理元素中,以减少通过I/O子系统或网络发送的数据量。我们在普通处理器上运行,这些处理器直接连接了少量磁盘,而不是直接在磁盘控制器处理器上运行,但总体方法类似。
我们的备份任务机制类似于Charlotte系统[3]所采用的立即调度机制。简单的立即调度的缺点之一是,如果某个任务导致重复失败,则整个计算无法完成。我们用跳过坏记录的机制解决了这个问题的一些情况。
MapReduce实现依赖于一个内部的集群管理系统,该系统负责在大量的共享机器上分发和运行用户任务。虽然本文不是重点,但集群管理系统在本质上类似于其他系统,如Condor [16]。
MapReduce库中的排序功能与NOW-Sort[1]的操作类似。源机器(Map Worker)对要排序的数据进行分区,并将其发送给Rreduce Woker之一。每个Reduce Worker都会在本地对数据进行排序(如果可能的话,在内存中)。当然,NOW-Sort没有用户可定义的Map和Reduce函数,这些函数使我们的库广泛适用。
River[2]提供了一个编程模型,其中进程通过分布式队列发送数据来相互通信。与MapReduce类似,River系统试图提供良好的平均性能,即使在异构硬件或系统扰动带来的不均匀性存在的情况下也是如此。River通过仔细调度磁盘和网络传输以实现平衡的完成时间来实现这一点。MapReduce有不同的方法。通过约束编程模型,MapReduce框架能够将问题划分为大量的细粒度任务。这些任务被动态地调度到可用的Worker上,以便更快的Worker处理更多的任务。受限制的编程模型还允许我们在临近作业结束时安排重复执行任务,这在存在不均匀(例如缓慢或卡住的工人)的情况下大大减少了完成时间。
BAD-FS[5]的编程模型与MapReduce非常不同,与MapReduce不同的是,它的目标是跨广域网执行作业。无论如何,它们有两个基本的相似之处。(1)两个系统都使用重复执行来从故障导致的数据丢失中恢复。(2)两种算法都采用了局部性调度策略,减少了在拥塞的网络链路上发送的数据量。
TACC[7]是一个旨在简化高可用网络服务构建的系统。和MapReduce一样,它依赖于重复执行来实现容错。
8. 结论
MapReduce编程模型已经在谷歌成功地应用于许多不同的目的。我们把这次成功归因于几个原因。首先,该模型易于使用,即使没有并行和分布式系统经验的程序员也可以使用,因为它隐藏了并行化、容错、局部性优化和负载均衡等细节。其次,各种各样的问题都可以很容易地表达为MapReduce计算。例如,MapReduce用于为谷歌的web搜索服务、排序、数据挖掘、机器学习等许多系统生成数据。第三,我们开发了一个MapReduce的实现,它可以扩展到由数千台机器组成的大型集群。该实现有效地利用了这些机器资源,因此适合于处理在谷歌上遇到的许多大型计算问题。
我们从这项工作中学到了一些东西。首先,对编程模型进行限制,使计算的并行化、分布化和容错变得容易;其次,网络带宽是稀缺资源。因此,我们的系统中的许多优化都旨在减少通过网络发送的数据量:局部性优化允许我们从本地磁盘读取数据,将中间数据的一个副本写入本地磁盘可以节省网络带宽。第三,重复执行可以降低个别慢速机器的影响,并处理机器故障和数据丢失。
致谢
Josh Levenberg在修改和扩展用户级MapReduce API方面发挥了重要作用,他根据自己使用MapReduce的经验和其他人提出的改进建议提供了许多新功能。MapReduce从GFS[8]中读取输入,并将输出写入文件系统。我们要感谢Mohit Aron、Howard Gobioff、Markus Gutschke、David Kramer、Shun-Tak Leung和Josh Redstone在开发GFS过程中所做的工作。我们还要感谢Percy Liang和Olcan Sercinoglu在开发MapReduce使用的集群管理系统方面所做的工作。Mike Burrows、Wilson Hsieh、Josh Leven- berg、Sharon Perl、Rob Pike和Debby Wallach对本文的早期草稿提供了有益的评论。匿名的OSDI审稿人和我们的指导教师Eric Brewer提供了许多有用的建议,指出了论文可以改进的地方。最后,我们感谢谷歌工程组织中MapReduce的所有用户,感谢他们提供有用的反馈、建议和bug报告。
参考文献
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Pat- terson. High-performance sorting on networks of work- stations. In Proceedings of the 1997 ACM SIGMOD In- ternational Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Pro- ceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Ho ̈lzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22–28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Pro- ceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scal- able network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78– 91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Le- ung. The Google file system. In 19th Symposium on Op- erating Systems Principles, pages 29–43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigni- aud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum.
Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satya- narayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Di- amond: A storage architecture for early discard in inter- active search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831–838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335–348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data process- ing. IEEE Computer, pages 68–74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experi- ence. Concurrency and Computation: Practice and Ex- perience, 2004.
[17] L.G.Valiant.Abridgingmodelforparallelcomputation. Communications of the ACM, 33(8):103–111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
附录A 单词频率
本节包含一个程序,它计算命令行指定的一组输入文件中每个不同单词的出现次数。
# include "mapreduce/mapreduce.h"
// 用户的Map函数
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput & input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// 跳过头部空格
while ((i < n) && isspace(text[i]))
i++;
// 找到单词结束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start, i-start), "1");
}
}
};
REGISTER_MAPPER(WordCounter);
// 用户的Reduce函数
class Adder : public Reducer {
virtual void Reduce (ReduceInput* input) {
// 使用相同的键遍历所有元素,并将它们的值相加
int64 value = 0;
while (!input->done) {
value += StringToInt(input->value());
input->NextValue();
}
// 输出input->key()的总和
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 将输入文件列表存储到spec中
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// 指定输出文件
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可选:在Map任务中共进行部分求和以节省网络带宽
out->set_combiner_class("Adder");
// 调整参数:每个任务最多使用2000台机器和100MB内存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 运行
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成:result结构包含有关计数器、所用时间、使用的机器数量等信息
return 0;
}