3.2.1 创建基本的消息生产者

要在Kafka消息集群中写入消息,首先需要创建一个生产者对象。Kafka生产者有三个必选的属性,其他属性是可选的。下面列出了这三个必须设置的属性及它们的含义。

(1)bootstrap.servers。

该属性指定Kafka集群中Broker的地址列表,其地址的格式为host:port,如果有多个Broker地址,可以用逗号进行分隔。当然,该地址列表中不需要包含所有Broker,因为Kafka会将整个集群的元信息和配置信息存储在ZooKeeper中,生产者会从给定的Broker中,通过ZooKeeper查找到其他Kafka Broker地址信息。在生产环境中,建议至少要提供两个Broker地址信息,这样做的目的是支持容错。一旦其中一个Broker出现了宕机,生产者仍然能够通过另一个Broker连接到Kafka集群上。

(2)key.serializer。

发送到Kafka的消息需要经过序列化后,才能实现正常发送与转发。生产者将要发送的消息通过序列化器进行序列化后,生成一个key/value的值,并发送到Broker。当创建Kafka生产者的时候,生产者需要知道采用何种方式把消息(即Java对象)转换为字节数组。因此通过生产者端的参数key.serializer就是这一项配置的工作。它必须实现org.apache.kafka.common.serialization.Serializer接口,然后生产者会使用这个类把键对象序列化为字节数组。

(3)value.serializer。

value.serializer与key.serializer一样,用于指定的类会将值序列化。创建一个Kafka生产者的代码如下。

其中,第05行~第08行代码不是必需的,如果没有配置这些参数,将会采用默认的参数值。