ControllerChannelManager分析

xiaoxiao2021-02-28  20

ControllerChannelManager是用来管理KafkaController Leader和集群中其他broker之间的网络交互,其中ControllerBrokerStateInfo表示与一个broker连接的各种信息,

ControllerBrokerStateInfo的一些字段:

networkClient:NetworkClient 负责维护controller 和 对应broker之间的网络连接

brokerNode: 维护对应的broker的网络位置信息,其中保存了host、ip、port和机架信息

requestSendThread:RequestSendThread负责发送请求的线程

 messageQueue: BlockingQueue[QueueItem] 缓冲队列,缓冲发往borker的请求

 

一 RequestSendThread

RequestSendThread:送请求的线程,它继承了ShutdownableThread,会循环执行doWork方法

override def doWork(): Unit = {   def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))   // 获取请求缓冲队列的QueueItem   val QueueItem(apiKey, apiVersion, request, callback) = queue.take()   import NetworkClientBlockingOps._   var clientResponse: ClientResponse = null   try {     lock synchronized {       var isSendSuccessful = false       while (isRunning.get() && !isSendSuccessful) {         // broker宕机,会触发zookeeper的监听器,调用removeBroker方法将当前线程停止,在stop之前会尝试重试         try {           if (!brokerReady()) {// 阻塞等待满足发送条件,如果没有做准备好             isSendSuccessful= false             backoff() // 等待一段时间           } // 已经准备好了           else {             val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))             val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)             // 创建ClientRequest对象             val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)             // 发送请求并阻塞等待响应             clientResponse= networkClient.blockingSendAndReceive(clientRequest)(time)             isSendSuccessful= true           }         } catch {           case e: Throwable => // if the sendwas not successful, reconnect to broker and resend the message             warn(("Controller %d epoch %d fails to send request %s to broker %s." +               "Reconnectingto broker.").format(controllerId, controllerContext.epoch,                 request.toString, brokerNode.toString()), e)             networkClient.close(brokerNode.idString)             isSendSuccessful= false             backoff()         }       }       if (clientResponse != null) {// 如果响应不为空         // 检查请求类型,controller只能发送LeaderAndIsrRequest,StopReplicaRequestUpdateMetadataCacheRequest3种请求         val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {           case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)           case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)           case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)           case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")         }         stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent tobroker %s"           .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))         if (callback != null) {           callback(response)         }       }     }   } catch {     case e: Throwable =>       error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)       // If there isany socket error (eg, socket timeout), the connection is no longer usable and needsto be recreated.       networkClient.close(brokerNode.idString)   } }

以上方法主要干以下几件事:

# 从请求队列获取请求元素QueueItem

# 判断broker是否准备好,如果没有准备好阻塞等待

# 如果准备好了创建ClientRequest

# 发送请求并阻塞等待响应

# 对返回的响应进行类型检测,因为KafkaController只能接受三类请求:LeaderAndIsrRequest,StopReplicaRequest,UpdateMetadataCache

Request

 

二 ControllerChannelManager

2.1 addNewBroker 构建队列,创建brokerNode,创建NetworkClient,

创建请求发送线程,把broker连接相关的信息封装到BrokerStateInfo

private def addNewBroker(broker: Broker) {   // 创建请求缓冲队列   val messageQueue = new LinkedBlockingQueue[QueueItem]   debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))   val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)   // 创建broker节点   val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)   // 创建NetworkClient   val networkClient = {     val channelBuilder = ChannelBuilders.create(       config.interBrokerSecurityProtocol,       Mode.CLIENT,       LoginType.SERVER,       config.values,       config.saslMechanismInterBrokerProtocol,       config.saslInterBrokerHandshakeRequestEnable     )     val selector = new Selector(       NetworkReceive.UNLIMITED,       Selector.NO_IDLE_TIMEOUT_MS,       metrics,       time,       "controller-channel",       Map("broker-id" -> broker.id.toString).asJava,       false,       channelBuilder     )     new NetworkClient(       selector,       new ManualMetadataUpdater(Seq(brokerNode).asJava),       config.brokerId.toString,       1,       0,       Selectable.USE_DEFAULT_BUFFER_SIZE,       Selectable.USE_DEFAULT_BUFFER_SIZE,       config.requestTimeoutMs,       time     )   }   val threadName = threadNamePrefix match {     case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)     case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)   }   // 构建RequestSendThread,用于发送请求   val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,     brokerNode, config, time, threadName)   requestThread.setDaemon(false)// 设置为后台进程   // broker连接相关的信息封装到BrokerStateInfo   brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread)) }

 

2.2 sendRequest 往队列添加QueueItem

// 往请求队列里添加QueueItem def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {   brokerLock synchronized {     val stateInfoOpt = brokerStateInfo.get(brokerId)     stateInfoOpt match {       case Some(stateInfo) =>         stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))       case None =>         warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))     }   } }

 

2.3 addBroker添加broker,启动线程

def addBroker(broker: Broker) {   // be careful here. Maybe the startup() API has already started the request send thread   brokerLock synchronized {     if(!brokerStateInfo.contains(broker.id)) {       addNewBroker(broker)       startRequestSendThread(broker.id)     }   } }

 

2.4 removeBroker 删除broker,关闭连接,清理请求队列,关闭线程,把broker相关信息从brokerStateInfo里移除

def removeBroker(brokerId: Int) {   brokerLock synchronized {     removeExistingBroker(brokerStateInfo(brokerId))   } } private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {   try {     brokerState.networkClient.close()     brokerState.messageQueue.clear()     brokerState.requestSendThread.shutdown()     brokerStateInfo.remove(brokerState.brokerNode.id)   } catch {     case e: Throwable => error("Error while removing broker by the controller", e)   } }

 

 

转载请注明原文地址: https://www.6miu.com/read-1000268.html

最新回复(0)