in core/src/main/scala/kafka/server/ReplicaManager.scala [343:469]
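/**
 * Handles a StopReplica request from the controller: validates the controller and leader
 * epochs, stops fetchers for the affected partitions and, when deletion is requested,
 * removes the partitions and asynchronously deletes their logs.
 *
 * @return a per-partition error map together with a request-level error, which is
 *         STALE_CONTROLLER_EPOCH when the request comes from an outdated controller.
 */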
def stopReplicas(correlationId: Int,
                 controllerId: Int,
                 controllerEpoch: Int,
                 brokerEpoch: Long,
                 partitionStates: Map[TopicPartition, StopReplicaPartitionState]
                ): (mutable.Map[TopicPartition, Errors], Errors) = {
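  // Note: replica state changes are serialized on replicaStateChangeLock, which is presumably
  // also taken by the LeaderAndIsr handling path, so a StopReplica cannot interleave with a
  // concurrent leader/follower transition for the same partitions.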
  replicaStateChangeLock synchronized {
    stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " +
      s"$controllerId for ${partitionStates.size} partitions")
    if (stateChangeLogger.isTraceEnabled)
      partitionStates.foreach { case (topicPartition, partitionState) =>
        stateChangeLogger.trace(s"Received StopReplica request $partitionState " +
          s"correlation id $correlationId from controller $controllerId " +
          s"epoch $controllerEpoch for partition $topicPartition")
      }
    val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
    if (controllerEpoch < this.controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring StopReplica request from " +
        s"controller $controllerId with correlation id $correlationId " +
        s"since its controller epoch $controllerEpoch is old. " +
        s"Latest known controller epoch is ${this.controllerEpoch}")
      (responseMap, Errors.STALE_CONTROLLER_EPOCH)
    } else {
      this.controllerEpoch = controllerEpoch
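      // Decide, partition by partition, which replicas can actually be stopped; requests
      // carrying a stale leader epoch are fenced below.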
      val stoppedPartitions = mutable.Map.empty[TopicPartition, StopReplicaPartitionState]
      partitionStates.foreach { case (topicPartition, partitionState) =>
        val deletePartition = partitionState.deletePartition
        getPartition(topicPartition) match {
          case HostedPartition.Offline =>
            stateChangeLogger.warn(s"Ignoring StopReplica request (delete=$deletePartition) from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
          case HostedPartition.Online(partition) =>
            val currentLeaderEpoch = partition.getLeaderEpoch
            val requestLeaderEpoch = partitionState.leaderEpoch
            // When a topic is deleted, the leader epoch is not incremented. To circumvent this,
            // a sentinel value (EpochDuringDelete) overwriting any previous epoch is used.
            // When an older version of the StopReplica request, which does not contain the leader
            // epoch, is used, a sentinel value (NoEpoch) is passed instead and the epoch
            // validation is bypassed.
            if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                requestLeaderEpoch > currentLeaderEpoch) {
              stoppedPartitions += topicPartition -> partitionState
              // Assume that everything will go right; the entry is overwritten later if an error occurs.
              responseMap.put(topicPartition, Errors.NONE)
            } else if (requestLeaderEpoch < currentLeaderEpoch) {
              stateChangeLogger.warn(s"Ignoring StopReplica request (delete=$deletePartition) from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch is smaller than the current " +
                s"leader epoch $currentLeaderEpoch")
              responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
            } else {
              stateChangeLogger.info(s"Ignoring StopReplica request (delete=$deletePartition) from " +
                s"controller $controllerId with correlation id $correlationId " +
                s"epoch $controllerEpoch for partition $topicPartition since its associated " +
                s"leader epoch $requestLeaderEpoch matches the current leader epoch")
              responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
            }
          case HostedPartition.None =>
            // Delete the log and corresponding folders in case the replica manager doesn't hold them anymore.
            // This could happen when the topic is being deleted while the broker is down and then recovers.
            stoppedPartitions += topicPartition -> partitionState
            responseMap.put(topicPartition, Errors.NONE)
        }
      }
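      // Tear-down happens in three steps: stop fetchers, drop the Partition objects of
      // deleted replicas, then delete the logs in a single batch.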
      // First stop fetchers for all partitions.
      val partitions = stoppedPartitions.keySet
      replicaFetcherManager.removeFetcherForPartitions(partitions)
      replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
      // Second remove deleted partitions from the partition map. Fetchers rely on the
      // ReplicaManager to get Partition's information so they must be stopped first.
      val deletedPartitions = mutable.Set.empty[TopicPartition]
      stoppedPartitions.foreach { case (topicPartition, partitionState) =>
        if (partitionState.deletePartition) {
          getPartition(topicPartition) match {
            case hostedPartition @ HostedPartition.Online(partition) =>
              if (allPartitions.remove(topicPartition, hostedPartition)) {
                maybeRemoveTopicMetrics(topicPartition.topic)
                // Logs are not deleted here. They are deleted in a single batch later on.
                // This is done to avoid having to checkpoint for every deletion.
                partition.delete()
              }
            case _ =>
          }
          deletedPartitions += topicPartition
        }
        // If we were the leader, we may have some operations still waiting for completion.
        // We force completion to prevent them from timing out.
        completeDelayedFetchOrProduceRequests(topicPartition)
      }
      // Third delete the logs and checkpoint.
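      // Note: asyncDelete is expected to rename the affected log directories and perform the
      // actual deletion in the background; the callback below only records per-partition
      // failures in responseMap.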
      logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
        exception match {
          case e: KafkaStorageException =>
            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
          case e =>
            stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " +
              s"${e.getClass.getName} exception: ${e.getMessage}")
            responseMap.put(topicPartition, Errors.forException(e))
        }
      })
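      // The request-level error is NONE here; individual partition errors are carried in responseMap.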
      (responseMap, Errors.NONE)
    }
  }
}