Hadoop中一个不明确的内容就是Block复制:它自动完成,通常不需要用户关心。HBase将数据保存到HDFS,并完全相信它的安全性。正是因为HDFS的Block复制对HBase来说是完全透明的,就产生了一个问题:HBase的效率会受到多大的影响?当我们开始写MapReduce作业访问HBase和Hadoop的时候,就难免会想到这个问题。尤为关键的是,当HBase中存储的数据很多的时候,它怎样使数据距离需要的地方更近一些?这就涉及到HBase的数据是怎样在HDFS上存储了。
首先,我们看看Hadoop是如何处理这个问题的。在MapReduce文档中说明了task通常运行在距离其处理的数据比较近的位置。这主要是通过将HDFS中的数据拆分成小块实现的,这些小块的数据被称为Block。HDFS中的Block的大小要比文件系统的Block大得多:默认是64M,但是通常会被设置为128M(这个值还可以设置得更大一些——如果确认所有文件的size都大于单个Block)。每个Block对应一个map task。这也就意味着Block的size设置得越大,Block的数量就越少,需要的map task的数量就越少。Hadoop知道每个Block的存储位置,它会在存储Block的节点上直接运行map task。每个Block都有两到三个副本。事实上,hadoop选择的节点通常是存储Block副本的节点。这样Hadoop保证了MapReduce作业总是在本地处理数据。
现在回到HBase。既然已经知道Hadoop是如何让它的每个map task处理本地数据的,就可以再进一步思考下HBase是如何做到Data Locality的。要是读过关于HBase存储结构的文章的话,就知道HBase是将相关的文件保存在HDFS上。这些文件包括数据文件(HFile)和日志文件(WAL)。在源码里也能看到HBase是调用了FileSystem.create(Path path)方法来创建这些文件的。现在可以想想两种常见的访问模型:1.直接随机访问;2.使用MapReduce扫描全表。我们会很好奇HBase是怎样就近读取HDFS上的Block,从而提升这两种访问方式的效率的。
话说回来,如果Hadoop和Hbase没有在同一个集群上,而是分隔开的,就不要再想Data Locality了——这根本不可能做到。这就如同运行一个独立的MapReduce集群,而它根本无法在DataNode上执行task。所以要实现Data Locality就必须让相关的服务在一个同一个集群上,包括Hadoop(或者说是HDFS)、MapReduce和HBase。就是这样。
好了,是不是弄清了所有的服务都在同一个集群(希望是一个比较大的集群)上?弄清了,那就继续。在HBase访问数据的时候,Hadoop是怎样找到数据在哪儿呢?这样又回到了前面提到的两种访问模型上。这涉及到一个概念:RegionServer。不管是直接随机访问还是扫描全表,都是通过相同的API实现的。正如之前所提到的,HBase仅仅是执行文件的保存,而文件的分布和Block的复制是通过HDFS的DatNode来实现的。假设一个场景:在向HBase中存储了大量的数据后停止HBase服务并连续地对其进行重启。RegionServer在重启后会被分配随机数量的Regions。在这个时候,HBase的Data Locality是无法保证的。为什么呢?
最重要的是HBase不会频繁重启,并且会定期进行内部维护。随着时间的推移,数据不停地增加,HBase会执行compact来重写文件。由于各种原因,文件一旦写入HDFS就是不可变的。因此,数据会不停地写入新的文件。随着数据文件越来越多,HBase会将这些数据文件压缩(compact)合并成另一组新的文件。这里是最让人惊奇的地方:HDFS足够聪明,它知道将数据放到被需要的地方。这是怎么做到的呢?我们需要深入Hadoop的源码来看看HBase调用的FileSystem.create(Path path)方法是怎么工作的。因为我们选择的HBase存储方案是HDFS,所以我们实际上调用的是DistributedFileSystem.create(Path path)方法,该方法的代码如下:
1 2 3 |
public FSDataOutputStream create(Path f) throws IOException { return create(f, true); } |
这个方法返回了一个FSDataOutputStream类型的实例。create这个实例的方法如下:
1 2 3 4 5 6 7 8 9 10 11 |
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { return new FSDataOutputStream(dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics); } |
这个方法中使用了一个DFSClient实例作为纽带来连接client和NameNode:
1 |
this.dfs = new DFSClient(namenode, conf, statistics); |
最终返回的是一个DFSClient.DFSOutputStream实例。数据不停地写入到DFSOutputStream,DFSClient会将之收集起来打成包作为一个Block写入到DataNode。这个过程是由DFSClient.DFSOutputStream.DataStreamer实现的,它以守护进程的形式在后台运行。接下来我们一步一步的推断出这个过程具体是怎么实现的。首先我们看一下DataStreamer后台线程的run()方法,它获取了存储数据的DataNode的列表:
1 |
nodes = nextBlockOutputStream(src); |
而这个方法又调用了如下的代码:
1 2 3 4 |
long startTime = System.currentTimeMillis(); lb = locateFollowingBlock(startTime); block = lb.getBlock(); nodes = lb.getLocations(); |
接下来看看locateFollowingBlocks()又调用了哪个方法:
1 |
return namenode.addBlock(src, clientName); |
这里就是关键了。这里使用namenode对象添加了一个新的Block,方法中使用的src参数表示要写入的文件,clientName表示DFSClient实例的名称。接下来跳过部分不太重要的内容,直接看看稍后一些关键的步骤:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public LocatedBlock getAdditionalBlock(String src, String clientName) throws IOException { ... INodeFileUnderConstruction pendingFile = checkLease(src, clientName); ... fileLength = pendingFile.computeContentSummary().getLength(); blockSize = pendingFile.getPreferredBlockSize(); clientNode = pendingFile.getClientNode(); replication = (int)pendingFile.getReplication(); // choose targets for the new block tobe allocated. DatanodeDescriptor targets[] = replicator.chooseTarget(replication, clientNode, null, blockSize); ... } |
最后也是最为核心的代码,就是replicator.chooseTarget() 方法的详情了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } int numOfResults = results.size(); boolean newBlock = (numOfResults==0); if (writer == null && !newBlock) { writer = (DatanodeDescriptor)results.get(0); } try { switch(numOfResults) { case 0: writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results); if (--numOfReplicas == 0) { break; } case 1: chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results); if (--numOfReplicas == 0) { break; } case 2: if (clusterMap.isOnSameRack(results.get(0), results.get(1))) { chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results); } else if (newBlock) { chooseLocalRack(results.get(1), excludedNodes, blocksize, maxNodesPerRack, results); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results); } if (--numOfReplicas == 0) { break; } default: chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results); } } catch (NotEnoughReplicasException e) { FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas); } return writer; } |
接回上面的内容,我们已经启动了一个DFSClient实例并创建了一个写满数据的序列化文件。当要将数据文件存储为Block的时候,上面的代码首先检查是否可以将Block保存在client所在的本地主机(代码中的“writer”)——这是程序中“case 0”的内容。而“case 1”的部分表示程序尝试在另一个机架上保存一个远程备份。后面“case 2”优先选择一个与“case 0”中相同的机架,然后才考虑选择不同的机架。如果还需要更多的备份,则会随机选择一个机架的任意节点保存。
这就表示,在完成一次对所有表的major compaction(可以是手动触发也可以在配置文件中配置)以后,RegionServer运行的时间越长,它将数据保存在本地节点的概率就越大。和RegionServer在相同物理主机上的DataNode会保留一份这个RegionServer需要的所有数据的拷贝。这样可以保证在执行scan、get或其他任何操作时有最好的性能。
最后,要想全面地了解HDFS的设计和数据冗余备份的内容请移步Hadoop官网:HDFS Architecture。也请注意到HBase团队一直在重新设计HMaster分配Region给RegionServer的方案。新的方案将会把Regions分配给拥有Block最多的RegionServer。这调整对改善HBase重启后的DataLocality特别有用。
原文:
HBase File Locality in HDFS:http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html
#####
发表评论