in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1264:1438]
/** Decommission check interval, as configured by `WORKER_DECOMMISSION_CHECK_INTERVAL`. */
def workerDecommissionCheckInterval: Long =
  get(WORKER_DECOMMISSION_CHECK_INTERVAL)

/** Forced-exit timeout for decommission, as configured by `WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT`. */
def workerDecommissionForceExitTimeout: Long =
  get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
/** Whether worker graceful shutdown is enabled (`WORKER_GRACEFUL_SHUTDOWN_ENABLED`). */
def workerGracefulShutdown: Boolean =
  get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)

/** Graceful shutdown timeout (`WORKER_GRACEFUL_SHUTDOWN_TIMEOUT`). */
def workerGracefulShutdownTimeoutMs: Long =
  get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)

/** Polling interval while waiting for slots to finish (`WORKER_CHECK_SLOTS_FINISHED_INTERVAL`). */
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
  get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)

/** Upper bound on waiting for slots to finish (`WORKER_CHECK_SLOTS_FINISHED_TIMEOUT`). */
def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
  get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)

/** Path used for recovery state (`WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH`). */
def workerGracefulShutdownRecoverPath: String =
  get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)

/** Name of the DB backend used for recovery state (`WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND`). */
def workerGracefulShutdownRecoverDbBackend: String =
  get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND)

/** How long to wait for the partition sorter to close (`WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT`). */
def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
  get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)

/** How long to wait for flushers to shut down (`WORKER_FLUSHER_SHUTDOWN_TIMEOUT`). */
def workerGracefulShutdownFlusherShutdownTimeoutMs: Long =
  get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)

/** Interval for saving committed file info (`WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL`). */
def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
  get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)

/** Whether committed file info is saved synchronously (`WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC`). */
def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
  get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)
// //////////////////////////////////////////////////////
// Flusher //
// //////////////////////////////////////////////////////
// Per-storage-type flusher buffer sizes.
/** Local-disk flusher buffer size (`WORKER_FLUSHER_BUFFER_SIZE`). */
def workerFlusherBufferSize: Long =
  get(WORKER_FLUSHER_BUFFER_SIZE)
/** HDFS flusher buffer size (`WORKER_HDFS_FLUSHER_BUFFER_SIZE`). */
def workerHdfsFlusherBufferSize: Long =
  get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
/** S3 flusher buffer size (`WORKER_S3_FLUSHER_BUFFER_SIZE`). */
def workerS3FlusherBufferSize: Long =
  get(WORKER_S3_FLUSHER_BUFFER_SIZE)
/** OSS flusher buffer size (`WORKER_OSS_FLUSHER_BUFFER_SIZE`). */
def workerOssFlusherBufferSize: Long =
  get(WORKER_OSS_FLUSHER_BUFFER_SIZE)

/** Writer close timeout (`WORKER_WRITER_CLOSE_TIMEOUT`). */
def workerWriterCloseTimeoutMs: Long =
  get(WORKER_WRITER_CLOSE_TIMEOUT)

// Per-storage-type flusher thread counts.
/** HDD flusher thread count (`WORKER_FLUSHER_HDD_THREADS`). */
def workerHddFlusherThreads: Int =
  get(WORKER_FLUSHER_HDD_THREADS)
/** SSD flusher thread count (`WORKER_FLUSHER_SSD_THREADS`). */
def workerSsdFlusherThreads: Int =
  get(WORKER_FLUSHER_SSD_THREADS)
/** HDFS flusher thread count (`WORKER_FLUSHER_HDFS_THREADS`). */
def workerHdfsFlusherThreads: Int =
  get(WORKER_FLUSHER_HDFS_THREADS)
/** S3 flusher thread count (`WORKER_FLUSHER_S3_THREADS`). */
def workerS3FlusherThreads: Int =
  get(WORKER_FLUSHER_S3_THREADS)
