def workerDecommissionCheckInterval: Long = get()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1264:1438]


  // Worker decommission settings; each accessor returns what `get` yields for its entry.
  // NOTE(review): both `Long` values look like durations; the unit is not visible here.
  def workerDecommissionCheckInterval: Long = get(WORKER_DECOMMISSION_CHECK_INTERVAL)
  def workerDecommissionForceExitTimeout: Long = get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)

  // //////////////////////////////////////////////////////
  //            Graceful Shutdown & Recover              //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  // NOTE(review): the `Ms` suffixes suggest milliseconds; the unit of the two `...Interval`
  // values is not visible here - confirm against the entry definitions.
  def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
  def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
  // The two slot-check entries carry no GRACEFUL_SHUTDOWN prefix, unlike their accessors.
  def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
    get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
  def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
    get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
  def workerGracefulShutdownRecoverPath: String = get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)
  def workerGracefulShutdownRecoverDbBackend: String =
    get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND)
  // The next two accessors read the partition-sorter and flusher shutdown-timeout entries,
  // whose names also lack the GRACEFUL_SHUTDOWN prefix.
  def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
    get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
  def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
  def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
  def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)

  // //////////////////////////////////////////////////////
  //                      Flusher                        //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  // Buffer sizes: one general entry plus one each for HDFS, S3 and OSS.
  // NOTE(review): presumably byte counts - confirm against the entry definitions.
  def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
  def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
  def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE)
  def workerOssFlusherBufferSize: Long = get(WORKER_OSS_FLUSHER_BUFFER_SIZE)
  // Writer-related entry kept in this section; the `Ms` suffix suggests milliseconds.
  def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
  // Flusher thread counts, one entry per storage kind (HDD, SSD, HDFS, S3, OSS).
  def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
  def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
  def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
  def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
  def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
  // Accessor says "CreateWriter"; the entry it reads is named WORKER_WRITER_CREATE_MAX_ATTEMPTS.
  def workerCreateWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
  def workerFlusherLocalGatherAPIEnabled: Boolean = get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)

  // //////////////////////////////////////////////////////
  //                    Disk Monitor                     //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  def workerDiskTimeSlidingWindowSize: Int = get(WORKER_DISKTIME_SLIDINGWINDOW_SIZE)
  def workerDiskTimeSlidingWindowMinFlushCount: Int =
    get(WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT)
  def workerDiskTimeSlidingWindowMinFetchCount: Int =
    get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
  def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
  // The only `Option`-typed accessor in this section.
  // NOTE(review): presumably `None` when no ratio is configured - confirm.
  def workerDiskReserveRatio: Option[Double] = get(WORKER_DISK_RESERVE_RATIO)
  def workerDiskCleanThreads: Int = get(WORKER_DISK_CLEAN_THREADS)
  def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
  def workerDiskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
  def workerDiskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
  def workerDiskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
  def workerDiskMonitorNotifyErrorThreshold: Int = get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
  def workerDiskMonitorNotifyErrorExpireTimeout: Long =
    get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
  // Accessor says "DiskMonitor"; the entry it reads is WORKER_DEVICE_STATUS_CHECK_TIMEOUT.
  def workerDiskMonitorStatusCheckTimeout: Long = get(WORKER_DEVICE_STATUS_CHECK_TIMEOUT)

  // //////////////////////////////////////////////////////
  //                  Memory Manager                     //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  // NOTE(review): the `Double` ratios are presumably fractions of direct memory; their valid
  // range is not visible here - confirm against the entry definitions.
  def workerDirectMemoryRatioToPauseReceive: Double = get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE)
  def workerDirectMemoryRatioToPauseReplicate: Double =
    get(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE)
  def workerDirectMemoryRatioToResume: Double = get(WORKER_DIRECT_MEMORY_RATIO_RESUME)
  def workerPinnedMemoryRatioToResume: Double = get(WORKER_PINNED_MEMORY_RATIO_RESUME)
  def workerPartitionSorterDirectMemoryRatioThreshold: Double =
    get(WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD)
  // Accessor says "PressureCheck"; the entry it reads is WORKER_DIRECT_MEMORY_CHECK_INTERVAL.
  def workerDirectMemoryPressureCheckIntervalMs: Long = get(WORKER_DIRECT_MEMORY_CHECK_INTERVAL)
  def workerPinnedMemoryCheckEnabled: Boolean = get(WORKER_PINNED_MEMORY_CHECK_ENABLED)
  def workerPinnedMemoryCheckIntervalMs: Long = get(WORKER_PINNED_MEMORY_CHECK_INTERVAL)
  def workerPinnedMemoryResumeKeepTime: Long = get(WORKER_PINNED_MEMORY_RESUME_KEEP_TIME)
  // The `Second` suffix suggests this interval is in seconds, unlike the `Ms` accessors above.
  def workerDirectMemoryReportIntervalSecond: Long = get(WORKER_DIRECT_MEMORY_REPORT_INTERVAL)
  def workerDirectMemoryTrimChannelWaitInterval: Long =
    get(WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL)
  def workerDirectMemoryTrimFlushWaitInterval: Long =
    get(WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL)
  def workerDirectMemoryRatioForMemoryFilesStorage: Double =
    get(WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE)
  def workerMemoryFileStorageMaxFileSize: Long =
    get(WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE)
  /**
   * Returns what `get` yields for `WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED`.
   *
   * The method name is misspelled ("Eict" instead of "Evict"); it is kept unchanged so that
   * existing callers keep compiling. New code should prefer
   * `workerMemoryFileStorageEvictAggressiveModeEnabled`.
   */
  def workerMemoryFileStorageEictAggressiveModeEnabled: Boolean =
    get(WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED)

  /** Correctly spelled alias of `workerMemoryFileStorageEictAggressiveModeEnabled`. */
  def workerMemoryFileStorageEvictAggressiveModeEnabled: Boolean =
    workerMemoryFileStorageEictAggressiveModeEnabled
  // Returns what `get` yields for WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO.
  // NOTE(review): presumably a fraction; the valid range is not visible here - confirm.
  def workerMemoryFileStorageEvictRatio: Double =
    get(WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO)

  // //////////////////////////////////////////////////////
  //                  Rate Limit controller              //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  def workerCongestionControlEnabled: Boolean = get(WORKER_CONGESTION_CONTROL_ENABLED)
  // The `Seconds` suffix suggests the sample window is expressed in seconds.
  def workerCongestionControlSampleTimeWindowSeconds: Long =
    get(WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW)
  // TODO: related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
  // `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE` and `WORKER_DIRECT_MEMORY_RATIO_RESUME`;
  // the logic among them should be refined.
  def workerCongestionControlDiskBufferLowWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
  def workerCongestionControlDiskBufferHighWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)
  // Low/high watermark pairs for produce speed, one pair per user and one per worker.
  def workerCongestionControlUserProduceSpeedLowWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
  def workerCongestionControlUserProduceSpeedHighWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)
  def workerCongestionControlWorkerProduceSpeedLowWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
  def workerCongestionControlWorkerProduceSpeedHighWatermark: Long =
    get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)

  // The `Ms` suffixes suggest both intervals below are in milliseconds.
  def workerCongestionControlUserInactiveIntervalMs: Long =
    get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)
  def workerCongestionControlCheckIntervalMs: Long = get(WORKER_CONGESTION_CONTROL_CHECK_INTERVAL)

  // //////////////////////////////////////////////////////
  //                 Columnar Shuffle                    //
  // //////////////////////////////////////////////////////
  // Each accessor in this section returns what `get` yields for the named entry.
  def columnarShuffleEnabled: Boolean = get(COLUMNAR_SHUFFLE_ENABLED)
  def columnarShuffleBatchSize: Int = get(COLUMNAR_SHUFFLE_BATCH_SIZE)
  def columnarShuffleOffHeapEnabled: Boolean = get(COLUMNAR_SHUFFLE_OFF_HEAP_ENABLED)
  // Accessors say "Dictionary"; the entries they read say DICTIONARY_ENCODING.
  def columnarShuffleDictionaryEnabled: Boolean = get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_ENABLED)
  def columnarShuffleDictionaryMaxFactor: Double =
    get(COLUMNAR_SHUFFLE_DICTIONARY_ENCODING_MAX_FACTOR)

  def columnarShuffleCodeGenEnabled: Boolean = get(COLUMNAR_SHUFFLE_CODEGEN_ENABLED)

  // //////////////////////////////////////////////////////
  //                      test                           //
  // //////////////////////////////////////////////////////
  // Test-hook accessors; each returns what `get` yields for the named TEST_* entry.
  def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
  def testMockDestroySlotsFailure: Boolean = get(TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE)
  // Entry name has neither a CLIENT nor a WORKER qualifier, unlike its neighbours.
  def testMockCommitFilesFailure: Boolean = get(TEST_MOCK_COMMIT_FILES_FAILURE)
  def testMockShuffleLost: Boolean = get(TEST_CLIENT_MOCK_SHUFFLE_LOST)
  def testMockShuffleLostShuffle: Int = get(TEST_CLIENT_MOCK_SHUFFLE_LOST_SHUFFLE)
  // Primary-push timeout reads a TEST_CLIENT_* entry; replica-push reads a TEST_WORKER_* entry.
  def testPushPrimaryDataTimeout: Boolean = get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
  def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
  def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
  // Uses the (key, String) overload of `get` with the literal "celeborn" as second argument.
  // NOTE(review): presumably that argument is the fallback when the key is unset - confirm.
  def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
  // Flink client accessors; each returns what `get` yields for the named CLIENT_* entry.
  // They sit under the "test" banner although their names are not test-specific, and the
  // entry names do not carry the "Flink" qualifier that the accessor names have.
  def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
  def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
  def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
  def clientFlinkInputGateSupportFloatingBuffer: Boolean =
    get(CLIENT_INPUT_GATE_SUPPORT_FLOATING_BUFFER)
  def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
    get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
  def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
  /**
   * Returns what `get` yields for `CLIENT_CHUNK_PREFETCH_ENABLED`.
   *
   * The result type is declared explicitly, like the other accessors in this class, so the
   * public signature does not depend on type inference.
   */
  def clientChunkPrefetchEnabled: Boolean = get(CLIENT_CHUNK_PREFETCH_ENABLED)
  /**
   * Returns what `get` yields for `CLIENT_INPUTSTREAM_CREATION_WINDOW`.
   *
   * The result type is declared explicitly, like the other accessors in this class, so the
   * public signature does not depend on type inference.
   */
  def clientInputStreamCreationWindow: Int = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)

  // Tag-related accessors; each returns what `get` yields for the named entry.
  def tagsEnabled: Boolean = get(TAGS_ENABLED)
  def tagsExpr: String = get(TAGS_EXPR)
  def preferClientTagsExpr: Boolean = get(PREFER_CLIENT_TAGS_EXPR)

  // //////////////////////////////////////////////////////
  //                    kerberos                         //
  // //////////////////////////////////////////////////////
  /**
   * Returns what `get` yields for `HDFS_STORAGE_KERBEROS_PRINCIPAL`.
   *
   * The result type is declared explicitly so the public signature does not depend on type
   * inference.
   */
  def hdfsStorageKerberosPrincipal: Option[String] = get(HDFS_STORAGE_KERBEROS_PRINCIPAL)
  /**
   * Returns what `get` yields for `HDFS_STORAGE_KERBEROS_KEYTAB`.
   *
   * The result type is declared explicitly so the public signature does not depend on type
   * inference.
   */
  def hdfsStorageKerberosKeytab: Option[String] = get(HDFS_STORAGE_KERBEROS_KEYTAB)

  // //////////////////////////////////////////////////////
  //                     TLS                             //
  // //////////////////////////////////////////////////////
  /**
   * Resolves the value of an SSL config entry for the given transport module.
   *
   * Lookup order:
   *  1. the module-scoped value, via `getTransportConfImpl` with `allowDefault = false`;
   *  2. otherwise the key with its `.<module>.` segment collapsed to `.`, read through `get`,
   *     with the entry's default string as second argument (or `null` when the entry declares
   *     no default), then parsed by the entry's `valueConverter`.
   *
   * @param config entry whose key contains the `<module>` placeholder
   * @param module transport module name passed to the module-scoped lookup
   * @return the module-scoped value if present, else the converted global/default value
   */
  private def getSslConfig[V](config: ConfigEntry[V], module: String): V = {
    // Evaluated only when the module-scoped lookup comes back empty.
    def globalOrDefault: V = {
      val moduleFreeKey = config.key.replace(".<module>.", ".")
      val fallback = if (config.defaultValue.isEmpty) null else config.defaultValueString
      config.valueConverter(get(moduleFreeKey, fallback))
    }

    getTransportConfImpl(module, config, config.valueConverter, allowDefault = false)
      .getOrElse(globalOrDefault)
  }