这次是要写一个Producer示例程序。使用的kafka版本是0.8.2。开发语言是java。
程序主体是一个Producer类。这个Producer类主要是用来为指定的topic创建消息。
首先需要引入一些支持类:
1 2 3 |
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; |
然后就是定义一些属性来告诉Producer如何找到Kafka集群、怎样对消息进行序列化以及怎样恰当地引导消息到指定的分区。这些属性是以Java Properties对象的形式定义的:
1 2 3 4 5 6 7 |
Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); |
第一个属性“metadata.broker.list”定义了一个或者多个broker。Producer会为每个topic选择一个broker作为Leader。没必要将集群中所有的broker都添加到这个属性中,但是建议最少设置两个,以防止第一个broker不可用。不用考虑Kafka如何指明哪个broker作为topic(和partition)的Leader,kafka知道怎样与broker建立连接、请求元数据,并最终连接到正确的Broker。
第二个属性“serializer.class”定义了在准备传递消息给Broker时要使用的序列化类。在我们的示例程序中使用了Kafka提供的一个简单的序列化类StringEncoder。注意这里使用的encoder必须能够处理下一步在KeyedMessage中定义的类型。
也可以单独调整消息的Key使用的序列化类,这个可以通过恰当地定义“key.serializer.class”来实现。默认情况下,这个属性和“serializer.class”的值一致。
第三个属性“partitioner.class”定义了要使用哪个类来判断将消息发送给Topic中的哪个Partition。这个属性是可选的。不过在一些特殊的应用中,用户可能想自定义实现一个partition方案。稍后再讨论如何实现Partition。如果消息的key值不为null,可是又没有定义一个“partitioner.class”属性,Kafka将会使用默认的partitioner。如果key值为null,Producer将会把消息分配给一个随机的Partition。
最后一个属性“request.required.acks”告诉Kafka您希望broker在接收到消息后能发送一个确认信号给您的Producer。不设置这个属性的话,Producer将会“fire and forget(放弃并遗忘)”(消息),这有可能会导致数据的丢失。要了解更多,可以参考这个网页:http://kafka.apache.org/08/configuration.html。
然后就是定义Producer对象:
1 |
Producer<String, String> producer = new Producer<String, String>(config); |
这里使用了java的泛型,您需要指明Producer两个参数的类型。第一个参数就是Partition的key的类型,第二个是消息的类型。在这个示例程序中,这两个属性都是String类型。还需要注意这两个属性需要和前面定义的的配置属性“serializer.class”和“partitioner.class”呼应。
现在开始构建要发送的消息:
1 2 3 4 |
Random rnd = new Random(); long runtime = new Date().getTime(); String ip = "192.168.2." + rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; |
这里我们伪造了一系列网站访问的IP信息。将消息以逗号分隔,第一部分是事件发生时的时间戳,第二部分是网址,第三部分是请求来源IP。在这里,我们使用java Random类来保证IP地址的最后八位字节不同,以便我们观察Partitioner是怎样工作的。
然后将消息发送到broker:
1 2 |
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); producer.send(data); |
“page_visits”就是将要把消息写入的Topic。这里我们将IP作为partition的key。需要注意,如果不设置partition的key的话,即使已经定义了一个Partitioner类,Kafka仍然会将消息分配给一个随机的partition。
完整的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
import java.util.*; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); producer.send(data); } producer.close(); } } |
自定义的Partitioner类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class SimplePartitioner implements Partitioner { public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; } } |
在自定义的Partitioner类中,我们使用IP地址作为key。我们取出IP地址的最后八位字节与Kafka中topic的partition总数进行模运算。这种partition方案的好处就是所有相同IP的访问记录都会被放置到相同的partition。此外,Consumer处理逻辑也要知道怎样对之进行处理。
在运行我们的程序前,需要确认是否创建了名为”page_visits”的Topic。新建topic的指令如下:
1 |
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 5 --topic page_visits |
就这样.
参考文档
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
######
发表评论