/** OSS flusher thread count (`WORKER_FLUSHER_OSS_THREADS`). */
def workerOssFlusherThreads: Int =
  get(WORKER_FLUSHER_OSS_THREADS)

/** Maximum attempts to create a writer (`WORKER_WRITER_CREATE_MAX_ATTEMPTS`). */
def workerCreateWriterMaxAttempts: Int =
  get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
/** Whether the local flusher uses the gather API (`WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED`). */
def workerFlusherLocalGatherAPIEnabled: Boolean =
  get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)
// //////////////////////////////////////////////////////
// Disk Monitor //
// //////////////////////////////////////////////////////
// Disk-time sliding window.
/** Sliding window size (`WORKER_DISKTIME_SLIDINGWINDOW_SIZE`). */
def workerDiskTimeSlidingWindowSize: Int =
  get(WORKER_DISKTIME_SLIDINGWINDOW_SIZE)
/** Minimum flush count for the sliding window (`WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT`). */
def workerDiskTimeSlidingWindowMinFlushCount: Int =
  get(WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT)
/** Minimum fetch count for the sliding window (`WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT`). */
def workerDiskTimeSlidingWindowMinFetchCount: Int =
  get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)

// Disk reservation and cleanup.
/** Reserved disk size (`WORKER_DISK_RESERVE_SIZE`). */
def workerDiskReserveSize: Long =
  get(WORKER_DISK_RESERVE_SIZE)
/** Optional reserved disk ratio; `None` when unset (`WORKER_DISK_RESERVE_RATIO`). */
def workerDiskReserveRatio: Option[Double] =
  get(WORKER_DISK_RESERVE_RATIO)
/** Disk clean thread count (`WORKER_DISK_CLEAN_THREADS`). */
def workerDiskCleanThreads: Int =
  get(WORKER_DISK_CLEAN_THREADS)

// Disk monitor.
/** Whether the disk monitor is enabled (`WORKER_DISK_MONITOR_ENABLED`). */
def workerDiskMonitorEnabled: Boolean =
  get(WORKER_DISK_MONITOR_ENABLED)
/** Checks the disk monitor performs (`WORKER_DISK_MONITOR_CHECKLIST`). */
def workerDiskMonitorCheckList: Seq[String] =
  get(WORKER_DISK_MONITOR_CHECKLIST)
/** Disk monitor check interval (`WORKER_DISK_MONITOR_CHECK_INTERVAL`). */
def workerDiskMonitorCheckInterval: Long =
  get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
/** Sys block directory inspected by the monitor (`WORKER_DISK_MONITOR_SYS_BLOCK_DIR`). */
def workerDiskMonitorSysBlockDir: String =
  get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
/** Error-notification threshold (`WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD`). */
def workerDiskMonitorNotifyErrorThreshold: Int =
  get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
/** Error-notification expiry timeout (`WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT`). */
def workerDiskMonitorNotifyErrorExpireTimeout: Long =
  get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
/** Device status check timeout; note the backing entry is `WORKER_DEVICE_STATUS_CHECK_TIMEOUT`. */
def workerDiskMonitorStatusCheckTimeout: Long =
  get(WORKER_DEVICE_STATUS_CHECK_TIMEOUT)
// //////////////////////////////////////////////////////
// Memory Manager //
// //////////////////////////////////////////////////////
/** Direct-memory ratio at which receiving is paused (`WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`). */
def workerDirectMemoryRatioToPauseReceive: Double = get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE)
/** Direct-memory ratio at which replication is paused (`WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`). */
def workerDirectMemoryRatioToPauseReplicate: Double =
  get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE)
