Smart's Blog

Kafka的网络通信机制

序言:kafka的网络层设计架构如下图所示,而整个网络层内最主要的实现类就是SocketServer,它实现了一个基于NIO的socket服务器,负责kafka Broker的网络请求的接收与发送。我们再根据图中的流程详细看一看kafka的网络通信机制。


image

1、实例化SocketServer对象

       KafkaServer在启动时会实例化一个SocketServer对象,这个类有如下属性:

image

       首先,图中的config对象对应是kafka Broker的配置属性,然后我们看一看几个关键的属性:
       endpoints是一个map,其中的元素是config.listeners配置的网络地址列表,也就是SocketServer监听的端口地址,此外Acceptor线程的个数这个map的size,即一个Acceptor线程监听一个网络地址;
       numProcessorThreads的值等于config.numNetworkThreads,表示一个Acceptor线程所拥有的Processor线程数;
       totalProcessorThreads表示总共所需的工作线程数,即Processor线程数;
       requestChannel是RequestChannel类的一个引用,这个类负责维护消息缓存队列requestQueue和responseQueues,包括队列的生成以及消息的put和poll;
       processors是实际的工作线程,后面还会详细介绍;
       acceptors是Endpoint和Acceptor的Map集合,后面也会详细介绍。

2、启动SocketServer

       实例化SocketServer对象后,KafkaServer调用其startup()方法,启动这个Socket服务器。我们来看一看startup()方法都做了哪些事情。以下是startup()方法的主要代码片段:


image

       可以看到首先会读取一些配置项信息,然后遍历endpoints,每次遍历都会new一个新的Acceptor线程对象以及对应个数的Processor线程对象,然后调用工具类来启动Acceptor线程。

3、Acceptor线程

       Acceptor线程负责监听指定的网络地址端口。在初始化一个Acceptor线程对象过程中,这个类首先会打开一个Selector选择器,然后创建一个ServerSocketChannel,最后遍历它所拥有的processor线程数组,将其中的processor线程对象启动。代码如下:


image

       SocketServer的startup()方法中会启动所有的Acceptor线程,所以我们接下来看一看Acceptor线程的run()方法。首先,Acceptor线程会将nioSelector注册到serverChannel中,注册的事件类型为OP_ACCEPT。随后进入while循环,调用nioSelector的select()方法,从该selecor注册的所有channel中查找已经做好io操作准备的channel,该方法将返回符合条件的channel的数量,如果返回值大于0,就认为有新的连接请求进来,就调用selectedKeys()方法,这个方法将返回一个Set,这个Set是所有符合条件的channel的key的集合。接着遍历这个set,每循环一次就将一个key和一个processor线程交给Acceptor线程的accept()方法去处理。代码片段如下:


image

       然后来看accept()方法怎么继续处理。这个方法会根据key(一个SelectedKey对象)提供的方法得到一个已经准备好的ServerSocketChannel对象,之后将这个ServerSocketChannel对象交给processor的accept()对象去处理,processor的accept()方法会将这个ServerSocketChannel加入到他自己的消息缓存队列中,然后唤醒processor线程自身的一个selector。到这里为止,Acceptor线程的任务就完成了,整体来看,它负责建立对网络地址端口的监听,并且当有新连接进入时,Acceptor线程负责选取对应的ServerSocketChannel,然后将这个channel交给processor工作线程去处理。
4、Processor线程

       每个Processor线程属于一个Acceptor线程,它只会去处理它对应的Acceptor线程接收到的请求,同时每个Processor线程都有一个自己的消息缓存队列,所有的新进来的连接都会先被缓存在这个队列里,这个队列在创建Processor对象时就初始化完毕:

1
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

       我们知道,在创建Acceptor线程对象时,这个Acceptor所拥有的processor线程就已经启动了,我们先看一下它的run()方法:


image

       可以看到它的run()方法也是有一个一直循环的while循环,循环体内调用的几个方法是我们重点关注的对象,依次看一下这几个方法的作用。
       第一个是configureNewConnections()方法。这个方法的主要作用是从消息缓存队列newConnections()内拉取SocketChannel对象,并将SocketChannel交给kafka自己实现的一个Selector去完成selector和channel的注册步骤。
       第二个是processNewResponse()方法,这个方法负责发送response,后面再做介绍。
       第三个是poll()方法,这个也是Kafka自身的Selector的一个方法,它负责封装这个Selector中准备好的请求连接,然后将连接对象添加到CompletedReceives这个List中。
       第四个processCompleteReceives()方法,它遍历CompletedReceives这个List,每次循环都会根据list内的连接对象的属性生成一个新的RequestChannel.Request对象,然后通过RequestChannel的sendRequest()方法将这个Request对象put到requestQueue队列中。同时,我们在启动KafkaServer时,会启动KafkaRequestHandler线程,这个线程的run()方法中的while循环体,从requestQueue队列中拉取Request对象,如果有,就将这个Request对象交给KafkaApis的handle方法去处理,接着这个请求会被一层一层的解析,根据请求信息去完成其对应的业务需求。
       后面两个方法则就负责处理一些后续工作,如相关消息缓存队列内对象的remove等等(未仔细研究)。
       最后再看一看之前提到的processNewResponse()方法。前面的processCompleteReceives()方法已经将封装好的请求对象交给KafkaApis去handle,而API层处理请求后也会生成调用相应的回调函数,回调函数会将处理请求过程中生成的Response对象put到responseQueues队列中,这时processNewResponse()方法就会从队列中取出Response对象,然后再通过Kafka的Selector发送出去。
       以上就是kafka的网络通信机制处理请求和返回消息的具体流程,通过SocketServer服务器来接收请求和分发请求,具体分别由Acceptor线程和Processor线程来负责。同时增加消息缓存队列来解耦网络层和API层,使他们可以异步的完成各自的任务。
       最后,Kafka还通过java语言实现了一套自己的NIO网络通信组件,比如Kafka有自己的Selector类和自己的KafkaChannel类。SocketSever从外部网络获取到ServerSocketChannel后,Kafka自身的Selector也会在这个Channel上进行注册,之后从channel读取请求到完成请求的封装都由kafka自身的Selector去实现,这样也减轻了SocketServer服务器的压力。