通过HA访问Hdfs的时候如何获取到活跃节点是一个稍稍有些麻烦的事情。
目前使用过两种方案:一是通过webhdfs接口逐一访问测试,找到状态为可用的节点;一是在zookeeper上直接获取当前活跃的节点。
简单说下第二种方案。ha的ActiveNode在zookeeper上的存储节点为:/hadoop-ha/dcnameservice/ActiveStandbyElectorLock。只需要通过ZooKeeper的API监听获取这个节点的信息即可。不过这个节点保存的信息不能当做字符串来读取,它是一个序列化后的对象,需要反序列化才能使用。
实现的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; public class ZooKeeperClient { private static ZooKeeper zk; private static Stat stat = new Stat(); private static String HA_ACTIVE_NODE = ""; static { try { zk = new ZooKeeper(HDFS_HA_ZK_ADDRESS, 50000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == NodeDataChanged) { update(); System.out.println("HA Address changed......"); } } }); } catch (IOException e) { throw new RuntimeException(e); } update(); } private static void update() { try { byte[] bytes = zk.getData(HDFS_HA_ZK_NODE, true, stat); HAZKInfoProtos.ActiveNodeInfo nodeInfo = HAZKInfoProtos.ActiveNodeInfo.parseFrom(bytes); HA_ACTIVE_NODE = nodeInfo.getHostname(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } public static String getActiveNode() { return HA_ACTIVE_NODE; } public static void main(String[] args) { System.out.println(getActiveNode()); } } |
就这样。
发表评论