/** Direct-memory ratio at which paused traffic resumes (`WORKER_DIRECT_MEMORY_RATIO_RESUME`). */
def workerDirectMemoryRatioToResume: Double = get(WORKER_DIRECT_MEMORY_RATIO_RESUME)
/** Pinned-memory ratio at which paused traffic resumes (`WORKER_PINNED_MEMORY_RATIO_RESUME`). */
def workerPinnedMemoryRatioToResume: Double = get(WORKER_PINNED_MEMORY_RATIO_RESUME)
/** Direct-memory ratio threshold for the partition sorter (`WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD`). */
def workerPartitionSorterDirectMemoryRatioThreshold: Double =
  get(WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD)
/** Direct-memory pressure check interval (`WORKER_DIRECT_MEMORY_CHECK_INTERVAL`). */
def workerDirectMemoryPressureCheckIntervalMs: Long = get(WORKER_DIRECT_MEMORY_CHECK_INTERVAL)
/** Whether pinned-memory checking is enabled (`WORKER_PINNED_MEMORY_CHECK_ENABLED`). */
def workerPinnedMemoryCheckEnabled: Boolean = get(WORKER_PINNED_MEMORY_CHECK_ENABLED)
/** Pinned-memory check interval (`WORKER_PINNED_MEMORY_CHECK_INTERVAL`). */
def workerPinnedMemoryCheckIntervalMs: Long = get(WORKER_PINNED_MEMORY_CHECK_INTERVAL)
/** Pinned-memory resume keep time (`WORKER_PINNED_MEMORY_RESUME_KEEP_TIME`). */
def workerPinnedMemoryResumeKeepTime: Long = get(WORKER_PINNED_MEMORY_RESUME_KEEP_TIME)
/** Direct-memory report interval (`WORKER_DIRECT_MEMORY_REPORT_INTERVAL`). */
def workerDirectMemoryReportIntervalSecond: Long = get(WORKER_DIRECT_MEMORY_REPORT_INTERVAL)
/** Wait interval used while trimming channels (`WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL`). */
def workerDirectMemoryTrimChannelWaitInterval: Long =
  get(WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL)
/** Wait interval used while trimming flushes (`WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL`). */
def workerDirectMemoryTrimFlushWaitInterval: Long =
  get(WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL)
/** Direct-memory ratio reserved for memory file storage (`WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE`). */
def workerDirectMemoryRatioForMemoryFilesStorage: Double =
  get(WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE)
/** Maximum size of a single memory-storage file (`WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE`). */
def workerMemoryFileStorageMaxFileSize: Long =
  get(WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE)
/** Whether aggressive eviction of memory file storage is enabled (`WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED`). */
def workerMemoryFileStorageEvictAggressiveModeEnabled: Boolean =
  get(WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED)
/**
 * Misspelled legacy name ("Eict") of [[workerMemoryFileStorageEvictAggressiveModeEnabled]].
 * It is kept only so existing callers keep compiling; new code should use the correctly spelled accessor.
 */
def workerMemoryFileStorageEictAggressiveModeEnabled: Boolean =
  workerMemoryFileStorageEvictAggressiveModeEnabled
/** Eviction ratio for memory file storage (`WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO`). */
def workerMemoryFileStorageEvictRatio: Double =
  get(WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO)
// //////////////////////////////////////////////////////
// Rate Limit controller //
// //////////////////////////////////////////////////////
/** Whether worker congestion control is enabled (`WORKER_CONGESTION_CONTROL_ENABLED`). */
def workerCongestionControlEnabled: Boolean =
  get(WORKER_CONGESTION_CONTROL_ENABLED)

/** Sampling time window for congestion control (`WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW`). */
def workerCongestionControlSampleTimeWindowSeconds: Long =
  get(WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW)
// TODO: these watermarks are related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE` and `WORKER_DIRECT_MEMORY_RATIO_RESUME`;
// we should refine how these settings interact with each other.
// Disk buffer watermarks.
/** Low watermark for the disk buffer (`WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK`). */
def workerCongestionControlDiskBufferLowWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
/** High watermark for the disk buffer (`WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK`). */
def workerCongestionControlDiskBufferHighWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)

