ControllerChannelManager是用来管理KafkaController Leader和集群中其他broker之间的网络交互,其中ControllerBrokerStateInfo表示与一个broker连接的各种信息,
ControllerBrokerStateInfo的一些字段:
networkClient:NetworkClient 负责维护controller 和 对应broker之间的网络连接
brokerNode: 维护对应broker的网络位置信息,其中保存了broker id、host(主机名或IP)、port以及机架信息
requestSendThread:RequestSendThread负责发送请求的线程
messageQueue: BlockingQueue[QueueItem] 缓冲队列,缓冲发往broker的请求
一 RequestSendThread
RequestSendThread:发送请求的线程,它继承了ShutdownableThread,会循环执行doWork方法
/**
 * One iteration of the send loop (ShutdownableThread calls this repeatedly while running).
 *
 * Takes the next queued request, sends it to `brokerNode` and blocks for the response,
 * retrying until the send succeeds or the thread is no longer running. A non-null response
 * is decoded according to its api key and passed to the request's callback, if one was given.
 */
override def doWork(): Unit = {

  // Sleep 100 ms before retrying; any exception thrown by the sleep is swallowed by CoreUtils.swallowTrace.
  def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

  // Blocks until a request is available in the queue.
  val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
  import NetworkClientBlockingOps._

  var clientResponse: ClientResponse = null
  try {
    lock synchronized {
      var isSendSuccessful = false
      // Retry until the send succeeds or the thread is stopped. Per the original notes, when the
      // broker goes down a ZooKeeper listener calls removeBroker, which stops this thread; until
      // that happens the send keeps being retried.
      while (isRunning.get() && !isSendSuccessful) {
        try {
          if (!brokerReady()) {
            // Broker connection not ready yet: wait a little and check again.
            isSendSuccessful = false
            backoff()
          }
          else {
            val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
            val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
            val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
            // Send the request and block until its response arrives.
            clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable =>
            // If the send was not successful, reconnect to the broker and resend the message.
            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
              "Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, brokerNode.toString()), e)
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      if (clientResponse != null) {
        // The controller only sends LeaderAndIsr, StopReplica and UpdateMetadata requests,
        // so any other api key here is unexpected.
        val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
          case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
          case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
          case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
          case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
        }
        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
          .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))

        if (callback != null) {
          callback(response)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
      // If there is any socket error (e.g. socket timeout), the connection is no longer usable and needs to be recreated.
      networkClient.close(brokerNode.idString)
  }
}
以上方法主要干以下几件事:
# 从请求队列获取请求元素QueueItem
# 判断broker是否准备好,如果没有准备好,则等待一段时间(backoff,100ms)后重试
# 如果准备好了创建ClientRequest
# 发送请求并阻塞等待响应
# 对返回的响应进行类型检测:KafkaController只会发送三类请求(LeaderAndIsrRequest、StopReplicaRequest、UpdateMetadataRequest),收到其他类型的响应会抛出KafkaException;解析出响应后,如果请求带有回调函数,则调用回调处理该响应
二 ControllerChannelManager
2.1 addNewBroker: 创建请求缓冲队列、brokerNode、NetworkClient以及请求发送线程,并把与该broker连接相关的信息封装到ControllerBrokerStateInfo中(此处只创建线程,并不启动)
/**
 * Creates all per-broker controller state for `broker` and registers it in `brokerStateInfo`.
 *
 * The state consists of:
 *  - a request queue;
 *  - a Node describing the broker's inter-broker endpoint;
 *  - a NetworkClient dedicated to this broker;
 *  - a RequestSendThread that drains the queue.
 *
 * The send thread is only constructed here, not started.
 */
private def addNewBroker(broker: Broker): Unit = {
  // Buffer of requests waiting to be sent to this broker.
  val messageQueue = new LinkedBlockingQueue[QueueItem]
  debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
  val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
  // Network location (id, host, port) of the broker on the inter-broker security protocol's endpoint.
  val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
  // A client that only knows about this single broker (ManualMetadataUpdater with just brokerNode).
  val networkClient = {
    val channelBuilder = ChannelBuilders.create(
      config.interBrokerSecurityProtocol,
      Mode.CLIENT,
      LoginType.SERVER,
      config.values,
      config.saslMechanismInterBrokerProtocol,
      config.saslInterBrokerHandshakeRequestEnable
    )
    // Per the constant names: no receive-size limit and no idle-connection timeout.
    val selector = new Selector(
      NetworkReceive.UNLIMITED,
      Selector.NO_IDLE_TIMEOUT_MS,
      metrics,
      time,
      "controller-channel",
      Map("broker-id" -> broker.id.toString).asJava,
      false,
      channelBuilder
    )
    new NetworkClient(
      selector,
      new ManualMetadataUpdater(Seq(brokerNode).asJava),
      config.brokerId.toString,
      1, // NOTE(review): presumably max in-flight requests per connection — verify against NetworkClient's constructor
      0, // NOTE(review): presumably reconnect backoff in ms — verify against NetworkClient's constructor
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      config.requestTimeoutMs,
      time
    )
  }
  val threadName = threadNamePrefix match {
    case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
    case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
  }

  // Thread that takes requests from messageQueue and sends them over networkClient.
  val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
    brokerNode, config, time, threadName)
  // setDaemon(false) makes this a NON-daemon (user) thread, i.e. it is *not* a background/daemon thread.
  requestThread.setDaemon(false)
  // Bundle everything related to this broker's connection so it can be looked up by broker id.
  brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}
2.2 sendRequest 往队列添加QueueItem
/**
 * Queues `request` for delivery to broker `brokerId`; the broker's send thread sends it later.
 *
 * When `brokerId` has no registered state the request is dropped and only a warning is logged.
 *
 * @param brokerId   target broker
 * @param apiKey     api key of the request
 * @param apiVersion explicit api version to use, or None for the default
 * @param request    request payload
 * @param callback   handler for the decoded response; may be null (the default) for no callback
 */
def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest,
                callback: AbstractRequestResponse => Unit = null): Unit = {
  brokerLock synchronized {
    val target = brokerStateInfo.get(brokerId)
    target.fold(warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))) { info =>
      info.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
    }
  }
}
2.3 addBroker添加broker,启动线程
/**
 * Registers `broker` and starts its request send thread, unless the broker is already registered.
 *
 * The contains-check matters: the send thread for a known broker may already have been started
 * (e.g. by startup()), so it must not be created or started again here.
 */
