in core/src/main/scala/kafka/server/BrokerServer.scala [724:838]
/**
 * Shuts the broker down by stopping its components one after another in a fixed order.
 *
 * Nothing is done unless the status can be moved from STARTED to SHUTTING_DOWN. Once the
 * sequence has been entered, a transition from SHUTTING_DOWN to SHUTDOWN is attempted
 * afterwards, whether the sequence completed or failed.
 *
 * @param timeout budget used to compute the deadline for the controlled shutdown wait; within
 *                this method the later steps are not bounded by it
 * @throws Throwable anything that escapes the sequence is logged as fatal and rethrown
 */
override def shutdown(timeout: Duration): Unit = {
  // Shorthand for the guarded steps below: hands the action to CoreUtils.swallow together with
  // this instance, which is what every such step passes as the second argument.
  def quietly(step: => Unit): Unit = CoreUtils.swallow(step, this)

  if (maybeChangeStatus(STARTED, SHUTTING_DOWN)) {
    try {
      val shutdownDeadlineMs = time.milliseconds() + timeout.toMillis
      info("shutting down")

      if (config.controlledShutdownEnable) {
        if (replicaManager != null) replicaManager.beginControlledShutdown()
        if (lifecycleManager != null) {
          lifecycleManager.beginControlledShutdown()
          try {
            // Wait only for whatever is left of the caller's budget.
            val remainingMs = shutdownDeadlineMs - time.milliseconds()
            lifecycleManager.controlledShutdownFuture.get(remainingMs, TimeUnit.MILLISECONDS)
          } catch {
            // A failed or timed-out wait is only logged; the shutdown sequence carries on.
            case _: TimeoutException =>
              error("Timed out waiting for the controller to approve controlled shutdown")
            case unexpected: Throwable =>
              error("Got unexpected exception waiting for controlled shutdown future", unexpected)
          }
        }
      }

      if (lifecycleManager != null) lifecycleManager.beginShutdown()

      // Have the socket server stop processing requests now so that no further connections or
      // requests are taken on. Its actual shutdown happens near the end of the sequence.
      if (socketServer != null) quietly(socketServer.stopProcessingRequests())

      // NOTE(review): this get() has no timeout and is not routed through CoreUtils.swallow, so
      // an exception here skips every step below and goes straight to the outer catch --
      // confirm that this is intended.
      metadataPublishers.forEach { publisher =>
        sharedServer.loader.removeAndClosePublisher(publisher).get()
      }
      metadataPublishers.clear()

      if (dataPlaneRequestHandlerPool != null) quietly(dataPlaneRequestHandlerPool.shutdown())
      if (dataPlaneRequestProcessor != null) quietly(dataPlaneRequestProcessor.close())
      authorizerPlugin.foreach(plugin => Utils.closeQuietly(plugin, "authorizer plugin"))

      // The scheduler has to be shut down early. If it kept running, it could touch resources
      // that have already been shut down and cause exceptions. For example, while LogManager is
      // closing partitions one by one, the scheduler might concurrently delete old segments due
      // to retention. Those segments could already have been closed by the LogManager, which
      // would cause an IOException and subsequently mark the logdir as offline. The broker would
      // then not flush the remaining partitions or write the clean shutdown marker, and would
      // ultimately have to take hours to recover the log during restart.
      if (kafkaScheduler != null) quietly(kafkaScheduler.shutdown())

      if (transactionCoordinator != null) quietly(transactionCoordinator.shutdown())
      if (groupConfigManager != null) quietly(groupConfigManager.close())
      if (groupCoordinator != null) quietly(groupCoordinator.shutdown())
      if (shareCoordinator != null) quietly(shareCoordinator.shutdown())
      if (assignmentsManager != null) quietly(assignmentsManager.close())
      if (replicaManager != null) quietly(replicaManager.shutdown())
      if (alterPartitionManager != null) quietly(alterPartitionManager.shutdown())
      if (forwardingManager != null) quietly(forwardingManager.close())
      if (clientToControllerChannelManager != null)
        quietly(clientToControllerChannelManager.shutdown())

      if (logManager != null) {
        // Without a lifecycle manager there is no epoch to report, so -1 is passed instead.
        val epochForLogShutdown =
          if (lifecycleManager == null) -1 else lifecycleManager.brokerEpoch
        quietly(logManager.shutdown(epochForLogShutdown))
      }

      // Closing the remote log manager gives its underlying clients (especially those in
      // RemoteStorageManager and RemoteLogMetadataManager) a chance to close gracefully.
      remoteLogManagerOpt.foreach(manager => Utils.closeQuietly(manager, "remote log manager"))

      if (quotaManagers != null) quietly(quotaManagers.shutdown())
      if (socketServer != null) quietly(socketServer.shutdown())
      Utils.closeQuietly(brokerTopicStats, "broker topic stats")
      Utils.closeQuietly(sharePartitionManager, "share partition manager")
      if (persister != null) quietly(persister.stop())

      isShuttingDown.set(false)

      if (lifecycleManager != null) quietly(lifecycleManager.close())
      quietly(config.dynamicConfig.clear())
      Utils.closeQuietly(clientMetricsManager, "client metrics manager")
      sharedServer.stopForBroker()
      info("shut down completed")
    } catch {
      case failure: Throwable =>
        fatal("Fatal error during broker shutdown.", failure)
        throw failure
    } finally {
      maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
    }
  }
}