3.2.2 发送自定义消息对象
前面提到,Kafka生产者发送的消息必须经过序列化。实现序列化可以简单地总结为两步,第一步继承序列化Serializer接口;第二步实现接口方法,将指定类型序列化成byte[],或者将byte[]反序列化成指定数据类型。接下来,我们来实现序列化/反序列化方式。
实现Java对象的序列化有很多不同的方式。这里我们介绍基于FastJson的序列化方式。Fastjson是一个Java库,可以将Java对象转换为JSON格式,当然它也可以将JSON字符串转换为Java对象,加入以下依赖。
下面通过一个具体的案例演示如何使用Fastjson的一个Java对象进行序列化,并将其作为Kafka生产者的消息发送到Kafka消息集群中。使用下面的员工数据进行测试,其数据格式描述如表3-1所示。
表3-1 员工数据格式
下面展示了Employee.java的完整代码,为了输出结果方便,这里还重写了Employee类的toString方法。
为了将Employee对象进行序列化,创建一个EmployeeJSONSerializer类并使用Fastjson将其序列化成一个JSON对象,完整的代码如下。
最后,创建一个EmployeeProducer来将Employee对象发送到Kafka集群的Broker上。
我们创建EmployeeConsumer来消费消息,图3.2展示了程序运行的效果。
图3.2 EmployeeConsumer的运行效果
关于EmployeeConsumer的完整代码,将会在后面章节展示。