def clientPushLimitInFlightSleepDeltaMs: Long = get()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [784:877]


  // Client push accessors. Each one returns the value held by the named config entry via
  // `get`; the entries themselves are declared outside this excerpt.

  /**
   * Value of CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL.
   * NOTE(review): the method is named "...SleepDeltaMs" while the entry is named
   * "..._SLEEP_INTERVAL"; the `Ms` suffix suggests milliseconds - confirm against the entry.
   */
  def clientPushLimitInFlightSleepDeltaMs: Long = get(CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)

  /** Value of CLIENT_PUSH_SPLIT_PARTITION_THREADS. */
  def clientPushSplitPartitionThreads: Int = get(CLIENT_PUSH_SPLIT_PARTITION_THREADS)

  /** Value of CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL (presumably milliseconds, per the name). */
  def clientPushTakeTaskWaitIntervalMs: Long = get(CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL)

  /** Value of CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS. */
  def clientPushTakeTaskMaxWaitAttempts: Int = get(CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS)

  /** Value of CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT (time unit not visible here). */
  def clientPushSendBufferPoolExpireTimeout: Long = get(CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT)

  /** Value of CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL (time unit not visible here). */
  def clientPushSendBufferPoolExpireCheckInterval: Long =
    get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)

  // //////////////////////////////////////////////////////
  //                   Client Shuffle                    //
  // //////////////////////////////////////////////////////
  /** Configured SPARK_SHUFFLE_WRITER_MODE value, converted with `ShuffleMode.valueOf`. */
  def shuffleWriterMode: ShuffleMode = ShuffleMode.valueOf(get(SPARK_SHUFFLE_WRITER_MODE))

  /** Configured SHUFFLE_PARTITION_TYPE value, converted with `PartitionType.valueOf`. */
  def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))

  /** Value of SHUFFLE_RANGE_READ_FILTER_ENABLED. */
  def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)

  /** Value of SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED. */
  def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)

  /** Value of SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD. */
  def shuffleForceFallbackPartitionThreshold: Long =
    get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)

  /** Value of SHUFFLE_EXPIRED_CHECK_INTERVAL (presumably milliseconds, per the name). */
  def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)

  /** Value of CLIENT_SHUFFLE_MANAGER_PORT. */
  def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)

  /** Value of SHUFFLE_CHUNK_SIZE (presumably a byte count - confirm against the entry). */
  def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)

  /** Configured SHUFFLE_PARTITION_SPLIT_MODE value, converted with `PartitionSplitMode.valueOf`. */
  def shufflePartitionSplitMode: PartitionSplitMode =
    PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))

  /** Value of SHUFFLE_PARTITION_SPLIT_THRESHOLD (presumably a byte count - confirm). */
  def shufflePartitionSplitThreshold: Long = get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
  // Batch-handling accessors: for each of the change / commit / release partition requests
  // there is an enabled flag, a thread count and a request interval.
  // NOTE(review): the change-partition interval entry is spelled "..._HANDLE_..." while the
  // commit and release interval entries are spelled "..._HANDLED_..."; the names below match
  // the identifiers as referenced here, so any rename has to happen at their declaration.

  /** Value of CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED. */
  def batchHandleChangePartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED)

  /** Value of CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS. */
  def batchHandleChangePartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)

  /** Value of CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL (time unit not visible here). */
  def batchHandleChangePartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)

  /** Value of CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED. */
  def batchHandleCommitPartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED)

  /** Value of CLIENT_BATCH_HANDLE_COMMIT_PARTITION_THREADS. */
  def batchHandleCommitPartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_THREADS)

  /** Value of CLIENT_BATCH_HANDLED_COMMIT_PARTITION_INTERVAL (time unit not visible here). */
  def batchHandleCommitPartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLED_COMMIT_PARTITION_INTERVAL)

  /** Value of CLIENT_BATCH_HANDLE_RELEASE_PARTITION_ENABLED. */
  def batchHandleReleasePartitionEnabled: Boolean =
    get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_ENABLED)

  /** Value of CLIENT_BATCH_HANDLE_RELEASE_PARTITION_THREADS. */
  def batchHandleReleasePartitionNumThreads: Int =
    get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_THREADS)

  /** Value of CLIENT_BATCH_HANDLED_RELEASE_PARTITION_INTERVAL (time unit not visible here). */
  def batchHandleReleasePartitionRequestInterval: Long =
    get(CLIENT_BATCH_HANDLED_RELEASE_PARTITION_INTERVAL)

  // //////////////////////////////////////////////////////
  //                       Worker                        //
  // //////////////////////////////////////////////////////

  /**
   * Resolves the storage directories of this worker together with their per-directory settings.
   *
   * When WORKER_STORAGE_DIRS is set, every entry is parsed as `dir[:key=value]...`. Keys are
   * matched case-insensitively; values are used exactly as written:
   *  - `capacity=<byte string>`: usable space, parsed with `Utils.byteStringAsBytes`
   *    (defaults to 1PB)
   *  - `disktype=<Type name>`: resolved with `Type.valueOf` (defaults to HDD); MEMORY is rejected
   *  - `flushthread=<int>`: flusher thread count
   *
   * Flusher thread precedence: an explicit `flushthread` always wins, wherever it appears in
   * the entry. Otherwise a `disktype` of HDD or SSD selects `workerHddFlusherThreads` or
   * `workerSsdFlusherThreads`; in every other case the value of WORKER_FLUSHER_THREADS is kept.
   *
   * When WORKER_STORAGE_DIRS is not set, `workerStorageBaseDirNumber` HDD directories named
   * `workerStorageBaseDirPrefix` followed by a 1-based index are generated, unless
   * `hasHDFSStorage` is true, in which case the result is empty.
   *
   * @return workingDir, usable space, flusher thread count, disk type
   *         check more details at CONFIGURATION_GUIDE.md
   * @throws IllegalArgumentException if an entry has no directory part, carries an unknown
   *                                  attribute, or an attribute has no value after `=`
   * @throws IOException if an entry declares `disktype=MEMORY`
   */
  def workerBaseDirs: Seq[(String, Long, Int, Type)] = {
    import java.util.Locale

    // I assume there is no disk is bigger than 1 PB in recent days.
    val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")

    // Returns the text between the first and second '=' of `attribute` (the same token the
    // previous `split("=")(1)` picked), but fails with a descriptive error instead of an
    // ArrayIndexOutOfBoundsException when nothing follows the key, e.g. "capacity=".
    def attributeValue(attribute: String): String = attribute.split("=") match {
      case Array(_, value, _*) => value
      case _ =>
        throw new IllegalArgumentException(s"Illegal attribute (missing value): $attribute")
    }

    get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
      storageDirs.map { str =>
        var maxCapacity = defaultMaxCapacity
        var diskType = HDD
        var flushThread = get(WORKER_FLUSHER_THREADS)
        val (dir, attributes) = str.split(":").toList match {
          case _dir :: tail => (_dir, tail)
          // Reachable when the entry consists only of ':' separators: String.split drops
          // trailing empty strings, leaving an empty array. Report the raw entry so the
          // message is actionable.
          case Nil => throw new IllegalArgumentException(s"Illegal storage dir: $str")
        }
        // Once an explicit flushthread attribute was seen, a later disktype attribute must
        // not overwrite it with the per-disk-type default.
        var flushThreadsDefined = false
        attributes.foreach { attribute =>
          // Locale.ROOT keeps key matching independent of the JVM default locale
          // (e.g. upper-case 'I' lower-cases to a dotless i under a Turkish locale).
          attribute.toLowerCase(Locale.ROOT) match {
            case key if key.startsWith("capacity=") =>
              maxCapacity = Utils.byteStringAsBytes(attributeValue(attribute))
            case key if key.startsWith("disktype=") =>
              diskType = Type.valueOf(attributeValue(attribute))
              if (diskType == Type.MEMORY) {
                throw new IOException(s"Invalid diskType: $diskType")
              }
              if (!flushThreadsDefined) {
                flushThread = diskType match {
                  case HDD => workerHddFlusherThreads
                  case SSD => workerSsdFlusherThreads
                  case _ => flushThread
                }
              }
            case key if key.startsWith("flushthread=") =>
              flushThread = attributeValue(attribute).toInt
              flushThreadsDefined = true
            case _ =>
              throw new IllegalArgumentException(s"Illegal attribute: $attribute")
          }
        }
        (dir, maxCapacity, flushThread, diskType)
      }
    }.getOrElse {
      if (!hasHDFSStorage) {
        // No explicit directories configured: synthesize prefix1..prefixN as HDD directories.
        val prefix = workerStorageBaseDirPrefix
        val number = workerStorageBaseDirNumber
        (1 to number).map { i =>
          (s"$prefix$i", defaultMaxCapacity, workerHddFlusherThreads, HDD)
        }
      } else {
        Seq.empty
      }
    }
  }