必需要提前说明下:不建议使用自定义的Filter。所有的Filter都是在服务端生效:就是说需要将自定义的Filter封装为jar,上传到HBase的类路径下,并重启HBase使之生效。对于生产环境的HBase来说,重启通常是不能接受的。
Filter的设置是在客户端完成的,而Filter的逻辑是在HBase的服务端完成的,中间需要一次序列化。我试过几种序列化方案,不过protobuffer以外的其他几种效果不算好。HBase自带的Filter也是用protobuffer进行的序列化,因此使用protobuffer还可以少传几个包。
需要提前说明的已经说完了,开始进入正题。这次从一个案例开始说起:在HBase中存储着用户行为记录,行键设计为“uid(6位)+etime(时间戳/1000)+tid(7位)+顺序号(8位)”。其中uid为用户ID、etime为事件时间、tid为行为标签。目标是检索出某个用户在指定时间范围内的几种行为数据。
针对这个案例我们自定义一个CustomRowKeyFilter,并将一个用户ID、事件起止时间以及多个行为ID作为CustomRowKeyFilter的成员变量。
package com.zhyea.dev.hbase.filter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class CustomRowKeyFilter extends FilterBase {
private long pid;
private long eventTime;
private String tids;
private boolean filterOutRow = false;
public CustomRowKeyFilter(long _pid, long _eventTime, String _tids) {
this.pid = _pid;
this.eventTime = _eventTime;
this.tids = _tids;
}
@Override
public boolean filterRowKey(byte[] data, int offset, int length) {
String rowKey = Bytes.toString(data, offset, length);
this.filterOutRow = check(rowKey);
return this.filterOutRow;
}
public ReturnCode filterKeyValue(Cell v) throws IOException {
if (this.filterOutRow) {
return ReturnCode.NEXT_ROW;
}
return ReturnCode.INCLUDE;
}
private boolean check(String rowKey) {
try {
if (rowKey.length() < 7) {
return true;
}
long _pid = Long.valueOf(rowKey.substring(0, 6));
long _eTime = Long.valueOf(rowKey.substring(6, 16));
long _tid = Long.valueOf(rowKey.substring(16, 23));
if (this.pid != _pid) {
return true;
}
if (this.eventTime > _eTime) {
return true;
}
if (!this.tids.contains(_tid + "")) {
return true;
}
} catch (Exception e) {
return true;
}
return false;
}
}
代码中继承了FilterBase类,可以减少一些结构性的代码工作。至于Filter是如何工作的,在网上找到的这张图应该描述得很清楚了:
前面的代码只是实现了Filter的处理逻辑。要想使用这个Filter还需要做一些序列化处理。如前面所说序列化方案选择的是protobuffer,这里需要先定义一个描述文件CustomRowKeyFilterProto.proto,内容如下:
package filter;
option java_package = "com.zhyea.dev.hbase.filter.proto";
option java_outer_classname = "CustomRowKeyFilterProto";
message CustomRowKeyFilter {
required int64 pid = 1;
required int64 eventTime = 2;
required string tids = 3;
}
定义完成后,执行protoc命令:
protoc -I=./ --java_out=../src/main/java CustomRowKeyFilterProto.proto
其中“-I”指定了proto描述文件的父目录, “—java_out”指定了java类的类路径,具体请根据自己的情况进行设置。执行命令后会在包com.zhyea.dev.hbase.filter.proto下生成序列化工具类CustomRowKeyFilterProto.java。
接下来在CustomRowKeyFilter中重写Filter类的toByteArray()方法和parseFrom()方法:
public byte[] toByteArray() throws IOException {
CustomRowKeyFilterProto.CustomRowKeyFilter.Builder builder = CustomRowKeyFilterProto.CustomRowKeyFilter.newBuilder();
builder.setPid(this.pid);
builder.setEventTime(this.eventTime);
builder.setCids(this.tids);
return builder.build().toByteArray();
}
public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException {
CustomRowKeyFilterProto.CustomRowKeyFilter proto;
try {
proto = CustomRowKeyFilterProto.CustomRowKeyFilter.parseFrom(pbBytes);
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
long _pid = proto.getPid();
long _eventTime = proto.getEventTime();
String _tids = proto.getCids();
return new CustomRowKeyFilter(_pid, _eventTime, _tids);
}
这样自定义Filter就完成了。剩下的事情就是将之打包并上传到HBase(每个RegionServer)的类路径下。然后就可以在程序中使用了。
现在再仔细想想这个程序,是否一定需要一个自定义Filter呢!我们已经将查询需要的所有元素都定义在行键里了。那么可以使用“uid+起始时间”作为startRow,“uid+结束时间”作为stopRow完成时间范围的匹配,使用RegexStringComparator来处理tid的匹配,这样直接使用HBase提供的RowFilter就能解决问题了。唯一需要注意的事情就是在设计表时多花些心思在行键上罢了。
就是这样。
参考文档
HBase Filter介绍及执行流程:https://my.oschina.net/cloudcoder/blog/289649
您好,按照您的例子写了一个自定义filter,但是用的时候报错说 proto为空,想请教一下原因
不妨贴一下错误详情。
你好,我自定义了一个Filter,然后放到了hbase.dynamic.jars.dir下,但是使用的时候报protocol出错,楼主你是使用的CDH集群吗,怎么部署的Filter?
不是CDH集群。也没有部署Filter,这只是一个废弃的方案,不可能发布到公司的生产环境的,只是在测试环境试验过。