in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [594:783]
// Master HA (Ratis) settings. Each accessor below returns the current value of
// the named config entry, read through `get`.
def haMasterRatisRpcType: String = get(HA_MASTER_RATIS_RPC_TYPE)
def haMasterRatisStorageDir: String = get(HA_MASTER_RATIS_STORAGE_DIR)
def haMasterRatisLogSegmentSizeMax: Long = get(HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX)
def haMasterRatisLogPreallocatedSize: Long = get(HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE)
def haMasterRatisLogAppenderQueueNumElements: Int =
get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS)
// Note: the accessor says "BytesLimit" while the entry constant says "BYTE_LIMIT".
def haMasterRatisLogAppenderQueueBytesLimit: Long =
get(HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT)
def haMasterRatisLogPurgeGap: Int = get(HA_MASTER_RATIS_LOG_PURGE_GAP)
def haMasterRatisLogInstallSnapshotEnabled: Boolean =
get(HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED)
def haMasterRatisRpcRequestTimeout: Long = get(HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT)
// Note: the entry constant carries a "SERVER" segment that the accessor name omits.
def haMasterRatisRetryCacheExpiryTime: Long = get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
def haMasterRatisRpcTimeoutMax: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MAX)
def haMasterRatisFirstElectionTimeoutMin: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
/**
 * Returns the value of `HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX`.
 *
 * Correctly spelled counterpart of `haMasterRatisFirstElectionTimeoutMin`; prefer this
 * name in new code.
 */
def haMasterRatisFirstElectionTimeoutMax: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)

/**
 * Legacy accessor whose name is misspelled ("Frist"). Kept so that existing callers keep
 * compiling; it simply delegates to [[haMasterRatisFirstElectionTimeoutMax]].
 */
def haMasterRatisFristElectionTimeoutMax: Long = haMasterRatisFirstElectionTimeoutMax
// Remaining master HA (Ratis) settings: timeouts, role check interval and
// snapshot options. Each accessor returns the named config entry via `get`.
def haMasterRatisNotificationNoLeaderTimeout: Long =
get(HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT)
def haMasterRatisRpcSlownessTimeout: Long = get(HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT)
def haMasterRatisRoleCheckInterval: Long = get(HA_MASTER_RATIS_ROLE_CHECK_INTERVAL)
def haMasterRatisSnapshotAutoTriggerEnabled: Boolean =
get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED)
def haMasterRatisSnapshotAutoTriggerThreshold: Long =
get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
def haMasterRatisSnapshotRetentionFileNum: Int = get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
// //////////////////////////////////////////////////////
// Worker //
// //////////////////////////////////////////////////////
// Worker ports: one accessor per config entry (RPC, push, fetch, replicate).
def workerRpcPort: Int = get(WORKER_RPC_PORT)
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
// IO thread counts are exposed as Option[Int]; no fallback is applied here, so
// callers must handle None themselves (presumably "not configured" - confirm).
def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
// Reads WORKER_REGISTER_TIMEOUT; unlike its neighbours the accessor has no "worker" prefix.
def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
def workerWorkingDir: String = get(WORKER_WORKING_DIR)
def workerCloseIdleConnections: Boolean = get(WORKER_CLOSE_IDLE_CONNECTIONS)
def workerReplicateFastFailDuration: Long = get(WORKER_REPLICATE_FAST_FAIL_DURATION)
def workerReplicateRandomConnectionEnabled: Boolean =
get(WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED)
def workerCheckFileCleanMaxRetries: Int = get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
def workerCheckFileCleanTimeout: Long = get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
def workerHeartbeatTimeout: Long = get(WORKER_HEARTBEAT_TIMEOUT)
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
/**
 * Number of worker commit threads.
 *
 * Returns the value of `WORKER_COMMIT_THREADS`, raised to at least 128 when
 * `hasHDFSStorage` is true.
 */
