def createDiskFile()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala [1065:1212]
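
Creates the shuffle data file for a partition location and registers its DiskFileInfo under the shuffle key. The method retries up to workerCreateWriterMaxAttempts times, preferring the suggested mount point and falling back to other healthy working dirs or to remote storage (HDFS, S3, OSS) as the location's StorageInfo allows. It returns the flusher for the chosen backend, the new DiskFileInfo, and the local working dir (null when the file lives on remote storage).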


  def createDiskFile(
      location: PartitionLocation,
      appId: String,
      shuffleId: Int,
      fileName: String,
      userIdentifier: UserIdentifier,
      partitionType: PartitionType,
      partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
    val suggestedMountPoint = location.getStorageInfo.getMountPoint
    var retryCount = 0
    var exception: IOException = null
    val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
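    // Retry creation up to conf.workerCreateWriterMaxAttempts times before giving up.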
    while (retryCount < conf.workerCreateWriterMaxAttempts) {
      val diskInfo = diskInfos.get(suggestedMountPoint)
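      // Use the suggested mount point's dirs when that disk is healthy; otherwise fall back to all healthy working dirs.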
      val dirs =
        if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
          diskInfo.dirs
        } else {
          if (suggestedMountPoint.isEmpty) {
            logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.")
          } else {
            logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
              s" working dirs.")
          }
          healthyWorkingDirs()
        }
      if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
        throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
      }

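      // Pick the storage backend: HDFS or S3 when no usable local dirs remain,
      // OSS when OSS storage is configured and allowed, otherwise a local disk.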
      if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
        val shuffleDir =
          new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
        FileSystem.mkdirs(
          StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
          shuffleDir,
          hdfsPermission)
        val hdfsFilePath = new Path(shuffleDir, fileName).toString
        val hdfsFileInfo = new DiskFileInfo(
          userIdentifier,
          partitionSplitEnabled,
          new ReduceFileMeta(conf.shuffleChunkSize),
          hdfsFilePath,
          StorageInfo.Type.HDFS)
        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
          fileName,
          hdfsFileInfo)
        return (hdfsFlusher.get, hdfsFileInfo, null)
      } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
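        // Same pattern as the HDFS branch, rooted under s3Dir and flushed by s3Flusher.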
        val shuffleDir =
          new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
        FileSystem.mkdirs(
          StorageManager.hadoopFs.get(StorageInfo.Type.S3),
          shuffleDir,
          hdfsPermission)
        val s3FilePath = new Path(shuffleDir, fileName).toString
        val s3FileInfo = new DiskFileInfo(
          userIdentifier,
          partitionSplitEnabled,
          new ReduceFileMeta(conf.shuffleChunkSize),
          s3FilePath,
          StorageInfo.Type.S3)
        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
          fileName,
          s3FileInfo)
        return (s3Flusher.get, s3FileInfo, null)
      } else if (hasOssStorage && location.getStorageInfo.OSSAvailable()) {
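        // Note: unlike the HDFS/S3 branches, OSS does not require dirs to be empty.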
        val shuffleDir =
          new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
        FileSystem.mkdirs(
          StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
          shuffleDir,
          hdfsPermission)
        val ossFilePath = new Path(shuffleDir, fileName).toString
        val ossFileInfo = new DiskFileInfo(
          userIdentifier,
          partitionSplitEnabled,
          new ReduceFileMeta(conf.shuffleChunkSize),
          ossFilePath,
          StorageInfo.Type.OSS)
        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
          fileName,
          ossFileInfo)
        return (ossFlusher.get, ossFileInfo, null)
      } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
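        // Choose a local working dir round-robin and create <appId>/<shuffleId>/<fileName> on it.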
        val dir = dirs(getNextIndex % dirs.size)
        val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
        val shuffleDir = new File(dir, s"$appId/$shuffleId")
        shuffleDir.mkdirs()
        val file = new File(shuffleDir, fileName)
        try {
          if (file.exists()) {
            throw new FileAlreadyExistsException(
              s"Shuffle data file ${file.getAbsolutePath} already exists.")
          } else {
            val createFileSuccess = file.createNewFile()
            if (!createFileSuccess) {
              throw new CelebornException(
                s"Create shuffle data file ${file.getAbsolutePath} failed!")
            }
          }
          val filePath = file.getAbsolutePath
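          // REDUCE partitions get chunked ReduceFileMeta; MAP partitions get MapFileMeta tagged with the mount point.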
          val fileMeta = partitionType match {
            case PartitionType.REDUCE =>
              new ReduceFileMeta(conf.shuffleChunkSize)
            case PartitionType.MAP =>
              val mapFileMeta = new MapFileMeta()
              mapFileMeta.setMountPoint(mountPoint)
              mapFileMeta
            case PartitionType.MAPGROUP =>
              throw new NotImplementedError("Map group is not implemented")
          }
          val diskFileInfo = new DiskFileInfo(
            userIdentifier,
            partitionSplitEnabled,
            fileMeta,
            filePath,
            StorageInfo.Type.HDD)
          logInfo(s"created file at $filePath")
          diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
            fileName,
            diskFileInfo)
          return (
            localFlushers.get(mountPoint),
            diskFileInfo,
            dir)
        } catch {
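          // A pre-existing file is rethrown as-is; any other failure is reported to the DeviceMonitor first.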
          case fe: FileAlreadyExistsException =>
            logError("Failed to create fileWriter because of existed file", fe)
            throw fe
          case t: Throwable =>
            logError(
              s"Create FileWriter for ${file.getAbsolutePath} of mount $mountPoint " +
                s"failed, report to DeviceMonitor",
              t)
            deviceMonitor.reportNonCriticalError(
              mountPoint,
              new IOException(t),
              DiskStatus.READ_OR_WRITE_FAILURE)
            throw t
        }
      } else {
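        // Nothing matched this location's storage info; remember the error and retry.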
        exception = new IOException(s"No storage available for location: $location")
      }
      retryCount += 1
    }
    throw exception
  }
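
Two implementation details here are easy to miss: working dirs are chosen round-robin via dirs(getNextIndex % dirs.size), and file metadata is registered per shuffle through computeIfAbsent on the nested diskFileInfos map, so the inner map is created at most once even under concurrent callers. The sketch below isolates both patterns; FileRegistry, its register method, and the assumed "<appId>-<shuffleId>" shuffle-key shape are illustrative stand-ins, not Celeborn APIs.

import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified illustration of two patterns used by createDiskFile():
// round-robin selection over working dirs and per-shuffle registration through
// computeIfAbsent on a nested concurrent map. Not part of the Celeborn code base.
class FileRegistry(workingDirs: IndexedSeq[File]) {
  private val counter = new AtomicInteger(0)
  // shuffleKey -> (fileName -> absolute path), mirroring the nested layout of diskFileInfos
  private val files = new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()

  // Pick the next working dir, like dirs(getNextIndex % dirs.size) in the real method.
  private def nextDir(): File =
    workingDirs((counter.getAndIncrement() & Int.MaxValue) % workingDirs.size)

  def register(appId: String, shuffleId: Int, fileName: String): String = {
    val shuffleKey = s"$appId-$shuffleId" // assumed shuffle-key shape, for illustration only
    val shuffleDir = new File(nextDir(), s"$appId/$shuffleId")
    shuffleDir.mkdirs()
    val path = new File(shuffleDir, fileName).getAbsolutePath
    // computeIfAbsent creates the per-shuffle map exactly once, even with concurrent callers.
    files
      .computeIfAbsent(shuffleKey, _ => new ConcurrentHashMap[String, String]())
      .put(fileName, path)
    path
  }
}

For example, new FileRegistry(Vector(new File("/tmp/d0"), new File("/tmp/d1"))).register("app-1", 0, "0-0-0") alternates new files between the two dirs while keeping a single metadata map per shuffle.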