in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [894:1026]
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
def creditStreamThreadsPerMountpoint: Int = get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
def workerDirectMemoryRatioForReadBuffer: Double = get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
def partitionReadBuffersMax: Int = get(WORKER_PARTITION_READ_BUFFERS_MAX)
def readBufferAllocationWait: Long = get(WORKER_READBUFFER_ALLOCATIONWAIT)
def readBufferTargetRatio: Double = get(WORKER_READBUFFER_TARGET_RATIO)
def readBufferTargetUpdateInterval: Long = get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
def readBufferTargetNotifyThreshold: Long = get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
def readBuffersToTriggerReadMin: Int = get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
// //////////////////////////////////////////////////////
// Decommission //
// //////////////////////////////////////////////////////
def workerDecommissionCheckInterval: Long = get(WORKER_DECOMMISSION_CHECK_INTERVAL)
def workerDecommissionForceExitTimeout: Long = get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
def workerGracefulShutdownRecoverPath: String = get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)
def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)
// //////////////////////////////////////////////////////
// Flusher //
// //////////////////////////////////////////////////////
def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
def workerHdfsFlusterBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
def workerCreateWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
// //////////////////////////////////////////////////////
// Disk Monitor //
// //////////////////////////////////////////////////////
def workerDiskTimeSlidingWindowSize: Int = get(WORKER_DISKTIME_SLIDINGWINDOW_SIZE)
def workerDiskTimeSlidingWindowMinFlushCount: Int =
get(WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT)
def workerDiskTimeSlidingWindowMinFetchCount: Int =
get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
def workerDiskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
def workerDiskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
def workerDiskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
def workerDiskMonitorNotifyErrorThreshold: Int = get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
def workerDiskMonitorNotifyErrorExpireTimeout: Long =
get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
def workerDiskMonitorStatusCheckTimeout: Long = get(WORKER_DEVICE_STATUS_CHECK_TIMEOUT)
// //////////////////////////////////////////////////////
// Memory Manager //
// //////////////////////////////////////////////////////
def workerDirectMemoryRatioToPauseReceive: Double = get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE)
def workerDirectMemoryRatioToPauseReplicate: Double =
get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE)
def workerDirectMemoryRatioToResume: Double = get(WORKER_DIRECT_MEMORY_RATIO_RESUME)
def partitionSorterDirectMemoryRatioThreshold: Double =
get(PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD)
def workerDirectMemoryPressureCheckIntervalMs: Long = get(WORKER_DIRECT_MEMORY_CHECK_INTERVAL)
def workerDirectMemoryReportIntervalSecond: Long = get(WORKER_DIRECT_MEMORY_REPORT_INTERVAL)
def workerDirectMemoryTrimChannelWaitInterval: Long =
get(WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL)
def workerDirectMemoryTrimFlushWaitInterval: Long =
get(WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL)
def workerDirectMemoryRatioForShuffleStorage: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE)
// //////////////////////////////////////////////////////
// Rate Limit controller //
// //////////////////////////////////////////////////////
def workerCongestionControlEnabled: Boolean = get(WORKER_CONGESTION_CONTROL_ENABLED)
def workerCongestionControlSampleTimeWindowSeconds: Long =
get(WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW)
// TODO related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`and `WORKER_DIRECT_MEMORY_RATIO_RESUME`,
// we'd better refine the logic among them
def workerCongestionControlLowWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_LOW_WATERMARK)
def workerCongestionControlHighWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_HIGH_WATERMARK)
def workerCongestionControlUserInactiveIntervalMs: Long =
get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)
// //////////////////////////////////////////////////////
// Columnar Shuffle //
// //////////////////////////////////////////////////////
def columnarShuffleEnabled: Boolean = get(COLUMNAR_SHUFFLE_ENABLED)
def columnarShuffleBatchSize: Int = get(COLUMNAR_SHUFFLE_BATCH_SIZE)
def columnarShuffleOffHeapEnabled: Boolean = get(COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED)
def columnarShuffleDictionaryEnabled: Boolean = get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED)
def columnarShuffleDictionaryMaxFactor: Double =
get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR)
def columnarShuffleCodeGenEnabled: Boolean = get(COLUMNAR_SHUFFLE_CODEGEN_ENABLED)
// //////////////////////////////////////////////////////
// test //
// //////////////////////////////////////////////////////
def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
def testRetryCommitFiles: Boolean = get(TEST_CLIENT_RETRY_COMMIT_FILE)
def testPushPrimaryDataTimeout: Boolean = get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
def clientFlinkMemoryPerResultPartitionMin: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
def clientFlinkMemoryPerInputGateMin: Long = get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
def clientFlinkInputGateSupportFloatingBuffer: Boolean =
get(CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER)
def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
}
object CelebornConf extends Logging {