Kafka Simple Consumer

Kafka的Simple Consumer并不简单。相反的,它是直接在对Kafka的partition和offset进行操作,是一种比较接近底层的方案。

为什么要使用Simple Consumer

使用SimpleConsumer最主要的原因是想在消费消息时获取更大的权限。比如说要做下面这些事情:

  • 多次读取同一条消息;
  • 在一个处理过程中,只消费topic中partition的子集;
  • 进行事务管理,保证消息被消费了一次且只消费了一次。

使用Simple Consumer的负面影响

较之ConsumerGroup,使用SimpleConsumer需要做大量额外的工作:

  • 在应用中需要跟踪offset以便知道消费到哪里了;
  • 需要指明topic和partition对应的leader Broker;
  • 需要对leader Broker的改变做出应对。

使用SimpleBroker的步骤

  • 找到一个活跃Broker,并找出要消费的Topic和Partition的leader Broker;
  • 决定哪个Broker是要消费的Topic和Partition的副本Broker;
  • 建立请求,并定义要抓取的数据;
  • 抓取数据;
  • 确认并还原leader的变化。

找到一个Topic和Partition的Leader Broker

要找到Leader Broker最简单的解决方案就是传送一组已知的Broker到处理程序中,这可以通过配置信息或者命令行来完成。

这里没必要传递集群中全部的Broker给处理程序,只要提供少量的活跃Broker,而后程序可以通过这些Broker得到Leader Broker的信息。程序如下:

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

在上面的程序中调用了topicsMetadata()方法,通过这个方法,程序可以向已经连接的Broker请求关于目标topic的全部细节。

对partitionsMetadata进行迭代循环会遍历所有的partitions,直到找到我们想要的partition。一旦我们找了想要的partition,将会立即跳出全部循环。

在代码中后面还记录了topic所有副本所在的broker。如果需要重新找出新的leader这些记录就可以派上用场了。

找到消费起始的offset

现在定义从哪儿开始读取数据。Kafka有两个常量可以派上用场:

  • kafka.api.OffsetRequest.EarliestTime():从日志中找到数据最开始的位置,并从该位置开始读取数据;
  • kafka.api.OffsetRequest.LatestTime():这个只传递新的消息。

假使已经有数据了,第一个方法会从头开始读取历史数据;第二个方法则不会读取历史数据,只读取新数据。

不要假设起始offset是0,因为随着时间推移,分区中的消息可能会被删除。

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

如果要读取最早的数据,在调用getLastOffset方法时,可以为whichTime赋值为kafka.api.OffsetRequest.EarliestTime();如果要读取最新的数据,可以为whichTime赋值为kafka.api.OffsetRequest.LatestTime()。

错误处理

因为SimpleConsumer不会处理关于Leader Broker的错误,需要写一些代码来解决这个问题:

            if (fetchResponse.hasError()) {
                numErrors++;
                // 出错啦!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // 请求了一个无效的offset。简单重置为最新的offset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }

如果fetchResponse.hasError()返回true,即出现了错误,我们会在日志上记录原因,并关闭consumer,然后尝试找出新的leader。

    private String findNewLeader(String a_oldLeader, int a_port, String a_topic, int a_partition) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // 第一次通过时,如果leader并没有变化,就留给zookeeper一秒时间进行恢复
                // 第二次,假如broker已经恢复过来,就可能不是Broker的问题
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

在这个方法中,我们调用了早些时候定义的findLeader()方法来找出新的leader。如果我们尝试连接的只是topic或partition的一个副本,程序将无法尝试找出新的leader。这样我们也无法再从Broker中读取到需要的数据,然后放弃读取任务并抛出异常退出。

Zookeeper需要一小段时间才能发现leader不存在了并尝试重新指定一个leader,因此处理线程在得不到回复的情况下会先sleep一小段时间。事实上,Zookeeper执行错误恢复的速度非常快,通常不需要sleep等待。

读取数据

最后从partition中读取topic数据并输出:

            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }

            // 在使用FetchRequestBuilder时,注意不要调用.replicaId()方法,这个方法只在内部使用
            // 错误地设置replicaId将会导致broker异常
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)// 注意:如果写入Kafka的量很大的话,可以将fetchSize设置得比100000更大一些
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                // 前面已经介绍过这部分代码
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }

注意readOffset请求的是在读取消息后需要的下一个offset。这样当程序处理完当前的消息块以后就可以知道从哪里开始继续抓取消息了。

还有一点需要注意,就是在程序中我们特意判断了读取到的offset是否比readOffset的值小。这是操作是有必要的。如果Kafka正在对消息进行压缩,抓取请求将会返回一个完全压缩后的消息块,尽管最开始readOffset返回的值不是这个压缩后的消息块的起始位置。因此,我们之前曾经见到过的消息也有可能会被再次返回。还要注意的是我们请求的fetchSize的长度是100000 bytes。如果此时Kafka的Producer正在大批量写入,这个长度可能就不够,也就有可能返回空的消息集合。在这种情况下,需要调整fetchSize的值,直到不再继续返回空消息集。

最后,我们会持续记录读取的消息的数量。如果在上一次请求中没有读取到任何数据,读取线程将会sleep一秒钟。这样程序就不会在没有数据的情况下还反复向Kafka发起请求。

运行示例程序

运行示例程序需要如下参数:

  • 要读取的消息的最大总数(一个整型值,这样我们的程序不会一直循环下去);
  • 要读取的Topic(一个字符串,比如dmp_xxx);
  • 要读取的Partition(一个整型值,即Partition ID);
  • 一个用来查找Metadata的Broker(Broker IP,如127.0.0.1);
  • Broker监听的端口(如 9092)。

源代码

程序源代码如下,这里我暂时做了折叠:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {

            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }

            // 在使用FetchRequestBuilder时,注意不要调用.replicaId()方法,这个方法只在内部使用
            // 错误地设置replicaId将会导致broker异常
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)// 注意:如果写入Kafka的量很大的话,可以将fetchSize设置得比100000更大一些
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // 出错啦!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // 请求了一个无效的offset。简单重置为最新的offset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_port, a_topic, a_partition);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }


    private String findNewLeader(String a_oldLeader, int a_port, String a_topic, int a_partition) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // 第一次通过时,如果leader并没有变化,就留给zookeeper一秒时间进行恢复
                // 第二次,假如broker已经恢复过来,就可能不是Broker的问题
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }


    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

##########

发表评论

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理