// Per-user produce-speed watermarks.
/** Low watermark for per-user produce speed (`WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK`). */
def workerCongestionControlUserProduceSpeedLowWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
/** High watermark for per-user produce speed (`WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK`). */
def workerCongestionControlUserProduceSpeedHighWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)

// Whole-worker produce-speed watermarks.
/** Low watermark for worker produce speed (`WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK`). */
def workerCongestionControlWorkerProduceSpeedLowWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
/** High watermark for worker produce speed (`WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK`). */
def workerCongestionControlWorkerProduceSpeedHighWatermark: Long =
  get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)

/** Interval after which a user is treated as inactive (`WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL`). */
def workerCongestionControlUserInactiveIntervalMs: Long =
  get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)
/** Congestion control check interval (`WORKER_CONGESTION_CONTROL_CHECK_INTERVAL`). */
def workerCongestionControlCheckIntervalMs: Long =
  get(WORKER_CONGESTION_CONTROL_CHECK_INTERVAL)
// //////////////////////////////////////////////////////
// Columnar Shuffle //
// //////////////////////////////////////////////////////
/** Whether columnar shuffle is enabled (`COLUMNAR_SHUFFLE_ENABLED`). */
def columnarShuffleEnabled: Boolean =
  get(COLUMNAR_SHUFFLE_ENABLED)
/** Columnar shuffle batch size (`COLUMNAR_SHUFFLE_BATCH_SIZE`). */
def columnarShuffleBatchSize: Int =
  get(COLUMNAR_SHUFFLE_BATCH_SIZE)
/** Whether columnar shuffle uses off-heap memory (`COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED`). */
def columnarShuffleOffHeapEnabled: Boolean =
  get(COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED)
/** Whether dictionary encoding is enabled (`COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED`). */
def columnarShuffleDictionaryEnabled: Boolean =
  get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED)
/** Maximum factor for dictionary encoding (`COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR`). */
def columnarShuffleDictionaryMaxFactor: Double =
  get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR)
/** Whether code generation is enabled for columnar shuffle (`COLUMNAR_SHUFFLE_CODEGEN_ENABLED`). */
def columnarShuffleCodeGenEnabled: Boolean =
  get(COLUMNAR_SHUFFLE_CODEGEN_ENABLED)
// //////////////////////////////////////////////////////
// test //
// //////////////////////////////////////////////////////
// Fault-injection switches used by tests.
/** `TEST_CLIENT_FETCH_FAILURE`. */
def testFetchFailure: Boolean =
  get(TEST_CLIENT_FETCH_FAILURE)
/** `TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE`. */
def testMockDestroySlotsFailure: Boolean =
  get(TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE)
/** `TEST_MOCK_COMMIT_FILES_FAILURE`. */
def testMockCommitFilesFailure: Boolean =
  get(TEST_MOCK_COMMIT_FILES_FAILURE)
/** `TEST_CLIENT_MOCK_SHUFFLE_LOST`. */
def testMockShuffleLost: Boolean =
  get(TEST_CLIENT_MOCK_SHUFFLE_LOST)
/** `TEST_CLIENT_MOCK_SHUFFLE_LOST_SHUFFLE`. */
def testMockShuffleLostShuffle: Int =
  get(TEST_CLIENT_MOCK_SHUFFLE_LOST_SHUFFLE)
/** `TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT`. */
def testPushPrimaryDataTimeout: Boolean =
  get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
/** `TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT`. */
def testPushReplicaDataTimeout: Boolean =
  get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
/** `TEST_CLIENT_RETRY_REVIVE`. */
def testRetryRevive: Boolean =
  get(TEST_CLIENT_RETRY_REVIVE)
/** Raw string lookup by `TEST_ALTERNATIVE.key`, falling back to "celeborn" when unset. */
def testAlternative: String =
  get(TEST_ALTERNATIVE.key, "celeborn")
