提交作业到队列
用户将作业提交到队列。队列是作业的集合,允许系统添加特定的功能,比如,队列通过ACL决定哪些用户可以提交作业。通常主要由Hadoop调度器使用队列。
Hadoop的配置信息使用了一个单独的托管队列,被称为“default”。队列的名称是由mapreduce.job.queuename定义的(Hadoop的site配置)。一些作业调度器支持多重队列,比如Capacity Scheduler。
作业设置要提交到的队列有两种方式:通过mapreduce.job.queuename属性或者通过Configuration.set(MRJobConfig.QUEUE_NAME, String) API。队列名称的设置是可选的。如果作业提交的时候没有指定队列名称,将会被提交到“default”队列。
Counters
Counters是由MapReduce框架或应用程序定义的全局计数器。每个Counter都可以是任一种Enum类型。同一特定类型的Counter可以汇集到一个组,其类型为Counters.Group。
应用程序可以定义任意(Enum类型的)的Counters,并通过map或者reduce方法中的Counters.incrCounter(Enum, long)和Counters.incrCounter(String, String, long)进行更新。之后框架会汇总这些全局计数器。
DistributedCache
DistributedCache可将具体应用相关的,Size大的、只读的文件有效地分布放置。
DistributedCache是MapReduce框架提供的功能,可以缓存应用程序所需的文件(文本、压缩文件、jar包等等)。
应用程序在Job中通过url(hdfs://)指定需要被缓存的文件。DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。
在slave节点上作业的所有任务执行之前,MapReduce框架会把一些必要的文档复制到相应节点。框架的这个过程运行效率是很高的,主要有两点原因:每个作业的文件只拷贝一次,可以缓存压缩文件并在slave节点上解压文件。
DistributedCache根据缓存文档修改的时间戳进行跟踪。在作业运行期间,当前应用程序或其他外部程序不能修改缓存文件。
DistributedCache可以用来分发简单的只读数据或是文本,也可以分发类型复杂的文件,比如压缩文件和jar包。压缩文件(zip, tar, tgz and tar.gz文件)会在在slave节点上解压。这些文件可以被设置操作权限。
用户可以通过mapreduce.job.cache.{files |archives}来分发文件或压缩包。如果要分发多个文件,可以使用逗号来分割文件路径。也可以使用API来设置这个属性,可以使用的API包括:Job.addCacheFile(URI)/ Job.addCacheArchive(URI) 和[Job.setCacheFiles(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)/ [Job.setCacheArchives(URI[])](../../api/org/apache/hadoop/mapreduce/Job.html)。这些方法中的URI形式是:hdfs://host:port/absolute-path\#link-name。在Streaming程序中,可以通过命令行选项-cacheFile/-cacheArchive来分发文件。
DistributedCache可以在map或reduce任务中作为一种基础的软件分发机制。他可以用来分发jar包和本地库。Job.addArchiveToClassPath(Path)或Job.addFileToClassPath(Path)这两个API可以用来缓存jar包或文件,并把它们添加到classpath和子jvm中去。同样的事情也可以通过设置配置属性mapreduce.job.classpath.{files |archives}做到。类似的,被链接到任务的工作目录的缓存文件可以用来分发和装载本地库。
私有的和公共的DistributedCache文件
DistributedCache文件可以是私有的也可以是公共的。这要看它们是如何在slave节点上被共享的。
私有的DistributedCache文件被缓存在用户私有的本地目录下,用户的作业需要使用这些缓存文件。这些文件只能被特定用户的作业或任务访问。其他用户无法访问这些文件。可以在DistributedCache文件被上传的文件系统上(通常是HDFS)将其权限设置为私有。如果文件或文件所在的目录无法被外界搜索到或访问到,这个文件就是私有的。
公共的DistributedCache文件缓存在一个全局共享目录下,并被设置为所有用户可见。这些文件可以被slave节点上的所有用户的作业和任务访问。同样,可以在文件系统将文件设置为公有。如果用户想把文件设置为公共的,那么文件的权限必须被设置为全局可读,并且文件所在的目录权限需要被设置为可执行。
Profiling
Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map和reduce样例运行分析报告。
用户可以通过设定mapreduce.task.profile属性来决定是否收集作业任务的profiler信息。利用API Configuration.set(MRJobConfig.TASK_PROFILE, boolean)可以修改属性值。如果设置为true,则开启profiling功能。收集的profiler信息被保存在用户日志目录。默认profiling功能是关闭的。
如果用户设定使用profiling功能,可以使用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile的map/reduce task范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。
用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修改属性要使用api Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String)。当运行task时,如果字符串包含%s。 它会被替换成profileing的输出文件名。这些参数会在命令行里传递到子JVM中。缺省的profiling 参数如下:
1 |
-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s |
Debugging
Map/Reduce框架能够运行用户提供的调试脚本。 当map/reduce任务失败时,用户可以通过运行调试脚本对任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)进行分析和处理。调试脚本的标准输出和错误信息会在作业UI的Diagnostic处展示。
在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。
如何分发脚本文件:
用户要用 DistributedCache 来分发和链接脚本文件
如何提交脚本:
一个快速提交调试脚本的方法是分别为需要调试的map任务和reduce任务设置 “mapreduce.map.debug.script” 和 “mapreduce.reduce.debug.script” 属性的值。这些属性也可以通过 Configuration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)和 Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String) API来设置。对于streaming, 可以分别为需要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。
脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行map/reduce失败的节点上运行的调试命令:
1 |
$script $stdout $stderr $syslog $jobconf |
Pipes 程序根据第五个参数获得c++程序名。 因此调试pipes程序的命令是
1 |
$script $stdout $stderr $syslog $jobconf $program |
默认行为:
对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。
数据压缩
Hadoop MapReduce框架为应用程序的写入文件操作提供了压缩工具,这些工具可以为map输出的中间数据和作业结果输出数据(例如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现,比如实现了 zlib压缩算法。 Hadoop同样支持gzip、bzip2 、snappy和 lz4等文件格式。
考虑到性能问题(zlib)以及Java类库的缺失等因素,Hadoop也为上述压缩解压算法提供本地库的实现。
中间输出
应用程序可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) api控制map输出的中间结果,并且可以通过 Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) api指定 CompressionCodec。
结果输出
应用程序可以通过 api方法FileOutputFormat.setCompressOutput(Job, boolean) 控制输出是否需要压缩,并且可以使用 FileOutputFormat.setOutputCompressorClass(Job, Class)指定CompressionCodec。
如果作业输出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) api,来设定 SequenceFile.CompressionType (i.e. RECORD / BLOCK – defaults to RECORD)。
Skipping Bad Records
在处理map输入的时候, hadoop允许跳过一定数量的记录集。应用程序可以使用SkipBadRecords类来使用这个特性。
有时map任务一遇到某些特定的输入集合就会崩溃,这主要是map函数的一些BUG导致的。通常,我们会尝试去修复这些BUG。然而,有些时候修复却是不可能或不容易完成的。比如如果确认BUG是由一些不开源的第三方库导致的,修复BUG就会变得极为困难。在这种情况下,不管任务尝试执行多少次也不能执行成功,最终整个作业也会执行失败。此时我们可以尝试使用hadoop的这个特性来跳过这些会导致崩溃的输入集合。即使这样做会导致一小部分与之关联的记录集的丢失,不过这在一些应用中也是可以接受的(比如一些大数据统计分析的场景)。
默认情况下,hadoop的这个特性是被禁用的。要启用的话,可以使用SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)这两个API。
启用了这个特性以后,map任务一旦发生一定次数的执行失败的情况后,框架就会进入“skipping”模式。如果想了解更多的细节,可以看一下这个方法SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。在“skipping”模式下,map任务会维护一个记录处理的范围值。框架依赖处理记录的计数器(Counter)来实现这一点。可以看一下SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS和SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。框架可以通过这个计数器知道多少条记录被成功处理,同样的也可以检测到导致任务崩溃的记录的范围。出于长远的考量,这个(导致崩溃的)范围内的记录会被跳过不处理。
跳过的纪录的数目取决于处理记录的计数器增长的频率。通常推荐记录每处理一次,计数器就增长一次。这在一些批处理的应用中可能不太好实现。在这种情况下,框架就会同时跳过一批与之关联的纪录。用户可以通过SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)来控制跳过的记录(或记录集)的数量。框架会采用类似二分查找的方式尽量收缩跳过的记录的范围。被选择的要跳过的记录会被分成两部分,最终只有一半会被跳过。在之后的处理中,框架就可以自动识别出究竟哪一半是需要被跳过的记录。一个任务会重复执行直到跳过的记录数复合用户的设置或者尝试执行的次数被耗尽。可以通过Job.setMaxMapAttempts(int) 和Job.setMaxReduceAttempts(int)来调整任务尝试的次数。
被跳过的纪录会在序列化后写入到HDFS中,以供后来的分析。文件存储的位置可以通过SkipBadRecords.setSkipOutputPath(JobConf, Path)来设置。
############
发表评论