in core/src/main/scala/kafka/server/ControllerApis.scala [91:161]
/**
 * Closes this API handler by closing the `aclApis` instance it delegates ACL requests to.
 */
def close(): Unit = {
  aclApis.close()
}
/**
 * Routes an incoming request to the handler matching its API key and makes sure that any
 * failure, whether thrown synchronously or surfaced through the handler's future, is logged and
 * passed to `requestHelper.handleError`.
 *
 * @param request      the request to process; the API key in its header selects the handler
 * @param requestLocal request-scoped state, forwarded only to the ENVELOPE handler
 * @throws FatalExitError rethrown as-is, without being logged or passed to the error handler
 */
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  // Single failure path shared by the synchronous and the asynchronous parts of a handler:
  // log the original exception, then hand it to the request helper.
  def reportFailure(cause: Throwable): Unit = {
    error(s"Unexpected error handling request ${request.requestDesc(true)} " +
      s"with context ${request.context}", cause)
    requestHelper.handleError(request, cause)
  }

  try {
    // Every supported API key maps to a handler that yields a future for the in-flight work.
    val pending: CompletableFuture[Unit] = request.header.apiKey match {
      case ApiKeys.FETCH => handleFetch(request)
      case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.ALTER_CONFIGS => handleLegacyAlterConfigs(request)
      case ApiKeys.VOTE => handleVote(request)
      case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
      case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
      case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
      case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
      case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
      case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
      case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request)
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateDelegationTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewDelegationTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireDelegationTokenRequest(request)
      case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
      case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
      case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.CONTROLLER_REGISTRATION => handleControllerRegistration(request)
      case ApiKeys.ASSIGN_REPLICAS_TO_DIRS => handleAssignReplicasToDirs(request)
      case ApiKeys.ADD_RAFT_VOTER => handleAddRaftVoter(request)
      case ApiKeys.REMOVE_RAFT_VOTER => handleRemoveRaftVoter(request)
      case ApiKeys.UPDATE_RAFT_VOTER => handleUpdateRaftVoter(request)
      case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
    }

    // Failures raised inside the future, or in any completion stage chained onto it by the
    // handler, arrive here. CompletionException does not include the stack frames in its "cause"
    // exception, so the exception is logged exactly as delivered to this callback.
    pending.whenComplete { (_, failure) =>
      if (failure != null) reportFailure(failure)
    }
  } catch {
    case fatal: FatalExitError => throw fatal
    // Anything thrown by the blocking (synchronous) part of a request handler ends up here.
    case blockingFailure: Throwable => reportFailure(blockingFailure)
  } finally {
    // A negative value means the local completion time is still unset; only then record it.
    if (request.apiLocalCompleteTimeNanos < 0) {
      request.apiLocalCompleteTimeNanos = time.nanoseconds
    }
  }
}