为什么选择High Level Consumer
很多时候用户只是想从kafka中读取数据,对于如何处理消息的offset则不怎么关注。在抽象了Kafka中消费事件的大部分细节后,High Level Consumer可以让用户使用起来更为简单。
首先需要知道的就是High Level Consumer保存了从zookeeper中读取某个partition最后的offset。这个offset基于读取进程一开始时提供给kafka的名称保存。这个名称可以理解为Consumer Group的名称。
Consumer Group的名称在kafka集群中是一个全局的属性,因此在启动新的Consumer程序前,注意要关掉“旧的”Consumer。当使用已有的Consumer Group名称启动一个新的Consumer的时候,Kafka将会把这个进程的线程添加到一个现有的线程组中来消费Topic,并触发一次“re-balance”。在“re-balance”中,kafka将会把可用的partition分配给可用的线程。这就有可能把一个partition移交给另一个进程。如果同时使用新的和旧的业务处理逻辑,就很有可能把一些消息导向旧的业务处理逻辑。
设计一个High Level Consumer
首先要说明的是使用High Level Consumer的可以(或者说应该)是一个多线程应用。围绕着topic中partition的数量定义的线程模型有如下几个明显的特征:
- 如果提供的线程数量多于topic的partition的数量,一些线程将永远接收不到任何消息;
- 如果提供的线程数量少于topic的partition数量,一些线程将会收到来自多个partition的数据;
- 如果一个线程对应着多个partition,那么接收到的消息的有序性将会得不到保证,除非在partition内部的offset是序列化的。举例说,你可能会从partition10中获取5条消息,从partition11中获取6条消息,那么有可能在你从partition10中获取5条消息后,又继续从partition10中获取了5条消息,尽管此时partition11中的消息是可用的;
- 添加进程或线程将会导致re-balance,这就有可能会导致线程对应的partition会重新分配。
现在就可以尝试从Kafka集群读取数据了,如果没有新数据的话,读取数据的进程可能会阻塞。
如下是一个非常简单的Kafka High Level Consumer线程实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); } } |
程序中值得关注的部分是“while (it.hasNext()) ”这一句,程序就是通过这一句不停地从Kafka集群读取数据的——直到用户主动停止线程。
配置测试应用
和SimpleConsumer不同的是,High Level Consumer为我们做了大量的信息记录以及故障处理工作,然而我们还是需要告诉Kafka将一些信息存储在哪儿。在下面的方法中,定义了创建一个High Level Consumer最基本的配置信息:
1 2 3 4 5 6 7 8 9 10 |
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } |
简单说明下这里的配置参数:
- “zookeeper.connect”指明了如何在kafka集群中找到启动的Zookeeper实例。Kafka使用zookeeper保存了当前ConsumerGroup从指定topic中消费消息的偏移量以及对应partition信息;
- “group.id”定义了当前进程所代表的Consumer Group;
- “zookeeper.session.timeout.ms”定义了kafka等待Zookeeper响应请求(读或者写请求)的时间,时间单位是毫秒,如果超过时间,Kafka就会放弃并继续消费消息;
- “zookeeper.sync.time.ms”表示了没有发生故障时,zookeeper的一个follower和master同步的时间间隔;
- “auto.commit.interval.ms”定义了多久更新一次写入到zookeeper的消费offset信息。注意,因为这个提交频率是基于时间的而非基于消费的消息的,如果在提交更新时发生了错误,就有可能重新消费消息。
关于配置的更多信息可以查看这里:http://kafka.apache.org/08/configuration.html。
创建线程池
在示例程序中使用java的“java.util.concurrent”包来管理线程,使用这个包可以很方便的创建一个线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 创建多个线程 executor = Executors.newFixedThreadPool(a_numThreads); // 创建一个对象消费消息 int threadNumber = 0; for (final KafkaStream stream : streams) { executor.execute(new ConsumerTest(stream, threadNumber)); threadNumber++; } } |
首先我们创建了一个Map用来告诉Kafka我们要为目标topic启动多少个线程。我们调用consumer.createMessageStreams方法来传递这个信息给Kafka。这个方法的返回值是一个Map对象,表示了topic和监听topic的KafkaStream的映射关系。注意,我们的示例程序中只向Kafka请求了一个topic,实际上我们可以请求多个topic的信息,只需要在topicCountMap加入对应的信息即可。
最后,我们成功创建了一个线程池,并为每个线程创建了一个ConsumerTest对象作为具体的业务逻辑。
安全退出和错误处理
前面已经提过,Kafka并不会在每次读取消息后就立即更新保存在Zookeeper中的消息offset,而是每隔一段时间更新一次。这就有可能产生一小段延迟,比如我们的程序已经消费了消息,但是实际上此时仍未同步到Zookeeper。如果此时客户端退出了或者崩溃了,那么此前我们消费过的消息可能会再次出现。
还要注意,有时候Broker故障或者其他事件导致的Partition的leader的改变也有可能导致消息的重复消费。
为了避免这种情况的发生,需要尽可能保证安全退出,不要使用“kill -9”这种指令。
在我们的示例程序中,主线程执行到最后sleep了10秒钟。这样后台消费线程就有了10秒钟时间消费stream中的数据。因为已经开启了自动提交,Kafka将会每秒钟提交一次offset。最后,主线程调用了shutdown方法,这个方法会先调用每个消费者线程的shutdown方法,而后才会调用ExecutorService的shutdown方法,最后会等待ExecutorService完成所有未完成的工作。这给了消费者线程一些时间来处理完成仍在stream中的少量未处理消息。如果消费者线程已经处理完了所有来自server的消息,此时关闭消费者线程,Stream的迭代器的hasNext()方法将会返回false。这样消费者线程也可以优雅的退出。另外,如果开启了自动提交,调用消费者线程的consumer方法将会在退出前提交最终的offset。
1 2 3 4 5 |
try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); |
在实际工作中,通常采用的工作模式是让主线程无限期的睡眠,通过shutdown hook的方式实现安全退出。(有必要了解一下java的hook机制)。
运行示例程序
运行示例程序需要如下命令行参数:
- 包含端口号的Zookeeper连接字符串;
- 这次消费进程要使用的Consumer Group名称;
- 消费的消息所属的Topic;
- 此次消费进程启动的线程数目。
例如:
1 |
server01.myco.com1:2181 group3 myTopic 4 |
这个命令表示将会通过连接主机server01.myco.com1的2181端口与其上的Zookeeper进行通信,请求了名为“myTopic”的Topic的全部partition,并启动了4个线程来消费这些partition上的消息。这个示例中使用的Consumer Group是“group3”。
完整的代码如下(做了折叠):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 加载所有的线程 executor = Executors.newFixedThreadPool(a_numThreads); // 创建一个对象消费消息 int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } } |
参考文档
- https://cwiki.apache.org/confluence/display/KAFKA/Index
- https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
- http://www.cnblogs.com/fxjwind/p/3794255.html?utm_source=tuicool&utm_medium=referral
- http://www.open-open.com/lib/view/open1434551761926.html
- http://my.oschina.net/ielts0909/blog/110280
- http://my.oschina.net/infiniteSpace/blog/312890?p=1
- http://www.cnblogs.com/airwindow/archive/2012/06/24/2559754.html
##############################
发表评论