这一部分内容会适当深入说明用户即将面对的MapReduce框架的各个环节。这有助于用户从一个更细的粒度地去实现、配置、调优作业。
我们先看看Mapper和Reducer接口。通常应用程序实现这两个接口需要提供map和reduce方法。
然后我们会讨论下其他的核心接口,包括Job,Partitioner,InputFormat,OutputFormat等接口。
最后,我们将通过讨论MapReduce框架一些有用的功能点比如DistributedCache,IsolationRunner等等来收尾。
Mapper
Mapper将输入的key-alue对集合处理成另外一组key-value对集合。得到的新的key-value对集合只是一个用来过渡的中间记录集。
Map是一类将输入的记录处理为中间记录集的独立任务。生成的中间记录集不需要和输入记录类型一致。一组输入记录经过Map处理,可能会生成0个或多个输出key-value对。
Hadoop MapReduce框架为每个InputSplit创建一个map任务,而每个InputSplit是由作业的InputFormat产生的。
概括地说,作业通过Job.setMapperClass(Class)方法调用Mapper的实现类。然后MapReduce框架就可以在任务中调用map方法来处理InputSplit中每个的key-value对。应用程序可以通过重写cleanup方法来执行必要的清理工作。
输出的key-value集合不必要和输入的key-value集合类型一致。一个输入的key-value对经过处理可能输出0个或多个输出key-value对。通过调用context.write()方法可以收集输出的key-value对。
应用程序可以使用Counter(计数器)来汇报统计信息。
MapReduce框架随后会把一个特定的key所关联的所有中间记录进行分组,并提交给Reducer去处理生成最终结果。用户可以通过Job.setGroupingComparatorClass(Class)来指定一个Comparator来控制分组。
Mapper的输出结果排好序后就被分割开分配给每个Reducer去处理。分割的数目和作业中reduce的任务数目是一致的。用户可以通过实现自定义的Partitioner来控制哪个key(或之后的结果集)被分配给哪个Reducer。
用户也可以选择使用一个combiner来对产生的中间结果集进行一次本地的聚合操作(或者说是本地的reduce操作),这有助于减少Mapper传递给Reducer的数据量。这里需要使用到Job.setCombinerClass()方法。
这些排好序的中间过程的输出结果的保存格式是(key-len, key, value-len, value)。程序可以在Configuration中使用CompressionCodec来控制是否压缩以及怎样压缩输出的中间记录集。
需要多少个Map?
map的数目通常是由输入记录的大小决定的。一般就是输入文件的总块(block)数。
map正常的并行规模大致是每个节点10-100个map。如果map任务对cpu的消耗较小的话,那每个节点可以设置多达300个map。由于每个任务的初始化需要占用一定的时间,所以比较合理的情况是每个map的执行时间最少1分钟。
因此,如果你有10TB的输入数据,每个块的规模是128MB,那么最终会需要82000个map来完成任务。除非使用了Configuration.set(MRJobConfig.NUM_MAPS, int)(这也只不过是给了MapReduce框架一个提示)来将这个数值设置的更高。
Reducer
Reducer将一组有相同key的中间记录集处理为一组更小的结果集。
用户可以通过Job.setNumReduceTasks(int)来控制作业中reduce的数量。
概括地说,在作业中,Job对象可以用通过Job.setReducerClass(Class)来调用Reducer的实现类,并完成对其的初始化。然后MapReduce框架就可以调用reduce()方法来处理每个已分组好的输入键值对<key, (list of values)>。此时应用程序也可以重写cleanup(Context)来执行必要的清理。
Reducer有三个主要的阶段:shuffle,sort和reduce。
Shuffle
Reducer的输入就是Mapper已经排好序的输出。在这个阶段,MapReduce框架通过HTTP为Reducer获取所有Mapper输出中与之相关的块。
Sort
在这个阶段中,框架会对Reducer的输入按key进行分组(因为不同mapper的输出中可能会有相同的key)。
Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取出一边被合并的。
Secondary Sort
如果需要中间记录集的分组规则和提交reduce前对key进行分组的规则不一致,那么可以通过Job.setSortComparatorClass(Class)指定一个Comparator。再加上Job.setGroupingComparatorClass(Class)可以被用来控制中间记录集如何分组,因此结合两者可以实现按值的二次排序。
Reduce
在这一阶段,框架为已分好组的每个<key, (list of values)>对调用reduce()方法。
reduce任务的输出通常会通过Context.write()写入文件系统。
应用程序可以通过Counter来汇报统计信息。
Reducer的输出是没有经过排序的。
需要多少个Reduce?
reduce的适合数量大致等于 (0.95或1.75) *节点数*每个节点最大的Container数。
使用0.95的话,所有的reduce任务会在所有map任务执行完毕后立即执行,开始传输map的输出结果。使用1.75的话,较快的节点在完成第一轮reduce后就开始第二轮的reduce任务,这样可以达到一个比较好的负载均衡的效果。
增加reduce的数量会增加MapReduce框架的开销,但是可以改善负载均衡,降低因为执行失败带来的负面影响。
上述的比例系数比整体数目稍小一些是为了给框架中的推测任务(speculative-tasks)和失败的任务保留部分的reduce空间。
不使用Reducer
在没有reduce需求的时候,将reduce任务数目设置为0也是合理的。
在这种情况下,map任务的输出结果会被直接存入文件系统中。存放的路径FileOutputFormat.setOutputPath(Job, Path)指定。同样的,在存放入文件系统前,框架也没有对map的输出进行排序。
Partitioner
Partitioner用以分隔键值空间。
Partitioner负责控制对map输出结果的键值进行分割。key(或者一个key的子集)用于产生分区,通常这个过程基于hash函数实现。分区的数目和作业中reduce任务的数目一致。因此它控制着将中间记录的key交给哪个reduce任务去处理。
默认的Partitioner是HashPartitioner。
Counter
Counter是MapReduce用于汇报统计信息的一套机制。
Mapper和Reducer的实现类可以使用Counter来汇报一些统计信息。
Hadoop MapReduce框架自带了一套包含许多有用的Mapper、Reducer和Partitioner的类库。
发表评论