概述
Hadoop Archive是Hadoop官方提供的解决HDFS上小文件过多的一种方案。可以通过如下命令来执行生成har文件:
1 |
hadoop archive -archiveName foo.har -p /user/hadoop -r 3 dir1 dir2 /user/zoo |
执行archive
命令会提交一个MapReduce任务来生成har文件。在了解har文件结构后也可以考虑本地生成har文件再上传。
关于“hadoop archive”指令的更多细节请参考官方文档 。
HAR文件结构
HAR文件实际上是一个以”.har”结尾命名的目录,其中至少包含三个文件:
- _index
- _masterindex
- part-00000 …
其中“_index”文件中存储包内目录、文件的元数据信息,并按路径java字符串hashCode()运算的哈希值排序。
“_masterindex”记录了“_index”文件中每1000条文件元数据信息的起止哈希值、以及其在“_index”文件中的起止位置。
“part-X”文件中直接拼接了原始文件内容,无压缩处理,每个map操作的节点生成一个“part-X”文件。
HAR文件读取
在平时工作中读取hdfs文件有三种形式:即在java代码中通过hadoop-client形式读取,执行spark任务读取,通过webhdfs的rest api读取。下面一一介绍下相关的的解决方案。
通过java-client方式读取
假设我们有一个har归档文件,其存储目录是:/har/hp2.har。
在这个归档文件下存在一个目录hp,存储了7个txt文件:
1 2 3 4 5 6 7 8 9 10 |
$ hfs -ls har:///har/hp2.har/hp/ 2019-10-28 17:45:24,682 WARN erasurecode.ErasureCodeNative: ISA-L support is not available in your platform... using builtin-java codec where applicable Found 7 items -rw-r--r-- 1 bg supergroup 451867 2019-10-25 17:07 har:///har/hp2.har/hp/1.Harry Potter and the Sorcerer's Stone.txt -rw-r--r-- 1 bg supergroup 506099 2019-10-25 17:07 har:///har/hp2.har/hp/2.Harry Potter and The Chamber Of Secrets.txt -rw-r--r-- 1 bg supergroup 628052 2019-10-25 17:07 har:///har/hp2.har/hp/3.Harry Potter and the Prisoner of Azkaban.txt -rw-r--r-- 1 bg supergroup 1135689 2019-10-25 17:07 har:///har/hp2.har/hp/4.Harry Potter and the Goblet of Fire.txt -rw-r--r-- 1 bg supergroup 1535861 2019-10-25 17:07 har:///har/hp2.har/hp/5.Harry Potter and the Order of the Phoenix.txt -rw-r--r-- 1 bg supergroup 1000626 2019-10-25 17:07 har:///har/hp2.har/hp/6.Harry Potter and The Half-Blood Prince.txt -rw-r--r-- 1 bg supergroup 1199467 2019-10-25 17:07 har:///har/hp2.har/hp/7.txt |
下面通过hadoop-client编写程序列出“har:///har/hp2.har/hp/”下的全部文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{HarFileSystem, Path} class HDFS(uriParent: String) { private val conf = new Configuration() private val fs = new HarFileSystem() fs.initialize(new URI(uriParent), conf) def files(path: String): Iterable[Path] = fs.listStatus(new Path(path)).map(f => f.getPath) } val harPath = "har:///har/hp2.har" val files = new HDFS(harPath).files("./hp") |
上面是用scala代码做的实现。
比照普通的HDFS文件的访问方式,访问Har文件的主要特点在于其FileSystem
操作对象是一个HarFileSystem
的实例。
对HarFileSystem
实例执行initialize()
操作的时候需要传入要访问的har文件的根路径,其后所有的操作都是对har子项的相对路径进行操作。
注意:执行initialize()
操作时只能传入har文件的根路径,不能像执行上面的“hfs -ls har:///har/hp2.har/hp/”指令一样传入一个完整的har子项的路径。
通过spark任务读取
spark读取数据文件大致有三种形式:
- 通过SparkSession.read.textFile读取文件创建DataSet
- 通过SparkSession.read.text读取文件创建DataFrame
- 通过SparkContext.textFile读取文件创建RDD
针对这三种形式,我分别写了一段代码进行验证:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
val hpHarDir = "har:///har/hp2.har/hp/" val ds1 = spark.read.textFile(hpHarDir) val c1 = ds1.filter(_.contains("Harry")).count() println(s"------------------------------->>>>>>>>>>>1 Harry: $c1") val ds2 = spark.read.text(hpHarDir) val c2 = ds2.filter(_.getString(0).contains("Harry")).count() println(s"------------------------------->>>>>>>>>>>2 Harry: $c2") val ds3 = spark.sparkContext.textFile(hpHarDir) val c3 = ds3.filter(_.contains("Harry")).count() println(s"------------------------------->>>>>>>>>>>3 Harry: $c3") |
在执行日志中输出的验证信息如下:
1 2 3 |
------------------------------->>>>>>>>>>>1 Harry: 0 ------------------------------->>>>>>>>>>>2 Harry: 0 ------------------------------->>>>>>>>>>>3 Harry: 15534 |
根据执行结果可知,通过SparkSession读取的两种方案均不可行,只有通过SparkContext进行读取才能达到预期效果。
通过webhdfs方式读取
关于直接通过webhdfs方式读取har中内容的方式,我查了些资料,包括Hadoop官方文档,StackOverflow的相关话题以及Google检索出的条目,其中勉强可行的是如下两篇文章建议的方案:
- Downloading a file inside a hadoop archive using Apache Knox
- Hadoop WebHDFS usage in combination with HAR (hadoop archive) from PHP
其实现思路大致如下:
- 通过webhdfs获取har的index文件
- 在index文件中找到在HDFS中存储目标文件的数据文件,以及目标文件在数据文件中的起始offset及长度
- 通过webhdfs获取目标文件:http://<hadoop-server>:50070/webhdfs/v1/data-file-path?op=OPEN&offset=$offset&length=$len
扫描har index文件并截取内容不太像是一种优雅的做法,至少我并不喜欢。
简单介绍一种替代方案,即通过hadoop-client实现数据流拷贝,下面是代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
private static final FileSystem fs; static { try { Configuration conf = new Configuration(); conf.addResource("hdfs-site.xml"); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); fs = FileSystem.get(conf); } catch (Throwable e) { throw new RuntimeException("初始化HDFS操作实例异常"); } } public static void copy(Path harPath, HttpServletResponse response) throws IOException { FSDataInputStream in = null; OutputStream output = null; try { in = fs.open(harPath); output = response.getOutputStream(); copy(in, output); output.flush(); output.close(); } finally { IOUtils.closeStream(in); IOUtils.closeStream(output); } } private static int copy(InputStream input, OutputStream output) throws IOException { long count = copyLarge(input, output); if (count > Integer.MAX_VALUE) { return -1; } return (int) count; } private static long copyLarge(InputStream input, OutputStream output) throws IOException { byte[] buffer = new byte[1024]; long count = 0; int n = 0; while (-1 != (n = input.read(buffer))) { output.write(buffer, 0, n); count += n; } return count; } |
在api接口中调用copy方法即可实现下载功能。代码稍稍有些多,但窃以为还是要比扫描index好一些。
发表评论