in core/src/main/scala/kafka/log/LogManager.scala [406:542]
private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig], isStray: UnifiedLog => Boolean): Unit = {
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0
  // log dir path -> number of remaining logs map for remainingLogsToRecover metric
  val numRemainingLogs: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
  // log recovery thread name -> number of remaining segments map for remainingSegmentsToRecover metric
  val numRemainingSegments: ConcurrentMap[String, Integer] = new ConcurrentHashMap[String, Integer]

  def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
    offlineDirs.add((logDirAbsolutePath, e))
    error(s"Error while loading log dir $logDirAbsolutePath", e)
  }

  val uncleanLogDirs = mutable.Buffer.empty[String]
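
  // First pass: for each live log dir, read its checkpoint files, enumerate the topic-partition
  // directories it contains, and build one load/recovery job per log. Each directory gets its own
  // fixed-size thread pool, so a slow or failing disk does not stall loading of the other dirs.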
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir,
        new LogRecoveryThreadFactory(logDirAbsolutePath))
      threadPools.append(pool)

      val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
      if (cleanShutdownFileHandler.exists()) {
        // Cache the clean shutdown status and use that for the rest of the log loading workflow. Delete the
        // CleanShutdownFile so that if the broker crashes while loading the log, it is considered a hard
        // shutdown during the next boot up. KAFKA-10471
        cleanShutdownFileHandler.delete()
        hadCleanShutdown = true
      }
      hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
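
      // The recovery-point checkpoint records, per partition, the offset up to which the log has been
      // flushed to disk; if the file cannot be read, the recovery point falls back to 0 and recovery
      // starts from the beginning of the log.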
      val recoveryPoints: util.Map[TopicPartition, JLong] = try {
        this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
          Collections.emptyMap[TopicPartition, JLong]
      }
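
      // The log-start-offset checkpoint preserves each partition's log start offset (which can be moved
      // forward, e.g. by DeleteRecords) across restarts; if it cannot be read, the start offset falls
      // back to the base offset of the first segment.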
      val logStartOffsets: util.Map[TopicPartition, JLong] = try {
        this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
          Collections.emptyMap[TopicPartition, JLong]
      }
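
      // Every remaining subdirectory is expected to be a topic-partition directory. The internal
      // cluster metadata topic is excluded because it is managed by the Raft layer rather than by
      // this LogManager.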
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory &&
          // Ignore the remote-log-index-cache directory as it is an index cache maintained by the
          // tiered storage subsystem, not a topic-partition dir.
          !logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
          UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      numTotalLogs += logsToLoad.length
      numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
      loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)

      if (logsToLoad.isEmpty) {
        info(s"No logs found to be loaded in $logDirAbsolutePath")
      } else if (hadCleanShutdown) {
        info(s"Skipping recovery of ${logsToLoad.length} logs from $logDirAbsolutePath since " +
          "clean shutdown file was found")
      } else {
        info(s"Recovering ${logsToLoad.length} logs from $logDirAbsolutePath since no " +
          "clean shutdown file was found")
        uncleanLogDirs.append(logDirAbsolutePath)
      }
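
      // Build one Runnable per log directory entry. Each job loads a single UnifiedLog (recovering it
      // if the shutdown was unclean) and updates the remaining-logs bookkeeping when it completes.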
      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          debug(s"Loading log $logDir")
          var log = None: Option[UnifiedLog]
          val logLoadStartMs = time.hiResClockMs()
          try {
            log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets,
              defaultConfig, topicConfigOverrides, numRemainingSegments, isStray))
          } catch {
            case e: IOException =>
              handleIOException(logDirAbsolutePath, e)
            case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
              // A KafkaStorageException may be thrown here, e.g. while writing the LeaderEpochFileCache.
              // Its cause is an IOException that was already handled when it was wrapped into a
              // KafkaStorageException, so it can be ignored at this point.
          } finally {
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
            val currentNumLoaded = logsToLoad.length - remainingLogs
            log match {
              case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments, " +
                s"local-log-start-offset ${loadedLog.localLogStartOffset()} and log-end-offset ${loadedLog.logEndOffset} in ${logLoadDurationMs}ms " +
                s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
              case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
            }

            if (remainingLogs == 0) {
              // loadLog is completed for all logs under the logDir, mark it.
              loadLogsCompletedFlags.put(logDirAbsolutePath, true)
            }
          }
        }
        runnable
      }
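
      // Submit this directory's jobs to its dedicated pool and keep the returned futures so they can
      // be awaited (and failures surfaced) after all directories have been processed.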
      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        handleIOException(logDirAbsolutePath, e)
    }
  }
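
  // Second pass: wait for all submitted jobs to finish, report any directory that hit an IOException
  // to the LogDirFailureChannel, and rethrow the cause of the first failed job.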
  try {
    addLogRecoveryMetrics(numRemainingLogs, numRemainingSegments)
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }

    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    removeLogRecoveryMetrics()
    threadPools.foreach(_.shutdown())
  }

  val elapsedMs = time.hiResClockMs() - startMs
  val printedUncleanLogDirs = if (uncleanLogDirs.isEmpty) "" else s" (unclean log dirs = $uncleanLogDirs)"
  info(s"Loaded $numTotalLogs logs in ${elapsedMs}ms$printedUncleanLogDirs")
}
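
// For context: decNumRemainingLogs, used in the finally block above, is defined elsewhere in
// LogManager and is not part of this excerpt. A minimal sketch of the assumed helper, which
// atomically decrements the per-directory counter backing the remainingLogsToRecover metric and
// returns the new value:
//
//   private[log] def decNumRemainingLogs(numRemainingLogs: ConcurrentMap[String, Int], path: String): Int = {
//     require(path != null, "path cannot be null to update remaining logs metric.")
//     numRemainingLogs.compute(path, (_, oldVal) => oldVal - 1)
//   }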