in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala [633:694]
def cleanupExpiredShuffleKey(
    expiredShuffleKeys: util.HashSet[String],
    cleanDB: Boolean = true): Unit = {
  expiredShuffleKeys.asScala.foreach { shuffleKey =>
    logInfo(s"Cleanup expired shuffle $shuffleKey.")
    if (diskFileInfos.containsKey(shuffleKey)) {
      // Drop the in-memory bookkeeping for this shuffle's disk files and clean
      // each file; remember whether any of them lived on DFS (HDFS/OSS/S3).
      val removedFileInfos = diskFileInfos.remove(shuffleKey)
      var isDfsExpired = false
      var isHdfs = false
      var isOss = false
      if (removedFileInfos != null) {
        removedFileInfos.asScala.foreach {
          case (_, fileInfo) =>
            if (cleanFileInternal(shuffleKey, fileInfo)) {
              isDfsExpired = true
              isHdfs = fileInfo.isHdfs
              isOss = fileInfo.isOSS
            }
        }
      }
      // Remove the shuffle's directory from every usable local disk.
      val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
      disksSnapshot().filter(diskInfo =>
        diskInfo.status == DiskStatus.HEALTHY
          || diskInfo.status == DiskStatus.HIGH_DISK_USAGE).foreach { diskInfo =>
        diskInfo.dirs.foreach { dir =>
          val file = new File(dir, s"$appId/$shuffleId")
          deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
        }
      }
      // If any DFS-backed file expired, delete the shuffle directory on the
      // matching storage (HDFS, OSS, or S3).
      if (isDfsExpired) {
        try {
          val dir =
            if (hasHDFSStorage && isHdfs) hdfsDir
            else if (hasOssStorage && isOss) ossDir
            else s3Dir
          val storageInfo =
            if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS
            else if (hasOssStorage && isOss) StorageInfo.Type.OSS
            else StorageInfo.Type.S3
          StorageManager.hadoopFs.get(storageInfo).delete(
            new Path(new Path(dir, conf.workerWorkingDir), s"$appId/$shuffleId"),
            true)
        } catch {
          case e: Exception => logWarning("Clean expired DFS shuffle failed.", e)
        }
      }
      // Under graceful shutdown, also drop committed-file metadata and,
      // optionally, the persisted recovery entry for this shuffle.
      if (workerGracefulShutdown) {
        committedFileInfos.remove(shuffleKey)
        if (cleanDB) {
          db.delete(dbShuffleKey(shuffleKey))
        }
      }
    }
    // Memory-backed shuffle files: clean them and return their buffers to the
    // memory manager.
    if (memoryFileInfos.containsKey(shuffleKey)) {
      val memoryFileMaps = memoryFileInfos.remove(shuffleKey)
      memoryFileMaps.asScala.foreach(u => {
        cleanFileInternal(shuffleKey, u._2)
        MemoryManager.instance().releaseMemoryFileStorage(u._2.releaseMemoryBuffers())
      })
    }
  }
}
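
For orientation, below is a minimal caller-side sketch of how a worker-side expiration handler might assemble the shuffle-key set and trigger this cleanup. It is an illustration, not code from the repository: the object and method names are hypothetical, and Utils.makeShuffleKey is assumed to build the "appId-shuffleId" key that the Utils.splitShuffleKey call above parses; the storageManager instance is taken from the caller's scope.

// Caller-side sketch (not part of StorageManager). handleExpiredShuffles is a
// hypothetical name, and Utils.makeShuffleKey is assumed to be the inverse of
// the Utils.splitShuffleKey used in cleanupExpiredShuffleKey.
import java.util

import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.storage.StorageManager

object ExpiredShuffleCleanupSketch {
  def handleExpiredShuffles(
      storageManager: StorageManager,
      expiredShuffles: Seq[(String, Int)]): Unit = {
    val expiredShuffleKeys = new util.HashSet[String]()
    expiredShuffles.foreach { case (appId, shuffleId) =>
      expiredShuffleKeys.add(Utils.makeShuffleKey(appId, shuffleId))
    }
    // cleanDB defaults to true, so graceful-shutdown recovery entries are
    // removed along with the on-disk, DFS, and in-memory shuffle data.
    storageManager.cleanupExpiredShuffleKey(expiredShuffleKeys)
  }
}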