必需要提前说明下:不建议使用自定义的Filter。所有的Filter都是在服务端生效:就是说需要将自定义的Filter封装为jar,上传到HBase的类路径下,并重启HBase使之生效。对于生产环境的HBase来说,重启通常是不能接受的。
Filter的设置是在客户端完成的,而Filter的逻辑是在HBase的服务端完成的,中间需要一次序列化。我试过几种序列化方案,不过protobuffer以外的其他几种效果不算好。HBase自带的Filter也是用protobuffer进行的序列化,因此使用protobuffer还可以少传几个包。
需要提前说明的已经说完了,开始进入正题。这次从一个案例开始说起:在HBase中存储着用户行为记录,行键设计为“uid(6位)+etime(时间戳/1000)+tid(7位)+顺序号(8位)”。其中uid为用户ID、etime为事件时间、tid为行为标签。目标是检索出某个用户在指定时间范围内的几种行为数据。
针对这个案例我们自定义一个CustomRowKeyFilter,并将一个用户ID、事件起止时间以及多个行为ID作为CustomRowKeyFilter的成员变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
package com.zhyea.dev.hbase.filter; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; public class CustomRowKeyFilter extends FilterBase { private long pid; private long eventTime; private String tids; private boolean filterOutRow = false; public CustomRowKeyFilter(long _pid, long _eventTime, String _tids) { this.pid = _pid; this.eventTime = _eventTime; this.tids = _tids; } @Override public boolean filterRowKey(byte[] data, int offset, int length) { String rowKey = Bytes.toString(data, offset, length); this.filterOutRow = check(rowKey); return this.filterOutRow; } public ReturnCode filterKeyValue(Cell v) throws IOException { if (this.filterOutRow) { return ReturnCode.NEXT_ROW; } return ReturnCode.INCLUDE; } private boolean check(String rowKey) { try { if (rowKey.length() < 7) { return true; } long _pid = Long.valueOf(rowKey.substring(0, 6)); long _eTime = Long.valueOf(rowKey.substring(6, 16)); long _tid = Long.valueOf(rowKey.substring(16, 23)); if (this.pid != _pid) { return true; } if (this.eventTime > _eTime) { return true; } if (!this.tids.contains(_tid + "")) { return true; } } catch (Exception e) { return true; } return false; } } |
代码中继承了FilterBase类,可以减少一些结构性的代码工作。至于Filter是如何工作的,在网上找到的这张图应该描述得很清楚了:
前面的代码只是实现了Filter的处理逻辑。要想使用这个Filter还需要做一些序列化处理。如前面所说序列化方案选择的是protobuffer,这里需要先定义一个描述文件CustomRowKeyFilterProto.proto,内容如下:
1 2 3 4 5 6 7 8 9 10 |
package filter; option java_package = "com.zhyea.dev.hbase.filter.proto"; option java_outer_classname = "CustomRowKeyFilterProto"; message CustomRowKeyFilter { required int64 pid = 1; required int64 eventTime = 2; required string tids = 3; } |
定义完成后,执行protoc命令:
1 |
protoc -I=./ --java_out=../src/main/java CustomRowKeyFilterProto.proto |
其中“-I”指定了proto描述文件的父目录, “—java_out”指定了java类的类路径,具体请根据自己的情况进行设置。执行命令后会在包com.zhyea.dev.hbase.filter.proto下生成序列化工具类CustomRowKeyFilterProto.java。
接下来在CustomRowKeyFilter中重写Filter类的toByteArray()方法和parseFrom()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public byte[] toByteArray() throws IOException { CustomRowKeyFilterProto.CustomRowKeyFilter.Builder builder = CustomRowKeyFilterProto.CustomRowKeyFilter.newBuilder(); builder.setPid(this.pid); builder.setEventTime(this.eventTime); builder.setCids(this.tids); return builder.build().toByteArray(); } public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException { CustomRowKeyFilterProto.CustomRowKeyFilter proto; try { proto = CustomRowKeyFilterProto.CustomRowKeyFilter.parseFrom(pbBytes); } catch (InvalidProtocolBufferException e) { throw new DeserializationException(e); } long _pid = proto.getPid(); long _eventTime = proto.getEventTime(); String _tids = proto.getCids(); return new CustomRowKeyFilter(_pid, _eventTime, _tids); } |
这样自定义Filter就完成了。剩下的事情就是将之打包并上传到HBase(每个RegionServer)的类路径下。然后就可以在程序中使用了。
现在再仔细想想这个程序,是否一定需要一个自定义Filter呢!我们已经将查询需要的所有元素都定义在行键里了。那么可以使用“uid+起始时间”作为startRow,“uid+结束时间”作为stopRow完成时间范围的匹配,使用RegexStringComparator来处理tid的匹配,这样直接使用HBase提供的RowFilter就能解决问题了。唯一需要注意的事情就是在设计表时多花些心思在行键上罢了。
就是这样。
参考文档
HBase Filter介绍及执行流程:https://my.oschina.net/cloudcoder/blog/289649
您好,按照您的例子写了一个自定义filter,但是用的时候报错说 proto为空,想请教一下原因
不妨贴一下错误详情。
你好,我自定义了一个Filter,然后放到了hbase.dynamic.jars.dir下,但是使用的时候报protocol出错,楼主你是使用的CDH集群吗,怎么部署的Filter?
不是CDH集群。也没有部署Filter,这只是一个废弃的方案,不可能发布到公司的生产环境的,只是在测试环境试验过。