in core/src/main/scala/kafka/server/ReplicaManager.scala [2040:2223]
/**
 * Looks up the local log for `topicPartition` via `localLog` and returns its config.
 *
 * @param topicPartition the partition whose log configuration is requested
 * @return the log's config, or `None` when `localLog` yields no log for the partition
 */
def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] =
  for (log <- localLog(topicPartition)) yield log.config
/**
 * Handles a LeaderAndIsr request from the controller. All work is done while holding
 * `replicaStateChangeLock`.
 *
 * If the request's controller epoch is older than the locally recorded `controllerEpoch`, the whole request is
 * rejected with STALE_CONTROLLER_EPOCH. Otherwise the local `controllerEpoch` is updated and every partition
 * state in the request is classified as follows:
 *  - the local replica is in an offline log directory: KAFKA_STORAGE_ERROR;
 *  - the topic ID in the request is inconsistent with the partition's topic ID: INCONSISTENT_TOPIC_ID;
 *  - the request leader epoch is strictly greater than the current one: the partition is queued for
 *    `makeLeaders` or `makeFollowers` when this broker is in the replica list, else UNKNOWN_TOPIC_OR_PARTITION;
 *  - the request leader epoch is smaller than the current one: STALE_CONTROLLER_EPOCH;
 *  - the request leader epoch equals the current one: only the topic ID is reconciled (assigned to the log on
 *    upgrade, or the follower is handed to `updateTopicIdForFollowers` on downgrade); any other equal-epoch
 *    request is answered with STALE_CONTROLLER_EPOCH.
 *
 * @param correlationId       correlation id of the request, used in state-change log messages
 * @param leaderAndIsrRequest the request received from the controller
 * @param onLeadershipChange  callback invoked with (partitions that became leader, partitions that became follower)
 * @return the response carrying one error code per partition recorded in the response map, grouped by topic ID
 */
