Kafka Producer程序示例

这次是要写一个Producer示例程序。使用的kafka版本是0.8.2。开发语言是java。

程序主体是一个Producer类。这个Producer类主要是用来为指定的topic创建消息。

首先需要引入一些支持类:

然后就是定义一些属性来告诉Producer如何找到Kafka集群、怎样对消息进行序列化以及怎样恰当地引导消息到指定的分区。这些属性是以Java Properties对象的形式定义的:

第一个属性“metadata.broker.list”定义了一个或者多个broker。Producer会为每个topic选择一个broker作为Leader。没必要将集群中所有的broker都添加到这个属性中,但是建议最少设置两个,以防止第一个broker不可用。不用考虑Kafka如何指明哪个broker作为topic(和partition)的Leader,kafka知道怎样与broker建立连接、请求元数据,并最终连接到正确的Broker。

第二个属性“serializer.class”定义了在准备传递消息给Broker时要使用的序列化类。在我们的示例程序中使用了Kafka提供的一个简单的序列化类StringEncoder。注意这里使用的encoder必须能够处理下一步在KeyedMessage中定义的类型。

也可以单独调整消息的Key使用的序列化类,这个可以通过恰当地定义“key.serializer.class”来实现。默认情况下,这个属性和“serializer.class”的值一致。

第三个属性“partitioner.class”定义了要使用哪个类来判断将消息发送给Topic中的哪个Partition。这个属性是可选的。不过在一些特殊的应用中,用户可能想自定义实现一个partition方案。稍后再讨论如何实现Partition。如果消息的key值不为null,可是又没有定义一个“partitioner.class”属性,Kafka将会使用默认的partitioner。如果key值为null,Producer将会把消息分配给一个随机的Partition。

最后一个属性“request.required.acks”告诉Kafka您希望broker在接收到消息后能发送一个确认信号给您的Producer。不设置这个属性的话,Producer将会“fire and forget(放弃并遗忘)”(消息),这有可能会导致数据的丢失。要了解更多,可以参考这个网页:http://kafka.apache.org/08/configuration.html。

然后就是定义Producer对象:

这里使用了java的泛型,您需要指明Producer两个参数的类型。第一个参数就是Partition的key的类型,第二个是消息的类型。在这个示例程序中,这两个属性都是String类型。还需要注意这两个属性需要和前面定义的的配置属性“serializer.class”和“partitioner.class”呼应。

现在开始构建要发送的消息:

这里我们伪造了一系列网站访问的IP信息。将消息以逗号分隔,第一部分是事件发生时的时间戳,第二部分是网址,第三部分是请求来源IP。在这里,我们使用java Random类来保证IP地址的最后八位字节不同,以便我们观察Partitioner是怎样工作的。

然后将消息发送到broker:

“page_visits”就是将要把消息写入的Topic。这里我们将IP作为partition的key。需要注意,如果不设置partition的key的话,即使已经定义了一个Partitioner类,Kafka仍然会将消息分配给一个随机的partition。

完整的代码如下:

自定义的Partitioner类:

在自定义的Partitioner类中,我们使用IP地址作为key。我们取出IP地址的最后八位字节与Kafka中topic的partition总数进行模运算。这种partition方案的好处就是所有相同IP的访问记录都会被放置到相同的partition。此外,Consumer处理逻辑也要知道怎样对之进行处理。

在运行我们的程序前,需要确认是否创建了名为”page_visits”的Topic。新建topic的指令如下:

就这样.

参考文档

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

######

发表评论

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据