Kafka源码之服务端分析之API层

xiaoxiao2025-04-30  14

Handler线程属于Kafka的API层,Handler线程对请求的处理通过调用KafkaApis中的方法实现。 KafkaRequestHandler的主要职责是从RequestChannel获取请求并调用KafkaApis的handler方法处理请求:

def run() { while(true) { try { var req : RequestChannel.Request = null while (req == null) { //从队列中获取请求 val startSelectTime = SystemTime.nanoseconds req = requestChannel.receiveRequest(300) val idleTime = SystemTime.nanoseconds - startSelectTime //统计监控指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } //读取到AllDone请求,线程结束 if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) return } req.requestDequeueTimeMs = SystemTime.milliseconds //KafkaApis类中实现了处理请求的逻辑,KafkaApis还负责将响应写回对应的RequestChannel.responseQueue中,唤醒Processor处理 apis.handle(req) } catch { case e: Throwable => error("Exception when handling request", e) } } }

API层使用KafkaRequestHandlerPool来管理所有的KafkaRequestHandler线程:

class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, numThreads: Int) extends Logging with KafkaMetricsGroup { /* a meter to track the average free capacity of the request handlers */ private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " //用来保存执行handler的线程 val threads = new Array[Thread](numThreads) val runnables = new Array[KafkaRequestHandler](numThreads) //保存创建的handler及对应的线程 for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() } //停止所有handler线程 def shutdown() { info("shutting down") for(handler <- runnables) handler.shutdown //阻塞等待所有线程释放 for(thread <- threads) thread.join info("shut down completely") } }

KafkaApis是Kafka服务器处理请求的入口类。他负责将KafkaRequestHandler传递过来的请求分发到不同的handle上处理,分发依据是RequestChannel.Request中的requestId,此字段保存了请求的ApiKeys的值:

def handle(request: RequestChannel.Request) { try { trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal)) ApiKeys.forId(request.requestId) match { case ApiKeys.PRODUCE => handleProducerRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request) case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request) case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request) case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request) case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request) case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => if (request.requestObj != null) { request.requestObj.handleError(e, requestChannel, request) error("Error when handling request %s".format(request.requestObj), e) } else { val response = request.body.getErrorResponse(request.header.apiVersion, e) val respHeader = new ResponseHeader(request.header.correlationId) /* If request doesn't have a default error response, we just close the connection. For example, when produce request has acks set to 0 */ if (response == null) requestChannel.closeConnection(request.processor, request) else requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) error("Error when handling request %s".format(request.body), e) } } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds }
转载请注明原文地址: https://www.6miu.com/read-5029518.html

最新回复(0)