in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala [1065:1212]
/**
 * Registers a new shuffle data file for `location` and selects the flusher that will write it.
 *
 * Storage is selected per attempt in this order:
 *  1. HDFS, when no local working dir is usable and the location allows HDFS;
 *  2. S3, when no local working dir is usable and the location allows S3;
 *  3. OSS, when `hasOssStorage` is set and the location allows OSS;
 *  4. local disk, when at least one working dir is usable and the location allows local disk.
 *
 * Remote (HDFS/S3/OSS) files are always created with a [[ReduceFileMeta]]; only the local-disk
 * branch honours `partitionType`.
 *
 * The created [[DiskFileInfo]] is recorded in `diskFileInfos` under the shuffle key and
 * `fileName` before it is returned.
 *
 * @param location              partition location whose storage info carries the suggested mount
 *                              point and the allowed storage types
 * @param appId                 application id, used for the shuffle key and the directory layout
 * @param shuffleId             shuffle id, used for the shuffle key and the directory layout
 * @param fileName              name of the data file inside the `appId/shuffleId` directory
 * @param userIdentifier        owner recorded in the file info
 * @param partitionType         decides which file meta is created for local files
 * @param partitionSplitEnabled forwarded to the file info
 * @return the flusher to use, the registered file info, and the local working dir that holds the
 *         file (`null` for HDFS/S3/OSS)
 * @throws IOException if no local dir and no remote flusher is available, or if no storage
 *                     branch matched after `conf.workerCreateWriterMaxAttempts` attempts
 * @throws FileAlreadyExistsException if the local data file already exists
 */
def createDiskFile(
    location: PartitionLocation,
    appId: String,
    shuffleId: Int,
    fileName: String,
    userIdentifier: UserIdentifier,
    partitionType: PartitionType,
    partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
  val suggestedMountPoint = location.getStorageInfo.getMountPoint
  var retryCount = 0
  var exception: IOException = null
  val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
  // NOTE(review): a failed attempt changes nothing but `retryCount`, so every retry re-evaluates
  // the same conditions; it only helps if disk state changes concurrently — confirm intent.
  while (retryCount < conf.workerCreateWriterMaxAttempts) {
    val diskInfo = diskInfos.get(suggestedMountPoint)
    val dirs =
      if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
        diskInfo.dirs
      } else {
        if (suggestedMountPoint.isEmpty) {
          logDebug(s"Location suggestedMountPoint is not set, return all healthy working dirs.")
        } else {
          // `diskInfo` is null when the suggested mount point is unknown to this worker, so it
          // must not be dereferenced unconditionally here.
          val unavailableDisk = Option(diskInfo).map(_.mountPoint).getOrElse("unknown")
          logInfo(s"Disk($unavailableDisk) unavailable for $suggestedMountPoint, return all healthy" +
            s" working dirs.")
        }
        healthyWorkingDirs()
      }
    if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
      throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
    }
    if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
      val shuffleDir =
        new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
      FileSystem.mkdirs(
        StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
        shuffleDir,
        hdfsPermission)
      val hdfsFilePath = new Path(shuffleDir, fileName).toString
      val hdfsFileInfo = new DiskFileInfo(
        userIdentifier,
        partitionSplitEnabled,
        new ReduceFileMeta(conf.shuffleChunkSize),
        hdfsFilePath,
        StorageInfo.Type.HDFS)
      diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
        fileName,
        hdfsFileInfo)
      // No local working dir backs a remote file, hence the null third element.
      return (hdfsFlusher.get, hdfsFileInfo, null)
    } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
      val shuffleDir =
        new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
      // The S3 directory is created with the same permission object as HDFS.
      FileSystem.mkdirs(
        StorageManager.hadoopFs.get(StorageInfo.Type.S3),
        shuffleDir,
        hdfsPermission)
      val s3FilePath = new Path(shuffleDir, fileName).toString
      val s3FileInfo = new DiskFileInfo(
        userIdentifier,
        partitionSplitEnabled,
        new ReduceFileMeta(conf.shuffleChunkSize),
        s3FilePath,
        StorageInfo.Type.S3)
      diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
        fileName,
        s3FileInfo)
      return (s3Flusher.get, s3FileInfo, null)
    } else if (hasOssStorage && location.getStorageInfo.OSSAvailable()) {
      // NOTE(review): unlike the HDFS and S3 branches this one is not gated on `dirs.isEmpty`,
      // so OSS wins over healthy local dirs when both are allowed — confirm this is intended.
      val shuffleDir =
        new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
      FileSystem.mkdirs(
        StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
        shuffleDir,
        hdfsPermission)
      val ossFilePath = new Path(shuffleDir, fileName).toString
      val ossFileInfo = new DiskFileInfo(
        userIdentifier,
        partitionSplitEnabled,
        new ReduceFileMeta(conf.shuffleChunkSize),
        ossFilePath,
        StorageInfo.Type.OSS)
      diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
        fileName,
        ossFileInfo)
      return (ossFlusher.get, ossFileInfo, null)
    } else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
      // Pick one of the candidate working dirs using the shared rolling index.
      val dir = dirs(getNextIndex % dirs.size)
      val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
      val shuffleDir = new File(dir, s"$appId/$shuffleId")
      shuffleDir.mkdirs()
      val file = new File(shuffleDir, fileName)
      try {
        if (file.exists()) {
          throw new FileAlreadyExistsException(
            s"Shuffle data file ${file.getAbsolutePath} already exists.")
        } else {
          val createFileSuccess = file.createNewFile()
          if (!createFileSuccess) {
            throw new CelebornException(
              s"Create shuffle data file ${file.getAbsolutePath} failed!")
          }
        }
        val filePath = file.getAbsolutePath
        val fileMeta = partitionType match {
          case PartitionType.REDUCE =>
            new ReduceFileMeta(conf.shuffleChunkSize)
          case PartitionType.MAP =>
            val mapFileMeta = new MapFileMeta()
            mapFileMeta.setMountPoint(mountPoint)
            mapFileMeta
          case PartitionType.MAPGROUP =>
            throw new NotImplementedError("Map group is not implemented")
        }
        val diskFileInfo = new DiskFileInfo(
          userIdentifier,
          partitionSplitEnabled,
          fileMeta,
          filePath,
          StorageInfo.Type.HDD)
        logInfo(s"created file at $filePath")
        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
          fileName,
          diskFileInfo)
        return (
          localFlushers.get(mountPoint),
          diskFileInfo,
          dir)
      } catch {
        // An already existing file is not reported to the device monitor; it is only logged
        // and rethrown.
        case fe: FileAlreadyExistsException =>
          logError("Failed to create fileWriter because of existed file", fe)
          throw fe
        // Every other failure is reported against the mount point before being rethrown.
        case t: Throwable =>
          logError(
            s"Create FileWriter for ${file.getAbsolutePath} of mount $mountPoint " +
              s"failed, report to DeviceMonitor",
            t)
          deviceMonitor.reportNonCriticalError(
            mountPoint,
            new IOException(t),
            DiskStatus.READ_OR_WRITE_FAILURE)
          throw t
      }
    } else {
      exception = new IOException("No storage available for location:" + location.toString)
    }
    retryCount += 1
  }
  // `exception` is still null when the loop body never ran (non-positive max attempts);
  // throwing null would surface as a bare NullPointerException.
  throw Option(exception).getOrElse(
    new IOException(
      s"Failed to create disk file for location: $location, no attempt was made " +
        s"(max attempts: ${conf.workerCreateWriterMaxAttempts})"))
}