def cleanupExpiredShuffleKey()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala [633:694]


  /**
   * Cleans up the state this worker tracks for every shuffle in `expiredShuffleKeys`.
   *
   * For each shuffle key:
   *  - If the key is present in `diskFileInfos`, its entry is removed and every file info is
   *    passed to `cleanFileInternal`. The `appId/shuffleId` directory is then deleted under
   *    every working dir of each disk whose status is `HEALTHY` or `HIGH_DISK_USAGE`. If any
   *    `cleanFileInternal` call returned true, the matching `appId/shuffleId` directory on the
   *    selected DFS (HDFS, OSS, otherwise S3) is deleted recursively; a failure there is only
   *    logged. When `workerGracefulShutdown` is set, the key is also dropped from
   *    `committedFileInfos` and, if `cleanDB` is true, from `db`.
   *  - If the key is present in `memoryFileInfos`, its entry is removed, every file info is
   *    passed to `cleanFileInternal`, and its memory buffers are released back to
   *    `MemoryManager`.
   *
   * @param expiredShuffleKeys shuffle keys whose data should be cleaned up
   * @param cleanDB whether to also delete the shuffle's record from `db`; only consulted
   *                when `workerGracefulShutdown` is enabled
   */
  def cleanupExpiredShuffleKey(
      expiredShuffleKeys: util.HashSet[String],
      cleanDB: Boolean = true): Unit = {
    expiredShuffleKeys.asScala.foreach { shuffleKey =>
      logInfo(s"Cleanup expired shuffle $shuffleKey.")
      if (diskFileInfos.containsKey(shuffleKey)) {
        // These flags are written from inside the loop below because cleanFileInternal is
        // side-effecting and must run for every file info.
        var isDfsExpired = false
        var isHdfs = false
        var isOss = false
        // remove() may return null if the entry vanished after the containsKey check.
        Option(diskFileInfos.remove(shuffleKey)).foreach { removedFileInfos =>
          removedFileInfos.asScala.foreach {
            case (_, fileInfo) =>
              // Presumably cleanFileInternal returns true for files stored on a DFS -- TODO confirm.
              if (cleanFileInternal(shuffleKey, fileInfo)) {
                isDfsExpired = true
                // NOTE(review): the last matching file info wins here; if one shuffle can span
                // several DFS backends only one of them is cleaned below -- confirm intent.
                isHdfs = fileInfo.isHdfs
                isOss = fileInfo.isOSS
              }
          }
        }

        val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
        val shuffleRelativePath = s"$appId/$shuffleId"

        // Remove the shuffle directory from every local disk that is still usable.
        disksSnapshot()
          .filter { diskInfo =>
            diskInfo.status == DiskStatus.HEALTHY ||
            diskInfo.status == DiskStatus.HIGH_DISK_USAGE
          }
          .foreach { diskInfo =>
            diskInfo.dirs.foreach { dir =>
              deleteDirectory(
                new File(dir, shuffleRelativePath),
                diskOperators.get(diskInfo.mountPoint))
            }
          }

        if (isDfsExpired) {
          try {
            // Pick the DFS root and the matching storage type in one place so they cannot drift.
            val (dfsDir, dfsType) =
              if (hasHDFSStorage && isHdfs) (hdfsDir, StorageInfo.Type.HDFS)
              else if (hasOssStorage && isOss) (ossDir, StorageInfo.Type.OSS)
              else (s3Dir, StorageInfo.Type.S3)
            StorageManager.hadoopFs.get(dfsType).delete(
              new Path(new Path(dfsDir, conf.workerWorkingDir), shuffleRelativePath),
              true)
          } catch {
            // Best effort: a failed DFS cleanup must not abort the remaining cleanup work.
            case e: Exception => logWarning("Clean expired DFS shuffle failed.", e)
          }
        }

        if (workerGracefulShutdown) {
          committedFileInfos.remove(shuffleKey)
          if (cleanDB) {
            db.delete(dbShuffleKey(shuffleKey))
          }
        }
      }

      // A single remove() replaces the former containsKey + remove pair: it returns null when
      // the key is absent (including when it disappeared between the two calls), and the
      // Option guard keeps that case from throwing a NullPointerException mid-loop.
      Option(memoryFileInfos.remove(shuffleKey)).foreach { memoryFileMaps =>
        memoryFileMaps.asScala.foreach {
          case (_, memoryFileInfo) =>
            cleanFileInternal(shuffleKey, memoryFileInfo)
            MemoryManager.instance().releaseMemoryFileStorage(
              memoryFileInfo.releaseMemoryBuffers())
        }
      }
    }
  }