def workerCommitThreads: Int = {
  val hdfsInUse = hasHDFSStorage
  val configured = get(WORKER_COMMIT_THREADS)
  if (hdfsInUse) Math.max(128, configured) else configured
}
// Commit timeout, partition-size estimation floor and partition sorter settings.
// Each accessor returns the named config entry via `get`.
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def partitionSorterSortPartitionTimeout: Long = get(PARTITION_SORTER_SORT_TIMEOUT)
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
/**
 * Number of partition sorter threads.
 *
 * Uses the value of `PARTITION_SORTER_THREADS` when one is present; otherwise falls back
 * to the number of processors reported by the JVM runtime.
 */
def partitionSorterThreads: Int =
  get(PARTITION_SORTER_THREADS) match {
    case Some(configured) => configured
    case None => Runtime.getRuntime.availableProcessors
  }
// Worker push/fetch heartbeat switches, push composite-buffer component limit and
// partition split switch. Each accessor returns the named config entry via `get`.
def workerPushHeartbeatEnabled: Boolean = get(WORKER_PUSH_HEARTBEAT_ENABLED)
def workerPushMaxComponents: Int = get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
// //////////////////////////////////////////////////////
// Metrics System //
// //////////////////////////////////////////////////////
// General metrics settings. Each accessor returns the named config entry via `get`.
// METRICS_CONF is exposed as Option[String]; no default is applied here.
def metricsConf: Option[String] = get(METRICS_CONF)
// Note: the accessor is named "metricsSystemEnable" but reads METRICS_ENABLED.
def metricsSystemEnable: Boolean = get(METRICS_ENABLED)
def metricsSampleRate: Double = get(METRICS_SAMPLE_RATE)
def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE)
def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED)
def metricsCapacity: Int = get(METRICS_CAPACITY)
/**
 * Host for the master's Prometheus metrics.
 *
 * Takes the value of `MASTER_PROMETHEUS_HOST` and substitutes every literal
 * `<localhost>` in it with `Utils.localHostName(this)`.
 */
def masterPrometheusMetricHost: String = {
  val hostTemplate = get(MASTER_PROMETHEUS_HOST)
  val localName = Utils.localHostName(this)
  hostTemplate.replace("<localhost>", localName)
}
// Returns the value of the MASTER_PROMETHEUS_PORT config entry.
def masterPrometheusMetricPort: Int = get(MASTER_PROMETHEUS_PORT)
/**
 * Host for the worker's Prometheus metrics.
 *
 * Takes the value of `WORKER_PROMETHEUS_HOST` and substitutes every literal
 * `<localhost>` in it with `Utils.localHostName(this)`.
 */
def workerPrometheusMetricHost: String = {
  val hostTemplate = get(WORKER_PROMETHEUS_HOST)
  val localName = Utils.localHostName(this)
  hostTemplate.replace("<localhost>", localName)
}
// Returns the value of the WORKER_PROMETHEUS_PORT config entry.
def workerPrometheusMetricPort: Int = get(WORKER_PROMETHEUS_PORT)
/**
 * Extra metric labels as a map.
 *
 * Each element of `METRICS_EXTRA_LABELS` is passed through `Utils.parseMetricLabels`
 * and the resulting pairs are collected into a `Map`.
 */
def metricsExtraLabels: Map[String, String] = {
  val rawLabels = get(METRICS_EXTRA_LABELS)
  rawLabels.map(label => Utils.parseMetricLabels(label)).toMap
}
// Settings for the per-application top disk usage metrics (count, window size,
// interval). Each accessor returns the named config entry via `get`.
def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
def metricsAppTopDiskUsageWindowSize: Int = get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
// //////////////////////////////////////////////////////
// Quota //
// //////////////////////////////////////////////////////
// Quota settings. Each accessor returns the named config entry via `get`.
def quotaEnabled: Boolean = get(QUOTA_ENABLED)
// The two "Class" accessors return plain strings; presumably class names that are
// instantiated elsewhere - confirm against the callers.
def quotaIdentityProviderClass: String = get(QUOTA_IDENTITY_PROVIDER)
def quotaManagerClass: String = get(QUOTA_MANAGER)
// Exposed as Option[String]; no default path is applied here.
def quotaConfigurationPath: Option[String] = get(QUOTA_CONFIGURATION_PATH)
def quotaUserSpecificTenant: String = get(QUOTA_USER_SPECIFIC_TENANT)
def quotaUserSpecificUserName: String = get(QUOTA_USER_SPECIFIC_USERNAME)
// //////////////////////////////////////////////////////
// Client //
// //////////////////////////////////////////////////////
// Client settings. Each accessor returns the named config entry via `get`.
def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)
def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
def clientRequestCommitFilesMaxRetries: Int = get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
def clientRpcMaxParallelism: Int = get(CLIENT_RPC_MAX_PARALLELISM)
// The next three read APPLICATION_* / HDFS_* entries rather than CLIENT_* ones, even
// though they sit in the client section.
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
// Note: the accessor says "Replica" while the entry constant says "PEER_WORKER".
def clientExcludeReplicaOnFailureEnabled: Boolean =
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
// //////////////////////////////////////////////////////
// Shuffle Compression //
// //////////////////////////////////////////////////////
/**
 * Shuffle compression codec.
 *
 * Resolves the value of `SHUFFLE_COMPRESSION_CODEC` to a `CompressionCodec` constant
 * through `CompressionCodec.valueOf`.
 */
def shuffleCompressionCodec: CompressionCodec = {
  val codecName = get(SHUFFLE_COMPRESSION_CODEC)
  CompressionCodec.valueOf(codecName)
}
// Returns the value of the SHUFFLE_COMPRESSION_ZSTD_LEVEL config entry.
def shuffleCompressionZstdCompressLevel: Int = get(SHUFFLE_COMPRESSION_ZSTD_LEVEL)
// //////////////////////////////////////////////////////
// Shuffle Client RPC //
// //////////////////////////////////////////////////////
// Client RPC cache sizing. Each accessor returns the named config entry via `get`.
def clientRpcCacheSize: Int = get(CLIENT_RPC_CACHE_SIZE)
def clientRpcCacheConcurrencyLevel: Int = get(CLIENT_RPC_CACHE_CONCURRENCY_LEVEL)
/**
 * RPC timeout for reserve-slots requests.
 *
 * Builds an `RpcTimeout` from the value of `CLIENT_RESERVE_SLOTS_RPC_TIMEOUT`, converted
 * with `.milli`, and tags it with that entry's key.
 */
def clientRpcReserveSlotsRpcTimeout: RpcTimeout = {
  val timeout = get(CLIENT_RESERVE_SLOTS_RPC_TIMEOUT).milli
  val confKey = CLIENT_RESERVE_SLOTS_RPC_TIMEOUT.key
  new RpcTimeout(timeout, confKey)
}
/**
 * RPC ask timeout for register-shuffle requests.
 *
 * Builds an `RpcTimeout` from the value of `CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT`,
 * converted with `.milli`, and tags it with that entry's key.
 */
