def shuffleFallbackPartitionThreshold: Long = get()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1083:1180]


  def shuffleFallbackPartitionThreshold: Long = get(SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD)
  def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
  def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
  def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
  def dfsReadChunkSize: Long = get(CLIENT_FETCH_DFS_READ_CHUNK_SIZE)
  def shufflePartitionSplitMode: PartitionSplitMode =
    PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
  def shufflePartitionSplitThreshold: Long = get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
  def batchHandleChangePartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED)
  def batchHandleChangePartitionBuckets: Int =
    get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_BUCKETS)
  def batchHandleChangePartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
  def batchHandleChangePartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
  def batchHandleRemoveExpiredShufflesEnabled: Boolean =
    get(CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE_ENABLED)
  def batchHandleCommitPartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED)
  def batchHandleCommitPartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_THREADS)
  def batchHandleCommitPartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLED_COMMIT_PARTITION_INTERVAL)
  def batchHandleReleasePartitionEnabled: Boolean =
    get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_ENABLED)
  def batchHandleReleasePartitionNumThreads: Int =
    get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_THREADS)
  def batchHandleReleasePartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLED_RELEASE_PARTITION_INTERVAL)
  def enableReadLocalShuffleFile: Boolean = get(READ_LOCAL_SHUFFLE_FILE)
  def readLocalShuffleThreads: Int = get(READ_LOCAL_SHUFFLE_THREADS)
  def readStreamCreatorPoolThreads: Int = get(READ_STREAM_CREATOR_POOL_THREADS)

  def registerShuffleFilterExcludedWorkerEnabled: Boolean =
    get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
  def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)

  // //////////////////////////////////////////////////////
  //                       Worker                        //
  // //////////////////////////////////////////////////////

  /**
   * @return workingDir, usable space, flusher thread count, disk type
   *         check more details at CONFIGURATION_GUIDE.md
   */
  def workerBaseDirs: Seq[(String, Long, Int, Type)] = {
    // I assume there is no disk is bigger than 1 PB in recent days.
    val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")
    get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
      storageDirs.map { str =>
        var maxCapacity = defaultMaxCapacity
        var diskType = HDD
        var flushThread = get(WORKER_FLUSHER_THREADS)
        val (dir, attributes) = str.split(":").toList match {
          case _dir :: tail => (_dir, tail)
          case nil => throw new IllegalArgumentException(s"Illegal storage dir: $nil")
        }
        var flushThreadsDefined = false
        attributes.foreach {
          case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
            maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
          case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") =>
            diskType = Type.valueOf(diskTypeStr.split("=")(1))
            if (diskType == Type.MEMORY) {
              throw new IOException(s"Invalid diskType: $diskType")
            }
            if (!flushThreadsDefined) {
              flushThread = diskType match {
                case HDD => workerHddFlusherThreads
                case SSD => workerSsdFlusherThreads
                case _ => flushThread
              }
            }
          case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
            flushThread = threadCountStr.split("=")(1).toInt
            flushThreadsDefined = true
          case illegal =>
            throw new IllegalArgumentException(s"Illegal attribute: $illegal")
        }
        (dir, maxCapacity, flushThread, diskType)
      }
    }.getOrElse {
      if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
        val prefix = workerStorageBaseDirPrefix
        val number = workerStorageBaseDirNumber
        val diskType = Type.valueOf(workerStorageBaseDirDiskType)
        (1 to number).map { i =>
          (
            s"$prefix$i",
            defaultMaxCapacity,
            diskType match {
              case SSD => workerSsdFlusherThreads
              case _ => workerHddFlusherThreads
            },
            diskType)
        }
      } else {
        Seq.empty
      }
    }
  }