in core/src/main/scala/kafka/server/KafkaServer.scala [499:652]
private def controlledShutdown(): Unit = {
def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
val socketTimeoutMs = config.controllerSocketTimeoutMs
def doControlledShutdown(retries: Int): Boolean = {
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
JaasContext.Type.SERVER,
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext)
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
time,
"kafka-server-controlled-shutdown",
Map.empty.asJava,
false,
channelBuilder,
logContext
)
new NetworkClient(
selector,
metadataUpdater,
config.brokerId.toString,
1,
0,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
false,
new ApiVersions,
logContext)
}
var shutdownSucceeded: Boolean = false
try {
var remainingRetries = retries
var prevController: Broker = null
var ioException = false
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request.
// If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
zkClient.getControllerId match {
case Some(controllerId) =>
zkClient.getBroker(controllerId) match {
case Some(broker) =>
// if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
// attempt, connect to the most recent controller
if (ioException || broker != prevController) {
ioException = false
if (prevController != null)
networkClient.close(node(prevController).idString)
prevController = broker
metadataUpdater.setNodes(Seq(node(prevController)).asJava)
}
case None =>
info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
}
case None =>
info("No controller registered in ZooKeeper")
}
// 2. issue a controlled shutdown to the controller
if (prevController != null) {
try {
if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val controlledShutdownApiVersion: Short =
if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
else 3
val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
new ControlledShutdownRequestData()
.setBrokerId(config.brokerId)
.setBrokerEpoch(kafkaController.brokerEpoch),
controlledShutdownApiVersion)
val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
time.milliseconds(), true)
val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) {
shutdownSucceeded = true
info("Controlled shutdown succeeded")
}
else {
info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}")
info(s"Error from controller: ${shutdownResponse.error}")
}
}
catch {
case ioe: IOException =>
ioException = true
warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
// ignore and try again
}
}
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally
networkClient.close()
shutdownSucceeded
}
if (startupComplete.get() && config.controlledShutdownEnable) {
// We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
// of time and try again for a configured number of retries. If all the attempt fails, we simply force
// the shutdown.
info("Starting controlled shutdown")
brokerState.newState(PendingControlledShutdown)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}