// Client-side settings. These are not test switches, despite following the "test" section.

/** Memory per result partition for the Flink client (`CLIENT_MEMORY_PER_RESULT_PARTITION`). */
def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
/** Memory per input gate for the Flink client (`CLIENT_MEMORY_PER_INPUT_GATE`). */
def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
/** Number of concurrent readings for the Flink client (`CLIENT_NUM_CONCURRENT_READINGS`). */
def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
/** Whether input gates may use floating buffers (`CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER`). */
def clientFlinkInputGateSupportFloatingBuffer: Boolean =
  get(CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER)
/** Whether result partitions may use floating buffers (`CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER`). */
def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
  get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
/** Whether the Flink client compresses data (`CLIENT_DATA_COMPRESSION_ENABLED`). */
def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
// Result types are spelled out explicitly, as for every other public accessor. This assumes
// CLIENT_CHUNK_PREFETCH_ENABLED is a ConfigEntry[Boolean] and CLIENT_INPUTSTREAM_CREATION_WINDOW
// a ConfigEntry[Int] (their definitions are outside this view).
/** Whether client chunk prefetch is enabled (`CLIENT_CHUNK_PREFETCH_ENABLED`). */
def clientChunkPrefetchEnabled: Boolean = get(CLIENT_CHUNK_PREFETCH_ENABLED)
/** Input-stream creation window size (`CLIENT_INPUTSTREAM_CREATION_WINDOW`). */
def clientInputStreamCreationWindow: Int = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
/** Whether tags are enabled (`TAGS_ENABLED`). */
def tagsEnabled: Boolean = get(TAGS_ENABLED)
/** Tags expression (`TAGS_EXPR`). */
def tagsExpr: String = get(TAGS_EXPR)
/** Whether a client-provided tags expression is preferred (`PREFER_CLIENT_TAGS_EXPR`). */
def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)
// //////////////////////////////////////////////////////
// kerberos //
// //////////////////////////////////////////////////////
// Explicit result types on public accessors. This assumes both entries are
// OptionalConfigEntry[String] (their definitions are outside this view), so `get` yields Option[String].
/** Kerberos principal for HDFS storage, if configured (`HDFS_STORAGE_KERBEROS_PRINCIPAL`). */
def hdfsStorageKerberosPrincipal: Option[String] = get(HDFS_STORAGE_KERBEROS_PRINCIPAL)
/** Kerberos keytab for HDFS storage, if configured (`HDFS_STORAGE_KERBEROS_KEYTAB`). */
def hdfsStorageKerberosKeytab: Option[String] = get(HDFS_STORAGE_KERBEROS_KEYTAB)
// //////////////////////////////////////////////////////
// TLS //
// //////////////////////////////////////////////////////
/**
 * Resolves an SSL/TLS setting, preferring a module-scoped value over a global one.
 *
 * Lookup order:
 *  1. `getTransportConfImpl(module, ...)` with `allowDefault = false`. This returns a value only
 *     when one is set for this module; the entry's default is not used here.
 *  2. The global key, i.e. `config.key` with the `.<module>.` segment removed.
 *  3. The entry's default value as a string, or `null` when the entry has no default.
 *     `get(key, default)` presumably returns this when the global key is unset.
 *
 * The chosen raw value is always passed through `config.valueConverter`.
 *
 * @param config SSL config entry whose key contains a `.<module>.` placeholder
 * @param module transport module to resolve the setting for
 * @return the resolved and converted value
 */
private def getSslConfig[V](config: ConfigEntry[V], module: String): V =
  getTransportConfImpl(module, config, config.valueConverter, allowDefault = false)
    .getOrElse {
      // No module-specific value: fall back to the global key, then to the entry default.
      val globalKey = config.key.replace(".<module>.", ".")
      val defaultValue = if (config.defaultValue.isDefined) config.defaultValueString else null
      config.valueConverter(get(globalKey, defaultValue))
    }