in core/src/main/scala/kafka/server/ReplicaManager.scala [1267:1416]
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  val startMs = time.milliseconds()
  replicaStateChangeLock synchronized {
    val controllerId = leaderAndIsrRequest.controllerId
    val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
    stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
      s"$controllerId for ${requestPartitionStates.size} partitions")
    if (stateChangeLogger.isTraceEnabled)
      requestPartitionStates.foreach { partitionState =>
        stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
          s"correlation id $correlationId from controller $controllerId " +
          s"epoch ${leaderAndIsrRequest.controllerEpoch}")
      }
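    // If the controller epoch on the request is stale, the whole request is rejected below;
    // otherwise each partition state is validated and applied, with errors collected per partition.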
    val response = {
      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
          s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
          s"Latest known controller epoch is $controllerEpoch")
        leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
      } else {
        val responseMap = new mutable.HashMap[TopicPartition, Errors]
        controllerEpoch = leaderAndIsrRequest.controllerEpoch
        val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
        // First create the partition if it doesn't exist already
        requestPartitionStates.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
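          // Resolve the hosted partition: replicas on offline log directories are rejected with
          // KAFKA_STORAGE_ERROR, online partitions are reused, and unseen partitions are created here.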
          val partitionOpt = getPartition(topicPartition) match {
            case HostedPartition.Offline =>
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              None
            case HostedPartition.Online(partition) =>
              Some(partition)
            case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
              Some(partition)
          }
          // Next check the partition's leader epoch
          partitionOpt.foreach { partition =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            if (requestLeaderEpoch > currentLeaderEpoch) {
              // If the leader epoch is valid, record the epoch of the controller that made the leadership decision.
              // This is useful while updating the ISR to maintain the decision maker controller's epoch in the ZooKeeper path.
              if (partitionState.replicas.contains(localBrokerId))
                partitionStates.put(partition, partitionState)
              else {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                  s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
              }
            } else if (requestLeaderEpoch < currentLeaderEpoch) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch is smaller than the current " +
                s"leader epoch $currentLeaderEpoch")
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
            } else {
              stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
            }
          }
        }
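        // Split the accepted partition states by whether this broker is the designated leader.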
        val partitionsToBeLeader = partitionStates.filter { case (_, partitionState) =>
          partitionState.leader == localBrokerId
        }
        val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
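        // Defer reading the high watermark checkpoint files until a partition actually needs them
        // while making leaders and followers below.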
        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
            highWatermarkCheckpoints)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
            highWatermarkCheckpoints)
        else
          Set.empty[Partition]
        /*
         * KAFKA-8392
         * For topic partitions of which the broker is no longer a leader, delete metrics related to
         * those topics. Note that this means the broker stops being either a replica or a leader of
         * partitions of said topics
         */
        val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
        followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
        // remove metrics for brokers which are not followers of a topic
        leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
        leaderAndIsrRequest.partitionStates.forEach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          /*
           * If there is an offline log directory, a Partition object may have been created by getOrCreatePartition()
           * before getOrCreateReplica() failed to create the local replica due to a KafkaStorageException.
           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
           * We need to map this topic-partition to OfflinePartition instead.
           */
          if (localLog(topicPartition).isEmpty)
            markPartitionOffline(topicPartition)
        }
        // We initialize the high watermark checkpoint thread after the first LeaderAndIsr request. This ensures that all
        // the partitions have been completely populated before starting the checkpointing, thereby avoiding weird race conditions.
        startHighWatermarkCheckPointThread()
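        // Re-add fetchers for any of these partitions that have an in-progress move to another log
        // directory, so the move continues after the leadership change.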
        maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)
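        // Stop replica and alter-log-dirs fetcher threads that are left with no partitions after the changes above.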
        replicaFetcherManager.shutdownIdleFetcherThreads()
        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
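        // Hand the new leader and follower partitions to the caller-supplied callback; KafkaApis typically
        // uses this to notify the group and transaction coordinators of leadership changes.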
        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
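        // Translate the per-partition errors collected above into the LeaderAndIsr response.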
        val responsePartitions = responseMap.iterator.map { case (tp, error) =>
          new LeaderAndIsrPartitionError()
            .setTopicName(tp.topic)
            .setPartitionIndex(tp.partition)
            .setErrorCode(error.code)
        }.toBuffer
        new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
          .setErrorCode(Errors.NONE.code)
          .setPartitionErrors(responsePartitions.asJava))
      }
    }
    val endMs = time.milliseconds()
    val elapsedMs = endMs - startMs
    stateChangeLogger.info(s"Finished LeaderAndIsr request in ${elapsedMs}ms correlationId $correlationId from controller " +
      s"$controllerId for ${requestPartitionStates.size} partitions")
    response
  }
}