def createFileWriter()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala [70:199]


  def createFileWriter(
      partitionDataWriterContext: PartitionDataWriterContext,
      partitionType: PartitionType,
      numPendingWrites: AtomicInteger,
      notifier: FlushNotifier,
      order: Option[List[String]] = createFileOrder): TierWriterBase = {
    logDebug(
      s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
    val location = partitionDataWriterContext.getPartitionLocation
    if (location == null) {
      throw new IllegalStateException(
        "Partition data writer context can not have null partition location")
    }

    def getPartitionMetaHandler(fileInfo: FileInfo) = {
      partitionType match {
        case PartitionType.REDUCE =>
          new ReducePartitionMetaHandler(partitionDataWriterContext.isRangeReadFilter, fileInfo)
        case PartitionType.MAP =>
          if (partitionDataWriterContext.isSegmentGranularityVisible) {
            new SegmentMapPartitionMetaHandler(fileInfo.asInstanceOf[DiskFileInfo], notifier)
          } else {
            new MapPartitionMetaHandler(
              fileInfo.asInstanceOf[DiskFileInfo],
              notifier)
          }
        case PartitionType.MAPGROUP =>
          null
      }
    }

    def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase = {
      try {
        storageInfoType match {
          case StorageInfo.Type.MEMORY =>
            if (location.getStorageInfo.memoryAvailable() && MemoryManager.instance().memoryFileStorageAvailable()) {
              logDebug(s"Create memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
              val memoryFileInfo = storageManager.createMemoryFileInfo(
                partitionDataWriterContext.getAppId,
                partitionDataWriterContext.getShuffleId,
                location.getFileName,
                partitionDataWriterContext.getUserIdentifier,
                partitionDataWriterContext.getPartitionType,
                partitionDataWriterContext.isPartitionSplitEnabled)
              val metaHandler = getPartitionMetaHandler(memoryFileInfo)
              new MemoryTierWriter(
                conf,
                metaHandler,
                numPendingWrites,
                notifier,
                source,
                memoryFileInfo,
                storageInfoType,
                partitionDataWriterContext,
                storageManager)
            } else {
              null
            }
          case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
            if (storageManager.localOrDfsStorageAvailable) {
              logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
              val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
                location,
                partitionDataWriterContext.getAppId,
                partitionDataWriterContext.getShuffleId,
                location.getFileName,
                partitionDataWriterContext.getUserIdentifier,
                partitionDataWriterContext.getPartitionType,
                partitionDataWriterContext.isPartitionSplitEnabled)
              partitionDataWriterContext.setWorkingDir(workingDir)
              val metaHandler = getPartitionMetaHandler(diskFileInfo)
              if (storageInfoType == StorageInfo.Type.HDD || storageInfoType == StorageInfo.Type.SSD) {
                new LocalTierWriter(
                  conf,
                  metaHandler,
                  numPendingWrites,
                  notifier,
                  flusher,
                  source,
                  diskFileInfo,
                  storageInfoType,
                  partitionDataWriterContext,
                  storageManager)
              } else {
                new DfsTierWriter(
                  conf,
                  metaHandler,
                  numPendingWrites,
                  notifier,
                  flusher,
                  source,
                  diskFileInfo,
                  storageInfoType,
                  partitionDataWriterContext,
                  storageManager)
              }
            } else {
              null
            }
        }
      } catch {
        case e: Exception =>
          logError(s"create celeborn file for storage $storageInfoType failed", e)
          null
      }
    }

    // the fallback order is MEMORY -> LOCAL -> DFS
    if (order.isEmpty) {
      throw new CelebornIOException("Create file order can not be empty, check your configs")
    }

    val tryCreateFileTypeIndex = order.get.indexOf(
      partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
    val maxSize = order.get.length
    for (i <- tryCreateFileTypeIndex until maxSize) {
      val storageStr = order.get(i)
      val storageInfoType = StorageInfo.fromStrToType(storageStr)
      val file = tryCreateFileByType(storageInfoType)
      if (file != null) {
        return file
      }
    }

    logError(
      s"Could not create file for storage type ${location.getStorageInfo.getType}")

    throw new CelebornIOException(
      s"Create file failed for context ${partitionDataWriterContext.toString}")
  }