in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala [70:199]
/**
 * Creates a tier writer for the partition described by `partitionDataWriterContext`.
 *
 * The storage type recorded on the partition location is looked up in `order`. That entry and
 * every entry after it are tried in sequence, and the writer of the first storage type that
 * can be created is returned.
 *
 * @param partitionDataWriterContext context describing the partition to write; its partition
 *                                   location must not be null
 * @param partitionType partition type used to pick the partition meta handler
 * @param numPendingWrites counter handed to the created writer
 * @param notifier flush notifier handed to the created writer and to map partition meta handlers
 * @param order ordered storage type names to try; defaults to `createFileOrder`
 * @return the first writer that could be created
 * @throws IllegalStateException if the context has no partition location
 * @throws CelebornIOException if `order` is not defined, if the storage type of the location
 *                             is not listed in `order`, or if no writer could be created
 */
def createFileWriter(
    partitionDataWriterContext: PartitionDataWriterContext,
    partitionType: PartitionType,
    numPendingWrites: AtomicInteger,
    notifier: FlushNotifier,
    order: Option[List[String]] = createFileOrder): TierWriterBase = {
  // Validate first: everything below, including the debug logs, dereferences the location.
  val location = partitionDataWriterContext.getPartitionLocation
  if (location == null) {
    throw new IllegalStateException(
      "Partition data writer context can not have null partition location")
  }

  // Shared "<shuffleKey> <fileName>" suffix of the debug messages; a def so that it is only
  // built when a message is actually rendered.
  def fileDescription: String =
    s"${partitionDataWriterContext.getShuffleKey} ${location.getFileName}"

  logDebug(s"create file for $fileDescription")

  // Picks the meta handler matching the partition type.
  // NOTE(review): MAP partitions cast the file info to DiskFileInfo. On the MEMORY path a
  // failing cast would be caught in tryCreateFileByType and treated like an unavailable
  // storage type - confirm this is intended.
  def getPartitionMetaHandler(fileInfo: FileInfo) = {
    partitionType match {
      case PartitionType.REDUCE =>
        new ReducePartitionMetaHandler(partitionDataWriterContext.isRangeReadFilter, fileInfo)
      case PartitionType.MAP =>
        if (partitionDataWriterContext.isSegmentGranularityVisible) {
          new SegmentMapPartitionMetaHandler(fileInfo.asInstanceOf[DiskFileInfo], notifier)
        } else {
          new MapPartitionMetaHandler(fileInfo.asInstanceOf[DiskFileInfo], notifier)
        }
      case PartitionType.MAPGROUP =>
        // No meta handler for MAPGROUP; the writer receives null.
        null
    }
  }

  // Tries to create a writer for a single storage type. Returns None when that storage is
  // not available or when creation fails, so that the caller can fall back to the next type.
  def tryCreateFileByType(storageInfoType: StorageInfo.Type): Option[TierWriterBase] = {
    try {
      storageInfoType match {
        case StorageInfo.Type.MEMORY =>
          if (location.getStorageInfo.memoryAvailable() &&
            MemoryManager.instance().memoryFileStorageAvailable()) {
            logDebug(s"Create memory file for $fileDescription")
            val memoryFileInfo = storageManager.createMemoryFileInfo(
              partitionDataWriterContext.getAppId,
              partitionDataWriterContext.getShuffleId,
              location.getFileName,
              partitionDataWriterContext.getUserIdentifier,
              partitionDataWriterContext.getPartitionType,
              partitionDataWriterContext.isPartitionSplitEnabled)
            val metaHandler = getPartitionMetaHandler(memoryFileInfo)
            Some(new MemoryTierWriter(
              conf,
              metaHandler,
              numPendingWrites,
              notifier,
              source,
              memoryFileInfo,
              storageInfoType,
              partitionDataWriterContext,
              storageManager))
          } else {
            None
          }
        case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS |
            StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
          if (storageManager.localOrDfsStorageAvailable) {
            logDebug(s"create non-memory file for $fileDescription")
            val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
              location,
              partitionDataWriterContext.getAppId,
              partitionDataWriterContext.getShuffleId,
              location.getFileName,
              partitionDataWriterContext.getUserIdentifier,
              partitionDataWriterContext.getPartitionType,
              partitionDataWriterContext.isPartitionSplitEnabled)
            partitionDataWriterContext.setWorkingDir(workingDir)
            val metaHandler = getPartitionMetaHandler(diskFileInfo)
            val isLocalDisk =
              storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD
            if (isLocalDisk) {
              Some(new LocalTierWriter(
                conf,
                metaHandler,
                numPendingWrites,
                notifier,
                flusher,
                source,
                diskFileInfo,
                storageInfoType,
                partitionDataWriterContext,
                storageManager))
            } else {
              Some(new DfsTierWriter(
                conf,
                metaHandler,
                numPendingWrites,
                notifier,
                flusher,
                source,
                diskFileInfo,
                storageInfoType,
                partitionDataWriterContext,
                storageManager))
            }
          } else {
            None
          }
      }
    } catch {
      case e: Exception =>
        // Best effort: a failure on one storage type must not prevent the fallback.
        logError(s"create celeborn file for storage $storageInfoType failed", e)
        None
    }
  }

  // Fallback follows `order`; the original note describes it as MEMORY -> LOCAL -> DFS.
  val storageOrder = order.getOrElse(
    throw new CelebornIOException("Create file order can not be empty, check your configs"))

  val requestedType = location.getStorageInfo.getType
  val firstCandidate = storageOrder.indexOf(requestedType.name())
  if (firstCandidate < 0) {
    // indexOf yields -1 for an unlisted type; indexing the list with it would fail with an
    // IndexOutOfBoundsException, so report the configuration problem explicitly instead.
    throw new CelebornIOException(
      s"Storage type $requestedType is not in create file order " +
        s"${storageOrder.mkString(",")}, check your configs")
  }

  // The iterator is lazy, so storage types after the first success are never attempted.
  storageOrder.iterator
    .drop(firstCandidate)
    .map(storageStr => tryCreateFileByType(StorageInfo.fromStrToType(storageStr)))
    .collectFirst { case Some(writer) => writer }
    .getOrElse {
      logError(
        s"Could not create file for storage type ${location.getStorageInfo.getType}")
      throw new CelebornIOException(
        s"Create file failed for context ${partitionDataWriterContext.toString}")
    }
}