Through Kafka 0.8.1.1, consumers commit their offsets to ZooKeeper. However, ZooKeeper does not scale well once the number of offsets (consumer count × partition count) grows large. Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets: offsets are written to a distributed, durable, highly available topic, and consumers can fetch them by reading from that topic. To speed up access, Kafka also keeps an in-memory cache of the offsets. In other words, committing an offset is now an ordinary, inexpensive producer request, and fetching an offset is a fast in-memory lookup.
The official Kafka documentation describes how this feature works and how to migrate offsets from ZooKeeper to Kafka. The code below demonstrates how to use Kafka-based offset storage.
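For reference, the migration procedure in the docs revolves around two high-level consumer configuration properties, offsets.storage and dual.commit.enabled. Here is a minimal sketch of the two-phase rollout, assuming the 0.8.2 Java consumer config (the group and ZooKeeper values are placeholders):

```java
import java.util.Properties;
import kafka.consumer.ConsumerConfig;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder
props.put("group.id", "demoGroup");               // placeholder
// Phase 1: store offsets in Kafka, but keep committing to ZooKeeper too,
// so consumers that have not been upgraded yet still see current offsets.
props.put("offsets.storage", "kafka");
props.put("dual.commit.enabled", "true");
ConsumerConfig config = new ConsumerConfig(props);
// Phase 2: once every consumer in the group has been rolled with the
// settings above, set dual.commit.enabled=false and roll again.
```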
Step 1: Discover and connect to the offset manager by issuing a consumer metadata request to any broker:
```java
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.io.IOException;
import java.util.*;
...
try {
    BlockingChannel channel = new BlockingChannel("localhost", 9092,
            BlockingChannel.UseDefaultBufferSize(),
            BlockingChannel.UseDefaultBufferSize(),
            5000 /* read timeout in millis */);
    channel.connect();
    final String MY_GROUP = "demoGroup";
    final String MY_CLIENTID = "demoClientId";
    int correlationId = 0;
    final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
    final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
        Broker offsetManager = metadataResponse.coordinator();
        // if the coordinator is different from the above channel's host, then reconnect
        channel.disconnect();
        channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
    } else {
        // retry (after backoff)
    }
}
catch (IOException e) {
    // retry the query (after backoff)
}
...
```
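A note on why the reconnect is necessary: each consumer group has exactly one offset manager (coordinator), the broker that leads that group's partition of the internal offsets topic. Any broker can answer the metadata request, but commits and fetches must go to the coordinator itself, so the channel is reconnected whenever it points elsewhere.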
Step 2: Send an OffsetCommitRequest or OffsetFetchRequest to the offset manager:
```java
// How to commit offsets

long now = System.currentTimeMillis();
Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
offsets.put(testPartition0, new OffsetAndMetadata(100L, "associated metadata", now));
offsets.put(testPartition1, new OffsetAndMetadata(200L, "more metadata", now));
OffsetCommitRequest commitRequest = new OffsetCommitRequest(
        MY_GROUP,
        offsets,
        correlationId++,
        MY_CLIENTID,
        (short) 1 /* version */); // version 1 and above commit to Kafka, version 0 commits to ZooKeeper
try {
    channel.send(commitRequest.underlying());
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer());
    if (commitResponse.hasError()) {
        for (Object partitionErrorCodeObj : commitResponse.errors().values()) {
            short partitionErrorCode = (Short) partitionErrorCodeObj;
            if (partitionErrorCode == ErrorMapping.OffsetMetadataTooLargeCode()) {
                // You must reduce the size of the metadata if you wish to retry
            } else if (partitionErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()
                    || partitionErrorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                channel.disconnect();
                // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager
            } else {
                // log and retry the commit
            }
        }
    }
}
catch (IOException ioe) {
    channel.disconnect();
    // Go to step 1 and then retry the commit
}

// How to fetch offsets

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
        MY_GROUP,
        partitions,
        (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
        correlationId,
        MY_CLIENTID);
try {
    channel.send(fetchRequest.underlying());
    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
    OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
    short offsetFetchErrorCode = result.error();
    if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
        channel.disconnect();
        // Go to step 1 and retry the offset fetch
    } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
        // retry the offset fetch (after backoff)
    } else {
        long retrievedOffset = result.offset();
        String retrievedMetadata = result.metadata();
    }
}
catch (IOException e) {
    channel.disconnect();
    // Go to step 1 and then retry offset fetch after backoff
}
```
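The error-handling comments in both snippets reduce to the same control loop: on a coordinator-related error code or an IOException, disconnect, rediscover the offset manager (step 1), back off, and retry. Below is a minimal sketch of that loop for the commit path; commitWithRetry, ChannelFactory, and locateOffsetManager are illustrative names, not part of the Kafka API, and locateOffsetManager stands in for the step 1 exchange above:

```java
import java.io.IOException;
import kafka.common.ErrorMapping;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.network.BlockingChannel;

public class OffsetCommitHelper {

    // Hypothetical stand-in for the step 1 exchange: returns a channel
    // connected to the group's current offset manager.
    interface ChannelFactory {
        BlockingChannel locateOffsetManager() throws IOException;
    }

    static boolean commitWithRetry(ChannelFactory factory, OffsetCommitRequest request,
                                   int maxAttempts, long backoffMs) throws InterruptedException {
        BlockingChannel channel = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (channel == null) {
                    channel = factory.locateOffsetManager();
                }
                channel.send(request.underlying());
                OffsetCommitResponse response =
                        OffsetCommitResponse.readFrom(channel.receive().buffer());
                if (!response.hasError()) {
                    channel.disconnect();
                    return true;
                }
                // If the coordinator moved, force a rediscovery on the next attempt;
                // other per-partition errors are simply retried here for brevity.
                for (Object codeObj : response.errors().values()) {
                    short code = (Short) codeObj;
                    if (code == ErrorMapping.NotCoordinatorForConsumerCode()
                            || code == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                        channel.disconnect();
                        channel = null;
                        break;
                    }
                }
            } catch (IOException e) {
                if (channel != null) {
                    channel.disconnect();
                    channel = null;
                }
            }
            Thread.sleep(backoffMs); // back off before the next attempt
        }
        if (channel != null) {
            channel.disconnect();
        }
        return false;
    }
}
```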
Source: https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka