def haMasterRatisRpcType: String = get()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [594:783]


  // Typed accessors for the HA master Ratis settings. Each is a `def`, so every call
  // re-reads the config entry named in its body through `get` and returns it unchanged.
  // NOTE(review): the units of the Long-valued sizes and timeouts are defined with the
  // entries themselves, which are not visible in this part of the file — confirm there.
  def haMasterRatisRpcType: String = get(HA_MASTER_RATIS_RPC_TYPE)
  def haMasterRatisStorageDir: String = get(HA_MASTER_RATIS_STORAGE_DIR)
  def haMasterRatisLogSegmentSizeMax: Long = get(HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX)
  def haMasterRatisLogPreallocatedSize: Long = get(HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE)
  def haMasterRatisLogAppenderQueueNumElements: Int =
    get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS)
  // The accessor says "BytesLimit"; the entry constant is spelled QUEUE_BYTE_LIMIT.
  def haMasterRatisLogAppenderQueueBytesLimit: Long =
    get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT)
  def haMasterRatisLogPurgeGap: Int = get(HA_MASTER_RATIS_LOG_PURGE_GAP)
  def haMasterRatisLogInstallSnapshotEnabled: Boolean =
    get(HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED)
  def haMasterRatisRpcRequestTimeout: Long = get(HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT)
  // Backed by the SERVER_RETRY_CACHE_EXPIRY_TIME entry (the accessor name drops "Server").
  def haMasterRatisRetryCacheExpiryTime: Long = get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
  def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
  def haMasterRatisRpcTimeoutMax: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MAX)
  def haMasterRatisFirstElectionTimeoutMin: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
  /** Returns the value of the `HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX` config entry. */
  def haMasterRatisFirstElectionTimeoutMax: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)

  /**
   * Legacy, misspelled ("Frist") name of [[haMasterRatisFirstElectionTimeoutMax]].
   * Kept only so existing callers keep compiling; new code should use the correctly
   * spelled accessor.
   */
  def haMasterRatisFristElectionTimeoutMax: Long = haMasterRatisFirstElectionTimeoutMax
  // Remaining HA master Ratis accessors: each returns, unchanged, the value `get` yields
  // for the entry named in its body.
  // NOTE(review): units for the Long timeouts/intervals/thresholds are not visible here —
  // confirm against the entry definitions.
  def haMasterRatisNotificationNoLeaderTimeout: Long =
    get(HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT)
  def haMasterRatisRpcSlownessTimeout: Long = get(HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT)
  def haMasterRatisRoleCheckInterval: Long = get(HA_MASTER_RATIS_ROLE_CHECK_INTERVAL)
  def haMasterRatisSnapshotAutoTriggerEnabled: Boolean =
    get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED)
  def haMasterRatisSnapshotAutoTriggerThreshold: Long =
    get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
  def haMasterRatisSnapshotRetentionFileNum: Int = get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)

  // //////////////////////////////////////////////////////
  //                      Worker                         //
  // //////////////////////////////////////////////////////
  // Worker settings: each accessor returns, unchanged, the value `get` yields for the
  // entry named in its body.
  def workerRpcPort: Int = get(WORKER_RPC_PORT)
  def workerPushPort: Int = get(WORKER_PUSH_PORT)
  def workerFetchPort: Int = get(WORKER_FETCH_PORT)
  def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
  // The three IO-thread counts are Option-typed.
  // NOTE(review): presumably None means "not configured"; the fallback, if any, is applied
  // by callers and is not visible here — confirm.
  def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
  def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
  def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
  // Accessor name is "registerWorker..."; the entry constant is WORKER_REGISTER_TIMEOUT.
  def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
  def workerWorkingDir: String = get(WORKER_WORKING_DIR)
  def workerCloseIdleConnections: Boolean = get(WORKER_CLOSE_IDLE_CONNECTIONS)
  def workerReplicateFastFailDuration: Long = get(WORKER_REPLICATE_FAST_FAIL_DURATION)
  def workerReplicateRandomConnectionEnabled: Boolean =
    get(WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED)
  def workerCheckFileCleanMaxRetries: Int = get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
  def workerCheckFileCleanTimeout: Long = get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
  def workerHeartbeatTimeout: Long = get(WORKER_HEARTBEAT_TIMEOUT)
  def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
  /**
   * Number of worker commit threads.
   *
   * Returns the `WORKER_COMMIT_THREADS` value as configured, except that when
   * `hasHDFSStorage` is true the result is raised to at least 128.
   */
  def workerCommitThreads: Int = {
    val hdfsBacked = hasHDFSStorage
    val configuredThreads = get(WORKER_COMMIT_THREADS)
    if (hdfsBacked) Math.max(128, configuredThreads) else configuredThreads
  }
  // Each accessor returns, unchanged, the value `get` yields for the entry named in its body.
  // Several accessor names differ from their entry constants; the mapping is spelled out
  // by the bodies below.
  def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
  // Reads ESTIMATED_PARTITION_SIZE_MIN_SIZE.
  def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
  // Reads PARTITION_SORTER_SORT_TIMEOUT.
  def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
  // Reads WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY.
  def partitionSorterReservedMemoryPerPartition: Long =
    get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
  /**
   * Number of partition sorter threads: the `PARTITION_SORTER_THREADS` value when one is
   * present, otherwise the processor count reported by the JVM runtime.
   */
  def partitionSorterThreads: Int =
    get(PARTITION_SORTER_THREADS) match {
      case Some(configured) => configured
      case None => Runtime.getRuntime.availableProcessors
    }
  // Each accessor returns, unchanged, the value `get` yields for the entry named in its body.
  def workerPushHeartbeatEnabled: Boolean = get(WORKER_PUSH_HEARTBEAT_ENABLED)
  // Reads WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS.
  def workerPushMaxComponents: Int = get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
  def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
  def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)

  // //////////////////////////////////////////////////////
  //                 Metrics System                      //
  // //////////////////////////////////////////////////////
  // Metrics settings: each accessor returns, unchanged, the value `get` yields for the
  // entry named in its body.
  // metricsConf is Option-typed.
  // NOTE(review): presumably None when no metrics config is set — confirm.
  def metricsConf: Option[String] = get(METRICS_CONF)
  // Accessor is "metricsSystemEnable"; the entry constant is METRICS_ENABLED.
  def metricsSystemEnable: Boolean = get(METRICS_ENABLED)
  def metricsSampleRate: Double = get(METRICS_SAMPLE_RATE)
  def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE)
  def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED)
  def metricsCapacity: Int = get(METRICS_CAPACITY)
  /**
   * Host for the master's Prometheus metrics, read from `MASTER_PROMETHEUS_HOST`.
   *
   * Every occurrence of the literal placeholder `<localhost>` in the configured value is
   * replaced with the result of `Utils.localHostName(this)`.
   */
  def masterPrometheusMetricHost: String = {
    val hostTemplate = get(MASTER_PROMETHEUS_HOST)
    hostTemplate.replace("<localhost>", Utils.localHostName(this))
  }
  // Returns the MASTER_PROMETHEUS_PORT entry's value as-is.
  def masterPrometheusMetricPort: Int = get(MASTER_PROMETHEUS_PORT)
  /**
   * Host for the worker's Prometheus metrics, read from `WORKER_PROMETHEUS_HOST`.
   *
   * Every occurrence of the literal placeholder `<localhost>` in the configured value is
   * replaced with the result of `Utils.localHostName(this)`.
   */
  def workerPrometheusMetricHost: String = {
    val hostTemplate = get(WORKER_PROMETHEUS_HOST)
    hostTemplate.replace("<localhost>", Utils.localHostName(this))
  }
  // Returns the WORKER_PROMETHEUS_PORT entry's value as-is.
  def workerPrometheusMetricPort: Int = get(WORKER_PROMETHEUS_PORT)
  /**
   * Extra metric labels as a map.
   *
   * Each element of the `METRICS_EXTRA_LABELS` value is passed through
   * `Utils.parseMetricLabels`, and the resulting pairs are collected into a `Map`
   * (for duplicate keys, `toMap` keeps the last pair).
   */
  def metricsExtraLabels: Map[String, String] = {
    val rawLabels = get(METRICS_EXTRA_LABELS)
    rawLabels.map(label => Utils.parseMetricLabels(label)).toMap
  }
  // App top-disk-usage metrics settings: each accessor returns, unchanged, the value `get`
  // yields for the entry named in its body.
  // NOTE(review): the unit of the Long interval is not visible here — confirm against the
  // entry definition.
  def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
  def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
  def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)

  // //////////////////////////////////////////////////////
  //                      Quota                         //
  // //////////////////////////////////////////////////////
  // Quota settings: each accessor returns, unchanged, the value `get` yields for the entry
  // named in its body.
  def quotaEnabled: Boolean = get(QUOTA_ENABLED)
  // The two *Class accessors return plain Strings; nothing is loaded or instantiated here.
  def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
  def quotaManagerClass: String = get(QUOTA_MANAGER)
  // Option-typed.
  // NOTE(review): presumably None when no path is configured — confirm.
  def quotaConfigurationPath: Option[String] = get(QUOTA_CONFIGURATION_PATH)
  def quotaUserSpecificTenant: String = get(QUOTA_USER_SPECIFIC_TENANT)
  def quotaUserSpecificUserName: String = get(QUOTA_USER_SPECIFIC_USERNAME)

  // //////////////////////////////////////////////////////
  //                      Client                         //
  // //////////////////////////////////////////////////////
  // Client settings: each accessor returns, unchanged, the value `get` yields for the
  // entry named in its body. Several accessor names differ from their entry constants;
  // the bodies below are the authoritative mapping.
  // NOTE(review): the "Ms"/"MS" suffixes suggest milliseconds, but the unit is set by the
  // entry definitions, which are not visible here — confirm.
  def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
  def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
  def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)
  def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
  def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
  def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
  // Reads CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.
  def clientRequestCommitFilesMaxRetries: Int = get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
  // Reads CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS.
  def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
  def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
  // The next three read APPLICATION_* / HDFS_* entries rather than CLIENT_* ones.
  def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
  def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
  def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
  def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
  def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
  // Accessor says "Replica"; the entry constant says PEER_WORKER.
  def clientExcludeReplicaOnFailureEnabled: Boolean =
    get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)

  // //////////////////////////////////////////////////////
  //               Shuffle Compression                   //
  // //////////////////////////////////////////////////////
  /**
   * Shuffle compression codec, obtained by passing the `SHUFFLE_COMPRESSION_CODEC` value
   * to `CompressionCodec.valueOf`.
   */
  def shuffleCompressionCodec: CompressionCodec = {
    val codecName = get(SHUFFLE_COMPRESSION_CODEC)
    CompressionCodec.valueOf(codecName)
  }
  // Returns the SHUFFLE_COMPRESSION_ZSTD_LEVEL entry's value as-is.
  def shuffleCompressionZstdCompressLevel: Int = get(SHUFFLE_COMPRESSION_ZSTD_LEVEL)

  // //////////////////////////////////////////////////////
  //                Shuffle Client RPC                   //
  // //////////////////////////////////////////////////////
  // Client RPC cache settings: each accessor returns, unchanged, the value `get` yields
  // for the entry named in its body.
  def clientRpcCacheSize: Int = get(CLIENT_RPC_CACHE_SIZE)
  def clientRpcCacheConcurrencyLevel: Int = get(CLIENT_RPC_CACHE_CONCURRENCY_LEVEL)
  /**
   * Builds a new `RpcTimeout` from the `CLIENT_RESERVE_SLOTS_RPC_TIMEOUT` entry.
   *
   * The entry's value is converted with `.milli` (i.e. interpreted as milliseconds) and
   * the entry's key is passed as the second constructor argument.
   */
  def clientRpcReserveSlotsRpcTimeout: RpcTimeout = {
    val timeoutEntry = CLIENT_RESERVE_SLOTS_RPC_TIMEOUT
    val duration = get(timeoutEntry).milli
    new RpcTimeout(duration, timeoutEntry.key)
  }

  /**
   * Builds a new `RpcTimeout` from the `CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT` entry.
   *
   * The entry's value is converted with `.milli` (i.e. interpreted as milliseconds) and
   * the entry's key is passed as the second constructor argument.
   */
  def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout = {
    val timeoutEntry = CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT
    val duration = get(timeoutEntry).milli
    new RpcTimeout(duration, timeoutEntry.key)
  }

  /**
   * Builds a new `RpcTimeout` from the
   * `CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT` entry.
   *
   * The entry's value is converted with `.milli` (i.e. interpreted as milliseconds) and
   * the entry's key is passed as the second constructor argument.
   */
  def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout = {
    val timeoutEntry = CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT
    val duration = get(timeoutEntry).milli
    new RpcTimeout(duration, timeoutEntry.key)
  }

  /**
   * Builds a new `RpcTimeout` from the
   * `CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT` entry.
   *
   * The entry's value is converted with `.milli` (i.e. interpreted as milliseconds) and
   * the entry's key is passed as the second constructor argument.
   */
  def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout = {
    val timeoutEntry = CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT
    val duration = get(timeoutEntry).milli
    new RpcTimeout(duration, timeoutEntry.key)
  }

  // //////////////////////////////////////////////////////
  //               Shuffle Client Fetch                  //
  // //////////////////////////////////////////////////////
  // Client fetch settings: each accessor returns, unchanged, the value `get` yields for
  // the entry named in its body.
  // NOTE(review): the "Ms" suffix suggests milliseconds, but the unit is set by the entry
  // definition, which is not visible here — confirm.
  def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
  def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
  def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
  def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
    get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
  def clientFetchExcludedWorkerExpireTimeout: Long =
    get(CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT)

  // //////////////////////////////////////////////////////
  //               Shuffle Client Push                   //
  // //////////////////////////////////////////////////////
  // Client push settings: unless noted, each accessor returns, unchanged, the value `get`
  // yields for the entry named in its body. Several accessor names differ from their entry
  // constants; the bodies below are the authoritative mapping.
  def clientPushReplicateEnabled: Boolean = get(CLIENT_PUSH_REPLICATE_ENABLED)
  // The two buffer sizes are converted with `.toInt` before being returned.
  // NOTE(review): if the underlying entries are Long-valued, `.toInt` silently truncates
  // values above Int.MaxValue — confirm the entries bound their values.
  def clientPushBufferInitialSize: Int = get(CLIENT_PUSH_BUFFER_INITIAL_SIZE).toInt
  def clientPushBufferMaxSize: Int = get(CLIENT_PUSH_BUFFER_MAX_SIZE).toInt
  def clientPushQueueCapacity: Int = get(CLIENT_PUSH_QUEUE_CAPACITY)
  def clientPushExcludeWorkerOnFailureEnabled: Boolean =
    get(CLIENT_PUSH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
  def clientPushMaxReqsInFlightPerWorker: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER)
  def clientPushMaxReqsInFlightTotal: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT_TOTAL)
  def clientPushMaxReviveTimes: Int = get(CLIENT_PUSH_MAX_REVIVE_TIMES)
  def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
  def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
  def clientPushSortMemoryThreshold: Long = get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
  def clientPushSortPipelineEnabled: Boolean = get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
  // Accessor says "RandomizePartitionId"; the entry constant is ..._RANDOMIZE_PARTITION_ENABLED.
  def clientPushSortRandomizePartitionIdEnabled: Boolean =
    get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
  def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
  def clientPushStageEndTimeout: Long = get(CLIENT_PUSH_STAGE_END_TIMEOUT)
  def clientPushUnsafeRowFastWrite: Boolean = get(CLIENT_PUSH_UNSAFEROW_FASTWRITE_ENABLED)
  // An RPC-cache setting that lives in the push section.
  def clientRpcCacheExpireTime: Long = get(CLIENT_RPC_CACHE_EXPIRE_TIME)
  // Reads CLIENT_PUSH_DATA_TIMEOUT.
  // NOTE(review): the "Ms" suffix suggests milliseconds — confirm against the entry definition.
  def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
  def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
  def clientPushSlowStartInitialSleepTime: Long = get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
  // A slot-assignment setting that lives in the push section.
  def clientSlotAssignMaxWorkers: Int = get(CLIENT_SLOT_ASSIGN_MAX_WORKERS)
  // Accessor says "MaxSleepMills"; the entry constant is ..._MAX_SLEEP_TIME.
  def clientPushSlowStartMaxSleepMills: Long = get(CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME)
  /**
   * Timeout used when limiting in-flight push requests.
   *
   * Returns the `CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT` value when one is present. Otherwise
   * the default is `pushDataTimeoutMs * clientPushMaxReviveTimes * k`, where `k` is 4 when
   * `clientPushReplicateEnabled` is true and 2 when it is false.
   */
  def clientPushLimitInFlightTimeoutMs: Long = {
    val multiplier = if (clientPushReplicateEnabled) 4 else 2
    get(CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT).getOrElse(
      pushDataTimeoutMs * clientPushMaxReviveTimes * multiplier)
  }