def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  val startMs = time.milliseconds()
  replicaStateChangeLock synchronized {
    val controllerId = leaderAndIsrRequest.controllerId
    val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
    stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
      s"$controllerId for ${requestPartitionStates.size} partitions")
    if (stateChangeLogger.isTraceEnabled)
      requestPartitionStates.foreach { partitionState =>
        stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
          s"correlation id $correlationId from controller $controllerId " +
          s"epoch ${leaderAndIsrRequest.controllerEpoch}")
      }
    val topicIds = leaderAndIsrRequest.topicIds()

    // Resolves the topic ID carried by the request; a missing or zero UUID is treated as "no topic ID".
    def topicIdFromRequest(topicName: String): Option[Uuid] = {
      val topicId = topicIds.get(topicName)
      // if invalid topic ID return None
      if (topicId == null || topicId == Uuid.ZERO_UUID)
        None
      else
        Some(topicId)
    }

    val response = {
      if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
          s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
          s"Latest known controller epoch is $controllerEpoch")
        leaderAndIsrRequest.getErrorResponse(Errors.STALE_CONTROLLER_EPOCH.exception)
      } else {
        val responseMap = new mutable.HashMap[TopicPartition, Errors]
        controllerEpoch = leaderAndIsrRequest.controllerEpoch

        val partitions = new mutable.HashSet[Partition]()
        val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
        val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
        val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()

        // First create the partition if it doesn't exist already
        requestPartitionStates.foreach { partitionState =>
          val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
          val partitionOpt = getPartition(topicPartition) match {
            case HostedPartition.Offline(_) =>
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                "partition is in an offline log directory")
              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              None

            case HostedPartition.Online(partition) =>
              Some(partition)

            case HostedPartition.None =>
              val partition = Partition(topicPartition, time, this)
              allPartitions.putIfAbsent(topicPartition, HostedPartition.Online(partition))
              Some(partition)
          }

          // Next check the topic ID and the partition's leader epoch
          partitionOpt.foreach { partition =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            val requestTopicId = topicIdFromRequest(topicPartition.topic)
            val logTopicId = partition.topicId

            if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
              stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                s" match the topic ID for partition $topicPartition received: " +
                s"${requestTopicId.get}.")
              responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
            } else if (requestLeaderEpoch > currentLeaderEpoch) {
              // The comparison must be strict: a request with an equal leader epoch is not a new leadership
              // decision and has to reach the final branch below, which only reconciles topic IDs.
              // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
              // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
              if (partitionState.replicas.contains(localBrokerId)) {
                partitions += partition
                if (partitionState.leader == localBrokerId) {
                  partitionsToBeLeader.put(partition, partitionState)
                } else {
                  partitionsToBeFollower.put(partition, partitionState)
                }
              } else {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
                  s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
                  s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
                responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
              }
            } else if (requestLeaderEpoch < currentLeaderEpoch) {
              stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch is smaller than the current " +
                s"leader epoch $currentLeaderEpoch")
              responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
            } else {
              // requestLeaderEpoch == currentLeaderEpoch: no leadership change, only topic ID reconciliation.
              val error = requestTopicId match {
                case Some(topicId) if logTopicId.isEmpty =>
                  // The controller may send LeaderAndIsr to upgrade to using topic IDs without bumping the epoch.
                  // If we have a matching epoch, we expect the log to be defined.
                  val log = localLogOrException(partition.topicPartition)
                  log.assignTopicId(topicId)
                  stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
                    s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
                    s"id $correlationId epoch $controllerEpoch")
                  if (partitionState.leader != localBrokerId)
                    topicIdUpdateFollowerPartitions.add(partition)
                  Errors.NONE
                case None if logTopicId.isDefined && partitionState.leader != localBrokerId =>
                  // If we have a topic ID in the log but not in the request, we must have previously had topic IDs but
                  // are now downgrading. If we are a follower, remove the topic ID from the PartitionFetchState.
                  stateChangeLogger.info(s"Updating PartitionFetchState for $topicPartition to remove log topic ID " +
                    s"${logTopicId.get} since LeaderAndIsr request from controller $controllerId with correlation " +
                    s"id $correlationId epoch $controllerEpoch did not contain a topic ID")
                  topicIdUpdateFollowerPartitions.add(partition)
                  Errors.NONE
                case _ =>
                  stateChangeLogger.info(s"Ignoring LeaderAndIsr request from " +
                    s"controller $controllerId with correlation id $correlationId " +
                    s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                    s"leader epoch $requestLeaderEpoch matches the current leader epoch")
                  Errors.STALE_CONTROLLER_EPOCH
              }
              responseMap.put(topicPartition, error)
            }
          }
        }

        // responseMap is handed to makeLeaders/makeFollowers; presumably they record the per-partition
        // result for the queued partitions there (not visible in this chunk).
        val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints.asJava)
        val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
            highWatermarkCheckpoints, topicIdFromRequest)
        else
          Set.empty[Partition]

        val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
        updateLeaderAndFollowerMetrics(followerTopicSet)

        if (topicIdUpdateFollowerPartitions.nonEmpty)
          updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)

        // We initialize highwatermark thread after the first LeaderAndIsr request. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        startHighWatermarkCheckPointThread()

        maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)

        replicaFetcherManager.shutdownIdleFetcherThreads()
        replicaAlterLogDirsManager.shutdownIdleFetcherThreads()

        remoteLogManager.foreach(rlm => rlm.onLeadershipChange(
          (partitionsBecomeLeader.toSet: Set[TopicPartitionLog]).asJava,
          (partitionsBecomeFollower.toSet: Set[TopicPartitionLog]).asJava,
          topicIds))

        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)

        // Group the per-partition errors by topic ID for the response.
        val topics = new util.LinkedHashMap[Uuid, util.List[LeaderAndIsrResponse.PartitionError]]
        responseMap.foreachEntry { (tp, error) =>
          val topicId = topicIds.get(tp.topic)
          var partitionErrors = topics.get(topicId)
          if (partitionErrors == null) {
            partitionErrors = new util.ArrayList[LeaderAndIsrResponse.PartitionError]()
            topics.put(topicId, partitionErrors)
          }
          partitionErrors.add(new LeaderAndIsrResponse.PartitionError(tp.partition(), error.code))
        }
        new LeaderAndIsrResponse(Errors.NONE, topics)
      }
    }
    val endMs = time.milliseconds()
    val elapsedMs = endMs - startMs
    stateChangeLogger.info(s"Finished LeaderAndIsr request in ${elapsedMs}ms correlationId $correlationId from controller " +
      s"$controllerId for ${requestPartitionStates.size} partitions")
    response
  }
}