def workerStorageBaseDirPrefix: String = get()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [894:1026]


  /** Value of the `WORKER_STORAGE_BASE_DIR_PREFIX` entry, as returned by `get`. */
  def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
  /**
   * Value of the `WORKER_STORAGE_BASE_DIR_COUNT` entry, as returned by `get`.
   * Note that the accessor says "Number" while the entry it reads says "COUNT".
   */
  def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
  /** Value of the `WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT` entry, as returned by `get`. */
  def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
  /** Value of the `WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER` entry, as returned by `get`. */
  def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
  /** Value of the `WORKER_PARTITION_READ_BUFFERS_MIN` entry, as returned by `get`. */
  def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
  /** Value of the `WORKER_PARTITION_READ_BUFFERS_MAX` entry, as returned by `get`. */
  def partitionReadBuffersMax: Int = get(WORKER_PARTITION_READ_BUFFERS_MAX)
  /**
   * Value of the `WORKER_READBUFFER_ALLOCATIONWAIT` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def readBufferAllocationWait: Long = get(WORKER_READBUFFER_ALLOCATIONWAIT)
  /** Value of the `WORKER_READBUFFER_TARGET_RATIO` entry, as returned by `get`. */
  def readBufferTargetRatio: Double = get(WORKER_READBUFFER_TARGET_RATIO)
  /**
   * Value of the `WORKER_READBUFFER_TARGET_UPDATE_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def readBufferTargetUpdateInterval: Long = get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
  /** Value of the `WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD` entry, as returned by `get`. */
  def readBufferTargetNotifyThreshold: Long = get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
  /** Value of the `WORKER_READBUFFERS_TOTRIGGERREAD_MIN` entry, as returned by `get`. */
  def readBuffersToTriggerReadMin: Int = get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)

  // //////////////////////////////////////////////////////
  //                   Decommission                      //
  // //////////////////////////////////////////////////////
  /**
   * Value of the `WORKER_DECOMMISSION_CHECK_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDecommissionCheckInterval: Long = get(WORKER_DECOMMISSION_CHECK_INTERVAL)
  /**
   * Value of the `WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDecommissionForceExitTimeout: Long = get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)

  // //////////////////////////////////////////////////////
  //            Graceful Shutdown & Recover              //
  // //////////////////////////////////////////////////////
  /** Value of the `WORKER_GRACEFUL_SHUTDOWN_ENABLED` flag, as returned by `get`. */
  def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
  /**
   * Value of the `WORKER_GRACEFUL_SHUTDOWN_TIMEOUT` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
  /**
   * Value of the `WORKER_CHECK_SLOTS_FINISHED_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
    get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
  /**
   * Value of the `WORKER_CHECK_SLOTS_FINISHED_TIMEOUT` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
    get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
  /** Value of the `WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH` entry, as returned by `get`. */
  def workerGracefulShutdownRecoverPath: String = get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)
  /**
   * Value of the `WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
    get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
  /**
   * Value of the `WORKER_FLUSHER_SHUTDOWN_TIMEOUT` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
  /**
   * Value of the `WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL` entry, as returned
   * by `get`. The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
  /**
   * Value of the `WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC` flag, as returned
   * by `get`.
   */
  def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)

  // //////////////////////////////////////////////////////
  //                      Flusher                        //
  // //////////////////////////////////////////////////////
  /** Value of the `WORKER_FLUSHER_BUFFER_SIZE` entry, as returned by `get`. */
  def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
  /**
   * Value of the `WORKER_HDFS_FLUSHER_BUFFER_SIZE` entry, as returned by `get`.
   * NOTE(review): "Fluster" in the accessor name looks like a typo of "Flusher" (compare the
   * entry name); it is left unchanged here because renaming would break existing callers.
   */
  def workerHdfsFlusterBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
  /**
   * Value of the `WORKER_WRITER_CLOSE_TIMEOUT` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
  /** Value of the `WORKER_FLUSHER_HDD_THREADS` entry, as returned by `get`. */
  def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
  /** Value of the `WORKER_FLUSHER_SSD_THREADS` entry, as returned by `get`. */
  def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
  /** Value of the `WORKER_FLUSHER_HDFS_THREADS` entry, as returned by `get`. */
  def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
  /** Value of the `WORKER_WRITER_CREATE_MAX_ATTEMPTS` entry, as returned by `get`. */
  def workerCreateWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)

  // //////////////////////////////////////////////////////
  //                    Disk Monitor                     //
  // //////////////////////////////////////////////////////
  /** Value of the `WORKER_DISKTIME_SLIDINGWINDOW_SIZE` entry, as returned by `get`. */
  def workerDiskTimeSlidingWindowSize: Int = get(WORKER_DISKTIME_SLIDINGWINDOW_SIZE)
  /** Value of the `WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT` entry, as returned by `get`. */
  def workerDiskTimeSlidingWindowMinFlushCount: Int =
    get(WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT)
  /** Value of the `WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT` entry, as returned by `get`. */
  def workerDiskTimeSlidingWindowMinFetchCount: Int =
    get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
  /** Value of the `WORKER_DISK_RESERVE_SIZE` entry, as returned by `get`. */
  def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
  /** Value of the `WORKER_DISK_MONITOR_ENABLED` flag, as returned by `get`. */
  def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
  /** Value of the `WORKER_DISK_MONITOR_CHECKLIST` entry, as returned by `get`. */
  def workerDiskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
  /**
   * Value of the `WORKER_DISK_MONITOR_CHECK_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDiskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
  /** Value of the `WORKER_DISK_MONITOR_SYS_BLOCK_DIR` entry, as returned by `get`. */
  def workerDiskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
  /** Value of the `WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD` entry, as returned by `get`. */
  def workerDiskMonitorNotifyErrorThreshold: Int = get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
  /**
   * Value of the `WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDiskMonitorNotifyErrorExpireTimeout: Long =
    get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
  /**
   * Value of the `WORKER_DEVICE_STATUS_CHECK_TIMEOUT` entry, as returned by `get`.
   * Note that the accessor says "DiskMonitor" while the entry it reads says "DEVICE".
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDiskMonitorStatusCheckTimeout: Long = get(WORKER_DEVICE_STATUS_CHECK_TIMEOUT)

  // //////////////////////////////////////////////////////
  //                  Memory Manager                     //
  // //////////////////////////////////////////////////////
  /** Value of the `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE` entry, as returned by `get`. */
  def workerDirectMemoryRatioToPauseReceive: Double = get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE)
  /** Value of the `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE` entry, as returned by `get`. */
  def workerDirectMemoryRatioToPauseReplicate: Double =
    get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE)
  /** Value of the `WORKER_DIRECT_MEMORY_RATIO_RESUME` entry, as returned by `get`. */
  def workerDirectMemoryRatioToResume: Double = get(WORKER_DIRECT_MEMORY_RATIO_RESUME)
  /** Value of the `PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD` entry, as returned by `get`. */
  def partitionSorterDirectMemoryRatioThreshold: Double =
    get(PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD)
  /**
   * Value of the `WORKER_DIRECT_MEMORY_CHECK_INTERVAL` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerDirectMemoryPressureCheckIntervalMs: Long = get(WORKER_DIRECT_MEMORY_CHECK_INTERVAL)
  /**
   * Value of the `WORKER_DIRECT_MEMORY_REPORT_INTERVAL` entry, as returned by `get`.
   * The `Second` suffix suggests seconds — TODO confirm against the entry definition.
   */
  def workerDirectMemoryReportIntervalSecond: Long = get(WORKER_DIRECT_MEMORY_REPORT_INTERVAL)
  /**
   * Value of the `WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDirectMemoryTrimChannelWaitInterval: Long =
    get(WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL)
  /**
   * Value of the `WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL` entry, as returned by `get`.
   * The time unit of this `Long` is not visible here — check the entry definition.
   */
  def workerDirectMemoryTrimFlushWaitInterval: Long =
    get(WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL)
  /** Value of the `WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE` entry, as returned by `get`. */
  def workerDirectMemoryRatioForShuffleStorage: Double =
    get(WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE)

  // //////////////////////////////////////////////////////
  //                  Rate Limit controller              //
  // //////////////////////////////////////////////////////
  /** Value of the `WORKER_CONGESTION_CONTROL_ENABLED` flag, as returned by `get`. */
  def workerCongestionControlEnabled: Boolean = get(WORKER_CONGESTION_CONTROL_ENABLED)
  /**
   * Value of the `WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW` entry, as returned by `get`.
   * The `Seconds` suffix suggests seconds — TODO confirm against the entry definition.
   */
  def workerCongestionControlSampleTimeWindowSeconds: Long =
    get(WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW)
  // TODO: related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
  // `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE` and `WORKER_DIRECT_MEMORY_RATIO_RESUME`;
  // we'd better refine the logic among them.
  /**
   * Value of the `WORKER_CONGESTION_CONTROL_LOW_WATERMARK` entry, as returned by `get`.
   * Typed `Option[Long]`; presumably `None` when the entry is unset — TODO confirm.
   */
  def workerCongestionControlLowWatermark: Option[Long] =
    get(WORKER_CONGESTION_CONTROL_LOW_WATERMARK)
  /**
   * Value of the `WORKER_CONGESTION_CONTROL_HIGH_WATERMARK` entry, as returned by `get`.
   * Typed `Option[Long]`; presumably `None` when the entry is unset — TODO confirm.
   */
  def workerCongestionControlHighWatermark: Option[Long] =
    get(WORKER_CONGESTION_CONTROL_HIGH_WATERMARK)
  /**
   * Value of the `WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL` entry, as returned by `get`.
   * The `Ms` suffix suggests milliseconds — TODO confirm against the entry definition.
   */
  def workerCongestionControlUserInactiveIntervalMs: Long =
    get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)

  // //////////////////////////////////////////////////////
  //                 Columnar Shuffle                    //
  // //////////////////////////////////////////////////////
  /** Value of the `COLUMNAR_SHUFFLE_ENABLED` flag, as returned by `get`. */
  def columnarShuffleEnabled: Boolean = get(COLUMNAR_SHUFFLE_ENABLED)
  /** Value of the `COLUMNAR_SHUFFLE_BATCH_SIZE` entry, as returned by `get`. */
  def columnarShuffleBatchSize: Int = get(COLUMNAR_SHUFFLE_BATCH_SIZE)
  /** Value of the `COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED` flag, as returned by `get`. */
  def columnarShuffleOffHeapEnabled: Boolean = get(COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED)
  /** Value of the `COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED` flag, as returned by `get`. */
  def columnarShuffleDictionaryEnabled: Boolean = get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED)
  /** Value of the `COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR` entry, as returned by `get`. */
  def columnarShuffleDictionaryMaxFactor: Double =
    get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR)

  /** Value of the `COLUMNAR_SHUFFLE_CODEGEN_ENABLED` flag, as returned by `get`. */
  def columnarShuffleCodeGenEnabled: Boolean = get(COLUMNAR_SHUFFLE_CODEGEN_ENABLED)

  // //////////////////////////////////////////////////////
  //                      test                           //
  // //////////////////////////////////////////////////////
  /** Value of the `TEST_CLIENT_FETCH_FAILURE` flag, as returned by `get`. */
  def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
  /** Value of the `TEST_CLIENT_RETRY_COMMIT_FILE` flag, as returned by `get`. */
  def testRetryCommitFiles: Boolean = get(TEST_CLIENT_RETRY_COMMIT_FILE)
  /** Value of the `TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT` flag, as returned by `get`. */
  def testPushPrimaryDataTimeout: Boolean = get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
  /**
   * Value of the `TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT` flag, as returned by `get`.
   * Unlike the neighbouring test accessors, this one reads a `TEST_WORKER_*` entry.
   */
  def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
  /** Value of the `TEST_CLIENT_RETRY_REVIVE` flag, as returned by `get`. */
  def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
  /**
   * Looks up `TEST_ALTERNATIVE.key` through the two-argument `get`, passing "celeborn" as the
   * second argument (presumably the fallback when the key is unset — confirm against `get`).
   */
  def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
  // NOTE(review): this and the following `clientFlink*` accessors read `CLIENT_*` entries but
  // sit under the "test" banner; they may deserve a section banner of their own.
  /** Value of the `CLIENT_MEMORY_PER_RESULT_PARTITION_MIN` entry, as returned by `get`. */
  def clientFlinkMemoryPerResultPartitionMin: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
  /** Value of the `CLIENT_MEMORY_PER_RESULT_PARTITION` entry, as returned by `get`. */
  def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
  /** Value of the `CLIENT_MEMORY_PER_INPUT_GATE_MIN` entry, as returned by `get`. */
  def clientFlinkMemoryPerInputGateMin: Long = get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
  /** Value of the `CLIENT_MEMORY_PER_INPUT_GATE` entry, as returned by `get`. */
  def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
  /** Value of the `CLIENT_NUM_CONCURRENT_READINGS` entry, as returned by `get`. */
  def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
  /** Value of the `CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER` flag, as returned by `get`. */
  def clientFlinkInputGateSupportFloatingBuffer: Boolean =
    get(CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER)
  /** Value of the `CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER` flag, as returned by `get`. */
  def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
    get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
  /** Value of the `CLIENT_DATA_COMPRESSION_ENABLED` flag, as returned by `get`. */
  def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
}

object CelebornConf extends Logging {