def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout = {
  val timeout = get(CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli
  val confKey = CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key
  new RpcTimeout(timeout, confKey)
}
/**
 * RPC ask timeout for request-partition-location requests.
 *
 * Builds an `RpcTimeout` from the value of
 * `CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT`, converted with `.milli`, and
 * tags it with that entry's key.
 */
def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout = {
  val timeout = get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli
  val confKey = CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key
  new RpcTimeout(timeout, confKey)
}
/**
 * RPC ask timeout for get-reducer-file-group requests.
 *
 * Builds an `RpcTimeout` from the value of
 * `CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT`, converted with `.milli`, and tags
 * it with that entry's key.
 */
def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout = {
  val timeout = get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli
  val confKey = CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key
  new RpcTimeout(timeout, confKey)
}
// //////////////////////////////////////////////////////
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
// Client fetch settings. Each accessor returns the named config entry via `get`.
def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
def clientFetchExcludedWorkerExpireTimeout: Long =
get(CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
// //////////////////////////////////////////////////////
// Shuffle Client Push //
// //////////////////////////////////////////////////////
// Client push settings. Unless noted, each accessor returns the named config entry
// via `get`.
def clientPushReplicateEnabled: Boolean = get(CLIENT_PUSH_REPLICATE_ENABLED)
// The two buffer sizes are narrowed to Int with `.toInt`.
// NOTE(review): presumably the entries are Long byte sizes; a value above Int.MaxValue
// would wrap silently here - confirm the entries validate their range.
def clientPushBufferInitialSize: Int = get(CLIENT_PUSH_BUFFER_INITIAL_SIZE).toInt
def clientPushBufferMaxSize: Int = get(CLIENT_PUSH_BUFFER_MAX_SIZE).toInt
def clientPushQueueCapacity: Int = get(CLIENT_PUSH_QUEUE_CAPACITY)
def clientPushExcludeWorkerOnFailureEnabled: Boolean =
get(CLIENT_PUSH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
def clientPushMaxReqsInFlightPerWorker: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER)
def clientPushMaxReqsInFlightTotal: Int = get(CLIENT_PUSH_MAX_REQS_IN_FLIGHT_TOTAL)
def clientPushMaxReviveTimes: Int = get(CLIENT_PUSH_MAX_REVIVE_TIMES)
def clientPushReviveInterval: Long = get(CLIENT_PUSH_REVIVE_INTERVAL)
def clientPushReviveBatchSize: Int = get(CLIENT_PUSH_REVIVE_BATCHSIZE)
def clientPushSortMemoryThreshold: Long = get(CLIENT_PUSH_SORT_MEMORY_THRESHOLD)
def clientPushSortPipelineEnabled: Boolean = get(CLIENT_PUSH_SORT_PIPELINE_ENABLED)
def clientPushSortRandomizePartitionIdEnabled: Boolean =
get(CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED)
def clientPushRetryThreads: Int = get(CLIENT_PUSH_RETRY_THREADS)
def clientPushStageEndTimeout: Long = get(CLIENT_PUSH_STAGE_END_TIMEOUT)
def clientPushUnsafeRowFastWrite: Boolean = get(CLIENT_PUSH_UNSAFEROW_FASTWRITE_ENABLED)
// Reads an RPC cache entry although it is listed among the push settings.
def clientRpcCacheExpireTime: Long = get(CLIENT_RPC_CACHE_EXPIRE_TIME)
// Reads CLIENT_PUSH_DATA_TIMEOUT; unlike its neighbours the accessor has no "client" prefix.
def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
def clientPushSlowStartInitialSleepTime: Long = get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
// Reads a slot-assignment entry although it is listed among the push settings.
def clientSlotAssignMaxWorkers: Int = get(CLIENT_SLOT_ASSIGN_MAX_WORKERS)
// Note: the accessor ends in "Mills" while the entry constant ends in "SLEEP_TIME".
def clientPushSlowStartMaxSleepMills: Long = get(CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME)
/**
 * Timeout used when limiting in-flight push requests.
 *
 * Returns the value of `CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT` when one is present.
 * Otherwise the default is `pushDataTimeoutMs * clientPushMaxReviveTimes * factor`,
 * where `factor` is 4 when `clientPushReplicateEnabled` is true and 2 when it is false.
 */
def clientPushLimitInFlightTimeoutMs: Long = {
  val factor = if (clientPushReplicateEnabled) 4 else 2
  get(CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT) match {
    case Some(explicitTimeout) => explicitTimeout
    // Only computed when no explicit timeout is configured.
    case None => pushDataTimeoutMs * clientPushMaxReviveTimes * factor
  }
}