in core/src/main/scala/kafka/server/KafkaServer.scala [658:747]
def shutdown(): Unit = {
  // Walks through the broker's components in a fixed order, shutting down or closing each one
  // that is set, and finally counts down `shutdownLatch`.
  //
  // Throws IllegalStateException when invoked while `isStartingUp` is still set. When the latch
  // count is already zero, or `isShuttingDown` is already `true`, the call only logs
  // "shutting down" and returns. Any Throwable that escapes the sequence is logged as fatal,
  // `isShuttingDown` is reset, and the Throwable is rethrown to the caller.

  // Routes a single step through CoreUtils.swallow, passing this server as the logger.
  // NOTE(review): swallow presumably logs and suppresses the failure so that later steps still
  // run -- confirm against CoreUtils.
  def quietly(step: => Unit): Unit = CoreUtils.swallow(step, this)

  // Applies `stop` to `component` via `quietly`, but only when the reference is non-null.
  def closeIfSet[T <: AnyRef](component: T)(stop: T => Unit): Unit =
    if (component != null) quietly(stop(component))

  try {
    info("shutting down")

    if (isStartingUp.get)
      throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

    // The latch has to be examined before `isShuttingDown` is flipped: the latch is the very last
    // thing updated inside the branch below, so testing in the opposite order would let concurrent
    // callers run the sequence twice or leave `isShuttingDown` stuck at `true` on exit.
    val latchNotReleased = shutdownLatch.getCount > 0
    if (latchNotReleased && isShuttingDown.compareAndSet(false, true)) {
      quietly(controlledShutdown())
      brokerState.newState(BrokerShuttingDown)

      closeIfSet(dynamicConfigManager)(_.shutdown())

      // Stop accepting further connections and requests now; the socket server itself is only
      // shut down near the end of the sequence.
      closeIfSet(socketServer)(_.stopProcessingRequests())

      // Request handling, scheduler and authorizer.
      closeIfSet(dataPlaneRequestHandlerPool)(_.shutdown())
      closeIfSet(controlPlaneRequestHandlerPool)(_.shutdown())
      closeIfSet(kafkaScheduler)(_.shutdown())
      closeIfSet(dataPlaneRequestProcessor)(_.close())
      closeIfSet(controlPlaneRequestProcessor)(_.close())
      quietly(authorizer.foreach(_.close()))

      // Managers and coordinators, then replica and log management.
      closeIfSet(adminManager)(_.shutdown())
      closeIfSet(transactionCoordinator)(_.shutdown())
      closeIfSet(groupCoordinator)(_.shutdown())
      closeIfSet(tokenManager)(_.shutdown())
      closeIfSet(replicaManager)(_.shutdown())
      closeIfSet(logManager)(_.shutdown())

      // Controller, feature listener, ZooKeeper client and quota managers.
      closeIfSet(kafkaController)(_.shutdown())
      closeIfSet(featureChangeListener)(_.close())
      closeIfSet(zkClient)(_.close())
      closeIfSet(quotaManagers)(_.shutdown())

      // Request processing was stopped much earlier, yet the controller can still produce a
      // response to the controlled shutdown request. The socket server is therefore shut down
      // this late to avoid failures (e.g. while metrics are being recorded).
      closeIfSet(socketServer)(_.shutdown())
      closeIfSet(metrics)(_.close())
      closeIfSet(brokerTopicStats)(_.close())

      // Drop every reconfigurable instance that DynamicBrokerConfig is holding on to.
      config.dynamicConfig.clear()

      // Reset the lifecycle state; the latch is released only after both flags are cleared.
      brokerState.newState(NotRunning)
      startupComplete.set(false)
      isShuttingDown.set(false)
      quietly(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics))
      shutdownLatch.countDown()
      info("shut down completed")
    }
  } catch {
    case failure: Throwable =>
      fatal("Fatal error during KafkaServer shutdown.", failure)
      // Clear the flag so that it does not remain `true` after a failed attempt.
      isShuttingDown.set(false)
      throw failure
  }
}