Smart's Blog

Kafka自定义消息的序列化与反序列化

序言:
我们在利用kafka做为消息中间件的使用过程中,有时候希望kafka存储的是自定义消息,如JavaBean,而kafka客户端只提供了String类型和Byte数组类型的消息序列化与反序列化的接口,这不能够满足我们的需求。所以我们需要通过实现kafka客户端提供的Serializer和Deserializer这两个接口来达到我们的目的。

1、 producer端序列化JavaBean并发送

这是一个简单的kafka的producer的demo,与平常我们使用的demo不同,producer的config中的value.serializer参数的值我们配置为我们自己定义的序列化类WritableBeanSerializer。

WritableBeanSerializer是我们自定义WritableBean的序列化类,这个类实现了kafka客户端的Serializer接口,通过实现其中的serialize方法,对WritableBean进行序列化。

对象的序列化操作就是将对象序列化成一个byte数组,即字节流的形式,这里我们利用Alibaba的fastJson实现,也可以使用其他的序列化框架或者自己实现序列化过程。

2、consumer端消费数据并反序列化

这是kafka的cosumer端的实现,与之前producer端一样,将value.deserializer参数配为我们自已定义的反序列化类WritableBeanSerializer,同时我们定义的KafkaConsumer和ConsumerRecords类内的value要定义为WritableBean类型,即这两行代码:

1
2
KafkaConsumer<String,WritableBean> consumer = new KafkaConsumer<>(props2);
ConsumerRecord<String,WritableBean> record;

以下是WritableBeanSerializer类的代码,实现Deserializer接口中的deserialize方法,同样也是使用fastJson进行反序列化,这个方法的返回类型可以由我们自己定义,因此我们直接定义为WritableBean类型。

最后需要注意的一点的是被序列化和反序列化的JavaBean一定要有默认的构造方法!