def addBroker(broker: Broker): Unit = brokerLock synchronized {
  val alreadyRegistered = brokerStateInfo.contains(broker.id)
  if (!alreadyRegistered) {
    addNewBroker(broker)
    startRequestSendThread(broker.id)
  }
}
2.4 removeBroker 删除broker,关闭连接,清理请求队列,关闭线程,把broker相关信息从brokerStateInfo里移除
/**
 * Removes broker `brokerId` from the channel manager.
 *
 * Stops its send thread, closes its network client, discards queued requests and forgets its state.
 * The `brokerStateInfo(brokerId)` lookup happens outside the error handling below, so an unknown
 * broker id makes this call fail (NoSuchElementException for a Scala Map).
 */
def removeBroker(brokerId: Int): Unit = {
  brokerLock synchronized {
    removeExistingBroker(brokerStateInfo(brokerId))
  }
}

/**
 * Tears down all per-broker state held in `brokerState`.
 *
 * Best effort: any failure is logged and not rethrown.
 */
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo): Unit = {
  try {
    // Shut the send thread down BEFORE closing the NetworkClient. The NetworkClient is not
    // thread-safe and the send thread may be using it at this moment; closing it first allows
    // concurrent use by two threads (KAFKA-4959).
    // NOTE(review): relies on ShutdownableThread.shutdown() blocking until the thread has exited — confirm.
    brokerState.requestSendThread.shutdown()
    brokerState.networkClient.close()
    brokerState.messageQueue.clear()
    brokerStateInfo.remove(brokerState.brokerNode.id)
  } catch {
    case e: Throwable => error("Error while removing broker by the controller", e)
  }
}