def close()

in core/src/main/scala/kafka/server/ControllerApis.scala [91:161]


  /** Shuts this instance down by closing `aclApis`; nothing else is released here. */
  def close(): Unit = {
    aclApis.close()
  }

  /**
   * Dispatches a request to the handler registered for its API key and makes sure that any
   * failure is logged and answered through `requestHelper.handleError`.
   *
   * A failure can show up in two places: synchronously, while the handler is being invoked, or
   * later, when the future returned by the handler (or one of its completion stages) completes
   * exceptionally. Both cases go through the same log-then-`handleError` path. A `FatalExitError`
   * thrown synchronously is rethrown untouched.
   *
   * Before returning, the request's local completion time is recorded if it is still negative.
   *
   * @param request      the request to process
   * @param requestLocal forwarded to the ENVELOPE handler only
   */
  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
    // Single failure path shared by the synchronous and the asynchronous case.
    def reportFailure(cause: Throwable): Unit = {
      error(s"Unexpected error handling request ${request.requestDesc(true)} " +
        s"with context ${request.context}", cause)
      requestHelper.handleError(request, cause)
    }

    // Selects and invokes the handler for this request's API key; unknown keys are rejected.
    def dispatch(): CompletableFuture[Unit] = request.header.apiKey match {
      case ApiKeys.FETCH => handleFetch(request)
      case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.ALTER_CONFIGS => handleLegacyAlterConfigs(request)
      case ApiKeys.VOTE => handleVote(request)
      case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
      case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
      case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
      case ApiKeys.ALTER_PARTITION => handleAlterPartitionRequest(request)
      case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
      case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
      case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
      case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request)
      case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request)
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateDelegationTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewDelegationTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireDelegationTokenRequest(request)
      case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
      case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
      case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
      case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
      case ApiKeys.CONTROLLER_REGISTRATION => handleControllerRegistration(request)
      case ApiKeys.ASSIGN_REPLICAS_TO_DIRS => handleAssignReplicasToDirs(request)
      case ApiKeys.ADD_RAFT_VOTER => handleAddRaftVoter(request)
      case ApiKeys.REMOVE_RAFT_VOTER => handleRemoveRaftVoter(request)
      case ApiKeys.UPDATE_RAFT_VOTER => handleUpdateRaftVoter(request)
      case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
    }

    try {
      // Asynchronous failures: the handler's future, or a later completion stage, failed.
      // The exception handed to this callback is logged as-is because a CompletionException
      // does not carry the stack frames in its "cause".
      dispatch().whenComplete { (_, failure) =>
        if (failure != null) reportFailure(failure)
      }
    } catch {
      case fatal: FatalExitError => throw fatal
      // Synchronous failures: something threw in the blocking part of a request handler.
      case cause: Throwable => reportFailure(cause)
    } finally {
      // Leave an already-recorded local completion time alone; only fill it in when unset.
      if (request.apiLocalCompleteTimeNanos < 0) {
        request.apiLocalCompleteTimeNanos = time.nanoseconds
      }
    }
  }