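The previous articles covered the consumer and the producer. Starting with this article, we look at the design of the Kafka server side.

Kafka's network layer is built on multiple threads and multiple Selectors. The core class is SocketServer. It contains an Acceptor (one per EndPoint) that accepts and handles all new connections. Each Acceptor has several Processor threads, and each Processor thread owns its own Selector, which it uses to read requests from connections and write responses back. A pool of Handler threads processes the requests and hands the resulting responses back to the Processor threads. Processor threads and Handler threads communicate through a RequestChannel.

Let's look at how SocketServer is implemented, starting with its key fields:

endpoints: the collection of EndPoints. A server usually has several network cards and can be configured with several IP addresses, so Kafka can listen on several ports at once. The EndPoint class wraps the host and port to listen on and the network protocol in use. One Acceptor object is created for each EndPoint.
numProcessorThreads: the number of Processor threads per EndPoint.
totalProcessorThreads: the total number of Processor threads across all EndPoints.
maxQueuedRequests: the maximum number of requests buffered in the requestQueue of the RequestChannel.
maxConnectionsPerIp: the maximum number of connections that a single IP may open.
maxConnectionsPerIpOverrides: a Map<String,Int> that sets the maximum number of connections for specific IPs. A value given here overrides maxConnectionsPerIp for that IP.
requestChannel: the queues through which Processor threads and Handler threads exchange data.
acceptors: the collection of Acceptor objects.
processors: the collection of Processor threads.
connectionQuotas: a ConnectionQuotas object, which enforces the maximum number of connections per IP.

First, the initialization flow of SocketServer: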
    // Create the RequestChannel, which holds totalProcessorThreads response queues
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    // Create the Processor array
    private val processors = new Array[Processor](totalProcessorThreads)
    // Create the Acceptor map
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    // Register a listener on the RequestChannel: when a Handler thread writes to a
    // response queue, the listener wakes up the corresponding Processor thread
    requestChannel.addResponseListener(id => processors(id).wakeup())

    // Core initialization code of SocketServer
    def startup() {
      this.synchronized {
        // Create connectionQuotas
        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
        // Socket send buffer size
        val sendBufferSize = config.socketSendBufferBytes
        // Socket receive buffer size
        val recvBufferSize = config.socketReceiveBufferBytes
        val brokerId = config.brokerId
        var processorBeginIndex = 0
        // Iterate over the endpoints
        endpoints.values.foreach { endpoint =>
          val protocol = endpoint.protocolType
          val processorEndIndex = processorBeginIndex + numProcessorThreads
          // Processors in [processorBeginIndex, processorEndIndex) belong to this endpoint
          for (i <- processorBeginIndex until processorEndIndex)
            processors(i) = newProcessor(i, connectionQuotas, protocol)
          // Create one Acceptor for this endpoint
          val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
            processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
          acceptors.put(endpoint, acceptor)
          // Start the Acceptor thread
          Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
          // Block until the Acceptor has finished starting up
          acceptor.awaitStartup()
          processorBeginIndex = processorEndIndex
        }
      }
    }

The initialization code mainly sets up the fields introduced above: it builds the ConnectionQuotas, gives each endpoint its own contiguous slice of the Processor array, and starts one Acceptor thread per endpoint.
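To make the roles of maxConnectionsPerIp and maxConnectionsPerIpOverrides concrete, here is a minimal Java sketch of per-IP connection accounting. It is an illustration under assumptions, not Kafka's ConnectionQuotas: the class name ConnectionQuotaSketch, the use of String addresses, the IllegalStateException, and the choice not to count a rejected connection are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a per-IP connection quota; not Kafka's class.
class ConnectionQuotaSketch {
    private final int defaultMax;                  // plays the role of maxConnectionsPerIp
    private final Map<String, Integer> overrides;  // plays the role of maxConnectionsPerIpOverrides
    private final Map<String, Integer> counts = new HashMap<>();

    ConnectionQuotaSketch(int defaultMax, Map<String, Integer> overrides) {
        this.defaultMax = defaultMax;
        this.overrides = new HashMap<>(overrides);
    }

    // Record a new connection; reject it if the address is already at its limit.
    // Assumption of this sketch: a rejected connection is not counted.
    synchronized void inc(String ip) {
        int max = overrides.getOrDefault(ip, defaultMax);  // an override wins over the default
        int current = counts.getOrDefault(ip, 0);
        if (current >= max)
            throw new IllegalStateException("Too many connections from " + ip + " (max " + max + ")");
        counts.put(ip, current + 1);
    }

    // Forget one connection, for example when it is closed.
    synchronized void dec(String ip) {
        int current = counts.getOrDefault(ip, 0);
        if (current <= 1) counts.remove(ip);
        else counts.put(ip, current - 1);
    }

    synchronized int count(String ip) {
        return counts.getOrDefault(ip, 0);
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("10.0.0.9", 1);
        ConnectionQuotaSketch quotas = new ConnectionQuotaSketch(2, overrides);
        quotas.inc("10.0.0.1");
        quotas.inc("10.0.0.1");
        quotas.inc("10.0.0.9");
        try {
            quotas.inc("10.0.0.9");  // throws: the override of 1 is already used up
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The whole object is guarded by one lock because increments come from Acceptor threads while decrements come from whichever thread closes the connection.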
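    def shutdown() = {
      info("Shutting down")
      this.synchronized {
        acceptors.values.foreach(_.shutdown)
        processors.foreach(_.shutdown)
      }
      info("Shutdown completed")
    }

The shutdown operation shuts down every Acceptor and every Processor.

Acceptor and Processor both extend AbstractServerThread, an abstract class that implements the Runnable interface and provides control methods for startup and shutdown. It has four key fields:

alive: indicates whether the thread is still alive.
shutdownLatch: indicates whether the thread's shutdown has completed.
startupLatch: indicates whether the thread's startup has completed.
connectionQuotas: used by the close method, which closes the SocketChannel for the given ConnectionId and decrements the connection count recorded in ConnectionQuotas.

Here are its commonly used methods: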
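    def shutdown(): Unit = {
      // Mark the thread as shut down
      alive.set(false)
      // Wake up the thread
      wakeup()
      // Block until shutdown has completed
      shutdownLatch.await()
    }

    // Block until startup has completed
    def awaitStartup(): Unit = startupLatch.await

    // Startup finished: release the threads waiting in awaitStartup
    protected def startupComplete() = {
      startupLatch.countDown()
    }

    // Shutdown finished: release the threads waiting in shutdown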
    protected def shutdownComplete() = shutdownLatch.countDown()

    // Close a connection
    def close(selector: KSelector, connectionId: String) {
      // Look up the channel to close
      val channel = selector.channel(connectionId)
      if (channel != null) {
        debug(s"Closing selector connection $connectionId")
        val address = channel.socketAddress
        // Update the connection count
        if (address != null)
          connectionQuotas.dec(address)
        // Close the connection
        selector.close(connectionId)
      }
    }

The Acceptor receives connection requests, creates the Socket for each one, and assigns it to a Processor.
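Before moving on to the Acceptor code, the startup and shutdown handshake that AbstractServerThread provides can be sketched in plain Java. This is a minimal illustration under assumptions, not Kafka's class: LifecycleSketch and its wakeupLock are hypothetical, and a monitor wait stands in for the selector's timed select and wakeup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of the lifecycle described above; names are illustrative.
class LifecycleSketch implements Runnable {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final Object wakeupLock = new Object();  // stands in for the selector

    @Override
    public void run() {
        startupLatch.countDown();  // like startupComplete(): releases awaitStartup()
        try {
            while (alive.get()) {
                synchronized (wakeupLock) {
                    try {
                        // Stands in for a timed select; re-check alive under the lock
                        // so that a shutdown signalled just before is not missed.
                        if (alive.get()) wakeupLock.wait(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        } finally {
            shutdownLatch.countDown();  // like shutdownComplete(): releases shutdown()
        }
    }

    void awaitStartup() { await(startupLatch); }

    // Clear the flag, wake the worker, then block until its loop has exited.
    void shutdown() {
        alive.set(false);
        synchronized (wakeupLock) { wakeupLock.notifyAll(); }
        await(shutdownLatch);
    }

    boolean isRunning() { return alive.get(); }

    boolean isShutdownComplete() { return shutdownLatch.getCount() == 0; }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch worker = new LifecycleSketch();
        new Thread(worker, "sketch-worker").start();
        worker.awaitStartup();   // returns once run() has started
        worker.shutdown();       // returns once run() has finished
        System.out.println("shutdown complete: " + worker.isShutdownComplete());
    }
}
```

The two latches give the caller a definite point at which the thread is known to be up or fully stopped, which is why startup() in SocketServer can safely call awaitStartup() on each Acceptor in turn.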
    // Create the NIO Selector
    private val nioSelector = NSelector.open()
    // Create a ServerSocketChannel
    val serverChannel = openServerSocket(endPoint.host, endPoint.port)

    this.synchronized {
      // Create and start one thread for each Processor
      processors.foreach { processor =>
        Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
      }
    }

Next, the core logic of the Acceptor, its run method:
    def run() {
      // Register the ServerSocketChannel with the selector, interested in OP_ACCEPT
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
      // Signal that startup has completed
      startupComplete()
      try {
        var currentProcessor = 0
        // Check the running state
        while (isRunning) {
          try {
            // Poll for I/O events, blocking for at most 500 ms
            val ready = nioSelector.select(500)
            if (ready > 0) {
              val keys = nioSelector.selectedKeys()
              val iter = keys.iterator()
              // Iterate over the ready I/O events
              while (iter.hasNext && isRunning) {
                try {
                  val key = iter.next
                  iter.remove()
                  if (key.isAcceptable)
                    accept(key, processors(currentProcessor))
                  else
                    throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                  // round robin to the next processor thread
                  currentProcessor = (currentProcessor + 1) % processors.length
                } catch {
                  case e: Throwable => error("Error while accepting connection", e)
                }
              }
            }
          } catch {
            case e: ControlThrowable => throw e
            case e: Throwable => error("Error occurred", e)
          }
        }
      } finally {
        debug("Closing server socket and selector.")
        swallowError(serverChannel.close())
        swallowError(nioSelector.close())
        shutdownComplete()
      }
    }

The method first registers for OP_ACCEPT events, then loops, polling for I/O events and handling them. The handling consists mainly of processing accept events:
    def accept(key: SelectionKey, processor: Processor) {
      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
      // Create the SocketChannel
      val socketChannel = serverSocketChannel.accept()
      try {
        // Increment the connection count
        connectionQuotas.inc(socketChannel.socket().getInetAddress)
        // Switch to non-blocking mode
        socketChannel.configureBlocking(false)
        socketChannel.socket().setTcpNoDelay(true)
        socketChannel.socket().setKeepAlive(true)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

        debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
          .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
            socketChannel.socket.getSendBufferSize, sendBufferSize,
            socketChannel.socket.getReceiveBufferSize, recvBufferSize))
        // Hand the SocketChannel over to the Processor
        processor.accept(socketChannel)
      } catch {
        case e: TooManyConnectionsException =>
          info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
          close(socketChannel)
      }
    }

The Processor reads requests and writes responses back. It does not take part in the actual request-handling logic. Its key fields are:

newConnections: the newly created SocketChannels assigned to this Processor that it has not yet registered.
inflightResponses: the responses that have been handed to the selector but whose send has not yet completed.
selector: a KSelector.
requestChannel: the queues that carry data between the Processor and the Handler threads.
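The hand-off between the Acceptor and its Processors (round-robin selection in run, then queuing on newConnections) can be sketched in isolation. This is a hypothetical Java illustration: RoundRobinHandoffSketch is not a Kafka class, connections are plain strings rather than SocketChannels, and the selector wakeup is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of round-robin assignment plus per-processor queues.
class RoundRobinHandoffSketch {
    // One newConnections queue per processor; thread-safe because the acceptor
    // thread writes to it while the processor thread drains it.
    private final List<Queue<String>> newConnections = new ArrayList<>();
    private int currentProcessor = 0;  // only touched by the acceptor thread

    RoundRobinHandoffSketch(int numProcessors) {
        for (int i = 0; i < numProcessors; i++)
            newConnections.add(new ConcurrentLinkedQueue<>());
    }

    // Acceptor side: queue the connection on the current processor, then advance.
    // Returns the id of the processor that received the connection.
    int accept(String connection) {
        int chosen = currentProcessor;
        newConnections.get(chosen).add(connection);
        currentProcessor = (currentProcessor + 1) % newConnections.size();
        return chosen;
    }

    // Processor side: take everything queued so far, in arrival order.
    List<String> drain(int processorId) {
        List<String> drained = new ArrayList<>();
        Queue<String> queue = newConnections.get(processorId);
        String next;
        while ((next = queue.poll()) != null)
            drained.add(next);
        return drained;
    }

    public static void main(String[] args) {
        RoundRobinHandoffSketch handoff = new RoundRobinHandoffSketch(3);
        for (int i = 0; i < 5; i++)
            System.out.println("c" + i + " -> processor " + handoff.accept("c" + i));
        System.out.println("processor 0 drains " + handoff.drain(0));
    }
}
```

Queuing instead of registering directly keeps all selector registration on the Processor's own thread, so the Acceptor never touches a Processor's selector beyond waking it up.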
    def accept(socketChannel: SocketChannel) {
      newConnections.add(socketChannel)
      wakeup()
    }

The Processor first adds the SocketChannel assigned to it to its own queue, then wakes up its thread to do the work:
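    def wakeup = selector.wakeup()

The selector here is a KSelector, which ultimately delegates to the NSelector to perform the wakeup.

Next, the overall flow of the Processor's run method:

1. Call startupComplete to signal that initialization has finished.
2. Handle the newly created SocketChannels in the newConnections queue.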
    private def configureNewConnections() {
      while (!newConnections.isEmpty) {
        // Take one unprocessed SocketChannel off the queue
        val channel = newConnections.poll()
        try {
          debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
          // Collect the addressing information of the channel
          val localHost = channel.socket().getLocalAddress.getHostAddress
          val localPort = channel.socket().getLocalPort
          val remoteHost = channel.socket().getInetAddress.getHostAddress
          val remotePort = channel.socket().getPort
          val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
          // Register it with the selector
          selector.register(connectionId, channel)
        } catch {
          // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
          // throwables will be caught in processor and logged as uncaught exceptions.
          case NonFatal(e) =>
            // need to close the channel here to avoid a socket leak.
            close(channel)
            error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
        }
      }
    }

The register method called above belongs to KSelector and is written in Java:

    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        // Register for OP_READ events
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        // Add the channel to the map
        this.channels.put(id, channel);
    }

3. Fetch this Processor's responseQueue from the RequestChannel and handle the Responses buffered in it. If a Response is a SendAction, it must be sent to the client: the Processor looks up the corresponding KafkaChannel, registers OP_WRITE on it, and points the KafkaChannel.send field at the Response to send. The Response is also removed from the responseQueue and put into inflightResponses. If a Response is a NoOpAction, there is nothing to send on this connection for now, so OP_READ is registered on the KafkaChannel to let it continue reading requests. If a Response is a CloseConnectionAction, the corresponding connection is closed.
    private def processNewResponses() {
      // Fetch the next Response for this Processor
      var curr = requestChannel.receiveResponse(id)
      while (curr != null) {
        try {
          // Choose the handling according to the Response action
          curr.responseAction match {
            case RequestChannel.NoOpAction =>
              curr.request.updateRequestMetrics
              trace("Socket server received empty response to send, registering for read: " + curr)
              selector.unmute(curr.request.connectionId)
            case RequestChannel.SendAction =>
              sendResponse(curr)
            case RequestChannel.CloseConnectionAction =>
              curr.request.updateRequestMetrics
              trace("Closing socket connection actively according to the response code.")
              close(selector, curr.request.connectionId)
          }
        } finally {
          curr = requestChannel.receiveResponse(id)
        }
      }
    }

4. Call the Processor's poll method to read requests and send responses. Underneath, it calls KSelector.poll.
    private def poll() {
      try selector.poll(300)
      catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error(s"Closing processor $id due to illegal state or IO exception")
          swallow(closeAll())
          shutdownComplete()
          throw e
      }
    }

Each call places the requests that were read, the responses that were sent successfully, and the connections that were dropped into the selector's completedReceives, completedSends, and disconnected queues, where they wait to be handled.

5. Call processCompletedReceives to handle the KSelector.completedReceives queue.
    private def processCompletedReceives() {
      // Iterate over the queue
      selector.completedReceives.asScala.foreach { receive =>
        try {
          // Look up the corresponding KafkaChannel
          val channel = selector.channel(receive.source)
          // Create the Session for this KafkaChannel; it is used for access control
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          // Wrap the NetworkReceive, the processor id, and the authentication information in a RequestChannel.Request
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
            buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          // Put the Request on RequestChannel.requestQueue, where it waits to be handled
          requestChannel.sendRequest(req)
          // Cancel the OP_READ registration so that no more data is read from this connection
          selector.mute(receive.source)
        } catch {
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
            error(s"Closing socket for ${receive.source} because of error", e)
            close(selector, receive.source)
        }
      }
    }

6. Call processCompletedSends to handle the KSelector.completedSends queue. First, the corresponding Response is removed from inflightResponses. Then OP_READ is registered again for the connection, so that data can be read from it once more.
    private def processCompletedSends() {
      // Iterate over the queue
      selector.completedSends.asScala.foreach { send =>
        // This response has been sent, so remove it from inflightResponses
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        // Allow this connection to read data again
        selector.unmute(send.destination)
      }
    }

7. Call processDisconnected to handle the KSelector.disconnected queue. First, the Responses belonging to the connection are removed from inflightResponses. Then the connection count recorded in ConnectionQuotas is decremented.
    private def processDisconnected() {
      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    }

8. When SocketServer.shutdown is called to shut down the whole SocketServer, the alive field is set to false, so the run loop exits.

Here is the concrete implementation:
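    override def run() {
      // Signal that the Processor has started
      startupComplete()
      while (isRunning) {
        try {
          // Set up new connections
          configureNewConnections()
          // Handle the responses buffered in the RequestChannel
          processNewResponses()
          // Read requests and send responses
          poll()
          // Handle the results that poll left in the three queues
          processCompletedReceives()
          processCompletedSends()
          processDisconnected()
        } catch {
          case e: ControlThrowable => throw e
          case e: Throwable =>
            error("Processor got uncaught exception.", e)
        }
      }

      debug("Closing selector - processor " + id)
      swallowError(closeAll())
      shutdownComplete()
    }

RequestChannel

Processor threads and Handler threads pass data to each other through the RequestChannel. The RequestChannel contains one requestQueue and several responseQueues, one responseQueue per Processor thread. A Processor thread puts the requests it reads into the requestQueue, and the Handler threads take requests from the requestQueue and process them. The response a Handler thread produces is placed in the responseQueue of the Processor that read the request, and that Processor thread takes the response from its responseQueue and sends it to the client.

The key fields of RequestChannel are:

requestQueue: the queue through which Processor threads pass requests to Handler threads.
responseQueues: an array of queues through which Handler threads pass responses to Processor threads.
numProcessors: the number of Processor threads.
queueSize: the maximum number of buffered requests.
responseListeners: the list of listeners.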
    // Add a SendAction Response to the responseQueue of its Processor
    def sendResponse(response: RequestChannel.Response) {
      responseQueues(response.processor).put(response)
      // Invoke the listeners
      for (onResponse <- responseListeners)
        onResponse(response.processor)
    }
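The queue layout described in this section (one shared request queue, one response queue per Processor, and listeners that fire on every response) can be sketched as follows. This is a hypothetical Java miniature, not Kafka's RequestChannel: requests and responses are plain strings, the class and method shapes are illustrative, and sendRequest uses a non-blocking offer where a blocking put would be the other option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntConsumer;

// Hypothetical miniature of the request/response queue layout.
class RequestChannelSketch {
    private final BlockingQueue<String> requestQueue;          // shared, bounded by queueSize
    private final List<BlockingQueue<String>> responseQueues;  // one per processor
    private final List<IntConsumer> responseListeners = new ArrayList<>();

    RequestChannelSketch(int numProcessors, int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        responseQueues = new ArrayList<>();
        for (int i = 0; i < numProcessors; i++)
            responseQueues.add(new LinkedBlockingQueue<>());
    }

    void addResponseListener(IntConsumer listener) {
        responseListeners.add(listener);
    }

    // Processor to Handler. Returns false when the bounded queue is full.
    boolean sendRequest(String request) {
        return requestQueue.offer(request);
    }

    // Handler side: the next request, or null if none is queued.
    String receiveRequest() {
        return requestQueue.poll();
    }

    // Handler to Processor: queue the response, then notify the listeners with
    // the processor id so that the right processor can be woken up.
    void sendResponse(int processorId, String response) {
        responseQueues.get(processorId).add(response);
        for (IntConsumer listener : responseListeners)
            listener.accept(processorId);
    }

    // Processor side: the next response for this processor, or null if none.
    String receiveResponse(int processorId) {
        return responseQueues.get(processorId).poll();
    }

    public static void main(String[] args) {
        RequestChannelSketch channel = new RequestChannelSketch(2, 10);
        channel.addResponseListener(id -> System.out.println("wake up processor " + id));
        channel.sendRequest("fetch");                       // read by processor 1, say
        String request = channel.receiveRequest();          // picked up by a handler
        channel.sendResponse(1, "response to " + request);  // routed back to processor 1
        System.out.println(channel.receiveResponse(1));
    }
}
```

Keeping a separate response queue per Processor means a response always returns to the thread whose selector owns the connection, and bounding only the request queue lets the broker apply back-pressure on reads without dropping responses.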