配置
一个Job就表示了一个MapReduce的作业配置。
Job是用户向Hadoop框架描述一个MapReduce作业如何执行的最主要的接口。框架会尽力按Job的描述去忠实地执行一个作业,但是:
- 一些配置参数可能会被管理员标记为final(了解下final参数),这意味着不能被更改;
- 作业的一些参数可以被直接设置(比如Job.setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间存在微妙的联系,设置起来比较复杂(比如Configuration.set(JobContext.NUM_MAPS, int))。
Job通常是用来指定Mapper,Combiner(如果有的话),Partitioner,Reducer,InputFormat,OutputFormat的实现类。Job可以指定一组输入文件,使用的方法包括(FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))和(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))。Job还可以指定结果输出文件的地址(FileOutputFormat.setOutputPath(Path))。
此外,Job还可以用来为作业做一些高级的配置。比如使用哪个Comparator,哪些文件可以被放进DistributedCache,中间结果或者作业的输出结果是否可以被压缩以及怎样压缩,作业是否允许预防性任务的执行(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean)),每个任务的最大尝试次数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等等。
当然用户可以使用Configuration.set(String, String)/ Configuration.get(String)来set/get应用程序需要的任意参数。然而,有些参数需要慎重使用,比如DistributedCache更适用于大规模的只读数据。
执行
MRAppMaster是在一个单独的jvm上以子进程的形式来执行Mapper/Reducer的任务的。
子任务会继承父MRAppMaster的执行环境。当然,用户也可以自行设置子jvm上的附加选项。这需要使用mapreduce.{map|reduce}.java.opts和Job中的配置参数(比如使用-Djava.library.path=<>将一个非标准路径设为运行时的链接用以搜索共享库)。如果mapreduce.{map|reduce}.java.opts的参数包含“@taskid@”标识符,它会被替换成MapReduce中taskid的值。
下面是一个包含多个参数和替换的例子,其中包括:jvm GC日志;无密码启动JVM JMX代理程序以使之可以连接到jconsole或者类似的工具来查看子进程的内存、线程并得到线程的dump;分别设置map和reduce的子jvm的堆内存为512MB和1024MB;还添加了一个附加路径到子jvm的java.library.path中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<property> <name>mapreduce.map.java.opts</name> <value> -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false </value> </property> <property> <name>mapreduce.reduce.java.opts</name> <value> -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false </value> </property> |
内存管理
用户或者管理员可以使用mapreduce.{map|reduce}.memory.mb设置子任务以及其子进程的虚拟内存的最大值。注意,这里设置的值是对每一个进程都有效的。mapreduce.{map|reduce}.memory.mb设置的值的单位是MB。还有,这个值必须大于或等于java虚拟机的-Xmx的值,否则虚拟机可能无法启动。
注意:mapreduce.{map|reduce}.java.opts只用于设置MRAppMaster启动的子任务。为守护进程设置内存的选项请参看文档《Hadoop守护进程环境配置》
框架一些部分的可用内存也是可配置的。在map和reduce任务中,程序的性能可能会被一些参数的调整所影响——比如是并发相关选项或者将数据写入硬盘的频率。监控文件系统关系到一个作业的计数器——尤其是关系到从map到reduce的byte数——是至关重要的。这些数据对于协调我们刚才提到的参数有着非常宝贵的价值。
Map参数
一个map所产生的某条记录会在被序列化后置入缓存,相关的元数据则会被存放入accounting buffer中。就像下面这些选项所描述的,若是缓存中序列化数据的大小或者元数据的大小超过了某个阈值,而此时map还在持续输出记录,缓存中的内容就会被排序并写入硬盘生成一个临时文件,这个过程可以称为“溢写(spill)”。如果缓存已被填满并且产生溢写,此时map线程就会被阻塞。在map任务执行完成后,缓存中现存的所有的记录都会被写入硬盘,而后硬盘上所有相关的临时文件会被合并成一个独立的文件。较小的溢写可以减少map执行的时间,但是一个较大的缓存又会占用Mapper的可用内存。
名称
名称 |
类型 |
描述 |
mapreduce.task.io.sort.mb |
int |
累积的序列化数据以及accounting buffer中存储的map输出数据的大小。单位MB |
mapreduce.map.sort.spill.percent |
float |
排序缓存区大小的百分比。一旦超过,则会有一个线程将缓存内容溢出写入硬盘 |
其他需要注意的内容:
- 如果超出了溢写阈值而还有一个溢写正在执行,那么将只做收集工作直到当前的溢写完成。举个例子:如果mapreduce.map.sort.spill.percent的值被设置为0.33,缓存的数据已经达到了阈值但此时仍有溢写在执行,那么下一次的溢写会一起处理这次收集的数据(即是缓存中0.66的内容),而不是产生额外的溢写。换句话说,这里定义的阈值是触发性的,不是阻塞性的。
- 如果一条记录的大小超过了排序缓存区的的容量,那么这将会先触发一个溢写,这条记录将会被溢写到一个单独的文件。而这条记录是否会被combiner优先处理则是不确定的。
Shuffle/Reduce参数
就像之前所描述的,每个reduce会取得Partitioner通过和HTTP分配给它的map的输出并且分阶段的将这些输出合并到硬盘上。如果map输出中间集的压缩设置被开启的话,那么map每一次的输出都会解压到内存。下面的这些选项影响到reduce处理前将输出合并到硬盘的频率以及在reduce处理时分配给map输出的内存大小。
名称 |
类型 |
描述 |
mapreduce.task.io.soft.factor |
int |
指定在硬盘上每次合并的临时文件的数目。它限制了合并时打开文件的数目以及压缩编码方式。如果文件的数目超过了这个限制,合并的工作将会提交给几个不同的进程。虽然这个限制也适用于map,但是大部分作业最好是做下设置,这样这个限制将不可能达到。 |
mapreduce.reduce.merge.inmem.thresholds |
int |
在被合并写入硬盘前,被抓取到内存中的排好序的map输出记录的总数。就像之前提到的溢写阈值,这个参数也不是被定义为一个切分单元,而是一个触发值。在实践中,这个值通常被设置的非常高(1000)或者被禁用(0),因为在内存中对片段的合并往往要比在硬盘上的合并开销低得多(查看下文的注意事项)。这个阈值只是影响了shuffle阶段内存中进行文件合并的频率。 |
mapreduce.reduce.shuffle.merge.percent |
float |
在内存文件合并前放置到内存中的map输出的阈值,这个值表示了内存中用于存储map输出的百分比。因为map的输出若无法装入内存就会被停滞,这个值设置过高的话又会降低获取和合并的平行度。相反的,这个值达到1.0时对于那些输入可以完全放入内存的reduce是有效的。这个参数只是影响了shuffle阶段内存中进行文件合并的频率。 |
mapreduce.reduce.shuffle.input.buffer.percent |
float |
在shuffle阶段分配给map输出缓存的堆内存百分比——相对于通常在mapreduce.reduce.java.opts中设置的堆内存的大小。尽管还需要留一些内存给框架,通常把这个值设置的足够大还是很有用的,这样可以存储以存储更大更多的map输出,减少溢写的内容。 |
mapreduce.reduce.input.buffer.percent |
float |
在reduce阶段用于存储map输出的最大堆内存的百分比。在reduce开始时缓存中的map输出内容将会被合并到硬盘直到剩下的内容小于这个值。默认情况下,所有的map输出都会被合并到硬盘,以为reduce留出足够大的内存空间。对于内存需求比较小的reduce,应当适当的增加这个值,以避免写入硬盘从而提升性能。 |
其他需要注意的内容:
如果一条map输出大于用于复制map输出的内存的25%,它将会被直接写入到硬盘。
当使用combiner的时候,关于使用高合并阈值以及大缓存的说法就不太可靠了。因为合并发生在所有的map输出被获取前,而combiner是在溢写到硬盘的时候运行。在一些案例中,用户通过将资源用于对map输出的combin而不是简单增加缓存的大 小可以优化reduce的执行时间。使用combier有助于减小溢写,实现溢写和获取的并行处理。
在合并内存中的map输出到硬盘并启动reduce时,如果有碎片要溢写而硬盘上至少有mapreduce.task.io.sort.factor个临时文件,此时需要进行一次中间合并。仍在内存中的map输出也会成为中间合并的一部分。
配置参数
下面的属性是每个task执行时使用的本地作业参数:
名称 |
类型 |
描述 |
mapreduce.job.id |
String |
作业ID |
mapreduce.job.jar |
String |
在job目录中job.jar的位置 |
mapreduce.job.local.dir |
String |
job指定的共享存储空间 |
mapreduce.task.id |
String |
task id |
mapreduce.task.attempt.id |
String |
task尝试ID |
mapreduce.task.is.map |
boolean |
是否是map task |
mapreduce.task.partition |
int |
task在job中使用的id |
mapreduce.map.input.file |
String |
map读取的文件名 |
mapreduce.map.input.start |
long |
map输入的数据块的起始位置偏移量 |
mapreduce.map.input.length |
long |
map输入的数据块的字节数 |
mapreduce.task.output.dir |
String |
task的临时输出目录 |
注意:在使用streaming执行作业时,mapreduce的参数名称会被变形。参数名称中的点“.”会被下划线“_”替换。比如,mapreduce.job.id会被写为“mapreduce_job_id”,而mapreduce.job.jar会被写为“mapreduce_job_jar”。要获取一个streaming作业中的Mapper或Reducer的值需要使用下划线的参数名称。
任务日志
NodeManager会读取标准输出(stdout)和错误(stderr)流,以及任务的系统日志并记录到日志文件夹${HADOOP_LOG_DIR}/userlogs下
lib的分配
DistributedCache也可以用于map或reduce任务中用到的jar包和本地库的分配。子JVM通常会把它现在工作目录添加到java.library.path和LD_LIBRARY_PATH。因此缓存的库也可以被System.loadLibrary或System.load方法调用。要了解更多的关于怎样通过分布式缓存调用共享库的细节参看《Native Libraries》文档。
提交和监控
Job 是用户提交的作业与ResourceManager交互的主要接口。
Job提供了提交作业,追踪作业进程,查看组件任务报表和日志,获取MapReduce集群状态信息等功能。
提交
作业提交过程包括:
- 检查作业输入输出配置;
- 为作业计算InputSplit的值;
- 如果需要的话,为作业的DistributedCache建立必要的统计信息;
- 复制作业的jar包和配置文件到FileSystem上的MapReduce系统目录下;
- 提交作业到ResourceManager并视需求监控其状态。
作业的历史文件也会记录到用户指定目录mapreduce.jobhistory.intermediate-done-dir和mapreduce.jobhistory.done-dir,默认是作业输出目录。
用户使用下面的命令可以查看指定目录下的历史日志摘要:
1 |
$ mapred job -history output.jhist |
这条指令可以打印作业的细节,以及失败的和被杀死的任务的细节。要查看作业更多的细节,比如成功的任务、每个任务尝试的次数(task attempt)等可以使用下面这条命令:
1 |
$ mapred job -history all output.jhist |
通常,用户使用Job创建应用程序,配置作业属性,提交作业并监控其运行。
控制
有时候用一个单独的MapReduce作业不能完成复杂的任务,此时用户需要将多个MapReduce作业连接在一起。这很简单,因为作业通常是输出到分布式文件系统中,这个作业的输出反过来也可以作为下一个作业的输入。
然而,这也意味着,保证作业完成(成功或失败)的责任就直接落到了客户身上。在这种情况下,可以使用的作业控制选项有:
- Job.submit():提交作业给集群,并立即返回;
- Job.waitForCompletion(boolean):提交作业到集群,并等待其完成后再返回。
#######
发表评论