概述
Hadoop Mapreduce是一个简单易用的框架。基于MapReduce写出来的程序能够运行在由上千台商用机器组成的大型集群上,以一种可靠的容错的方式并行处理T级别的海量数据。
一个MapReduce作业通常会把输入的数据集拆分成独立的块,由map任务(task)以完全并行的方式处理。框架先对map的输出结果进行排序,排好序的结果会做为reduce任务的输入。通常作业的输入和输出都存放在一个文件系统里。MapReduce框架负责任务的调度和监控,并重新执行已经失败的任务。
一般来说Hadoop的计算节点和存储节点是在相同的机器上。这是因为MapReduce和HDFS运行在一组相同的节点上。这种设计允许MapReduce在已经存储好数据的节点上高效地进行任务调度,从而更集中高效地使用整个集群的带宽。
MapReduce框架的组成包括:一个单独的master(ResourceManager),集群每个节点上都有一个slave(NodeManager),以及每个应用程序上的MRAppMaster(参考YARN架构)。
应用程序最少应该指定输入输出的位置,并通过实现合适的接口和(或)抽象类来提供map和reduce函数。有了这些设置,再加上其他作业参数,就组成了作业配置(job configuration)。
接下来,Hadoop的作业客户端(job client)就会提交作业(job, 一般是jar包或其他可执行程序)和配置信息给ResourceManager,ResourceManager负责分发软件和配置信息给slave,调度并监控任务执行,并提供状态和诊断信息给作业客户端(job-client)。
尽管Hadoop框架是使用Java实现的,但是MapReduce程序不一定要用Java来写,比如我们可以使用Hadoop Streaming和Hadoop Pipes。
Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序(例如:Shell)来做为mapper和reducer。
Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。
输入和输出
MapReduce框架基于<key, value>对执行,也就是说,MapReduce框架把作业的输入视为一组<key, value>对,并生产一组<key, value>对作为作业的输出。这两组<key, value>对可以是不同的类型。
MapReduce框架需要对key和value的实例进行序列化操作,因此这些类需要实现Writable接口。此外,key对应的类还要实现WritableComparable接口以适应MapReduce框架对排序的要求。
一个Mapreduce作业的输入输出类型如下:
1 2 3 4 |
(输入) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (输出) |
注意combin的输出类型是要和map的输出类型一致的。
实例:WordCount v1.0
在深入细节之前,我们先看一个MapReduce的应用示例,以对MapReduce的工作方式有一个初步的认识。
WordCount 是一个简单的应用,可以用来统计在一组输入数据中某个单词出现的次数。
这个应用适用于单机模式,伪分布式模式或完全分布式模式三种Hadoop安装方式。
源码如下,我做了折叠,点开查看就好:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private static final IntWritable one = new IntWritable(1); private static final Text word = new Text(); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { Iterator<IntWritable> iter = value.iterator(); int sum = 0; while (iter.hasNext()) { sum += iter.next().get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(TokenizerMapper.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |
工程的maven的配置如下,也是做了折叠,点开查看就好:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.zhyea.robin</groupId> <artifactId>hadoop-dev</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> <finalName>wc</finalName> </build> </project> |
执行“mvn clean package”完成打包。
执行如下命令运行MapReduce任务:
1 |
hadoop jar wc.jar /adt/input /adt/output |
其中/adt/input是输入目录,/adt/output是输出目录,输入和输出目录都在HDFS上。输入目录下可以有多个输入文件。在/adt/input下有两个输入文件,可以看一下文件的内容:
1 2 3 4 5 |
$ bin/hadoop fs -cat /adt/input/file01 Hello World Bye World $ bin/hadoop fs -cat /adt/input/file02 Hello Hadoop Goodbye Hadoop |
看一下输出结果:
1 2 3 4 5 6 7 |
$ bin/hadoop fs -cat /adt/output/part-r-00000 Bye 1 Goodbye 1 Hadoop 2 Hello 2 World 2 |
运行命令说明
应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递压缩文档做为参数,这些压缩文档会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。
使用-libjars, -files and -archives运行wordcount实例:
1 |
hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output |
在这里,压缩文档myarchive.zip将被解压并存放在以“myarchive.zip”命名的目录下。
用户也可以使用#标识通过-files或者-archives为文件或压缩文档指定一个别名。
举个例子:
1 |
hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output |
在这个例子中,通过别名dict1和dict2分别访问文件dir1/dict.txt和dir2/dict.txt。而压缩文档mytar.tgz将会被解压并置于名为“tgzdir”的目录下。
关于Map
程序中的map方法:
1 2 3 4 5 6 7 8 |
@Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } |
Mapper的实现类通过map方法,一次处理一行由指定的FileInputFormat提供的记录。然后Mapper通过StringTokenizer将这一行记录以空格为分隔符分割成若干Tokens,而后输出格式为< <word>, 1>的key-value对。
对于提供的第一个输入样本,map的输出内容为:
1 2 3 4 |
< Hello, 1> < World, 1> < Bye, 1> < World, 1> |
第二个样本的map输出内容为:
1 2 3 4 |
< Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1> |
在我们的WordCount程序里还指定了一个combiner。
1 |
job.setCombinerClass(IntSumReducer.class); |
每个map运行后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。
第一个map的输出:
1 2 3 |
< Bye, 1> < Hello, 1> < World, 2> |
第二个map的输出:
1 2 3 |
< Goodbye, 1> < Hadoop, 2> < Hello, 1> |
关于reduce
reduce的代码:
1 2 3 4 5 6 7 8 9 |
@Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } |
Reducer实现类的reduce方法只是将每个key(本例中就是单词)出现的次数求和。
因此这个作业的输出就是:
1 2 3 4 5 |
< Bye, 1> < Goodbye, 1> < Hadoop, 2> < Hello, 2> < World, 2> |
关于main方法
main方法指定了作业的几个方面,比如说输入输出路径(通过命令行提供),key和value的类型,输入输出的格式等信息。在作业中,调用了job.waitForCompletion函数提交作业并监控它的执行。
############
发表评论