in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [810:947]
// WORKER_HTTP_* entries.
def workerHttpPort: Int =
  get(WORKER_HTTP_PORT)
def workerHttpMaxWorkerThreads: Int =
  get(WORKER_HTTP_MAX_WORKER_THREADS)
def workerHttpStopTimeout: Long =
  get(WORKER_HTTP_STOP_TIMEOUT)
def workerHttpIdleTimeout: Long =
  get(WORKER_HTTP_IDLE_TIMEOUT)

// Per-endpoint port entries.
def workerRpcPort: Int =
  get(WORKER_RPC_PORT)
def workerPushPort: Int =
  get(WORKER_PUSH_PORT)
def workerFetchPort: Int =
  get(WORKER_FETCH_PORT)
def workerReplicatePort: Int =
  get(WORKER_REPLICATE_PORT)

// Per-endpoint I/O thread counts; these entries are Option-typed.
def workerPushIoThreads: Option[Int] =
  get(WORKER_PUSH_IO_THREADS)
def workerFetchIoThreads: Option[Int] =
  get(WORKER_FETCH_IO_THREADS)
def workerReplicateIoThreads: Option[Int] =
  get(WORKER_REPLICATE_IO_THREADS)

// Miscellaneous worker settings.
def registerWorkerTimeout: Long =
  get(WORKER_REGISTER_TIMEOUT)
def workerWorkingDir: String =
  get(WORKER_WORKING_DIR)
def workerCloseIdleConnections: Boolean =
  get(WORKER_CLOSE_IDLE_CONNECTIONS)
def workerReplicateFastFailDuration: Long =
  get(WORKER_REPLICATE_FAST_FAIL_DURATION)
// Replication, file-clean check, heartbeat and availability settings.
def workerReplicateRandomConnectionEnabled: Boolean =
  get(WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED)
def workerReplicateThreads: Int =
  get(WORKER_REPLICATE_THREADS)
def workerCheckFileCleanMaxRetries: Int =
  get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
def workerCheckFileCleanTimeout: Long =
  get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
def workerHeartbeatTimeout: Long =
  get(WORKER_HEARTBEAT_TIMEOUT)
def workerUnavailableInfoExpireTimeout: Long =
  get(WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT)

// Optional host-name regexes read from ALLOW_WORKER_HOST_PATTERN / DENY_WORKER_HOST_PATTERN.
def allowWorkerHostPattern: Option[Regex] =
  get(ALLOW_WORKER_HOST_PATTERN)
def denyWorkerHostPattern: Option[Regex] =
  get(DENY_WORKER_HOST_PATTERN)
/**
 * Commit thread count read from `WORKER_COMMIT_THREADS`.
 *
 * When `hasHDFSStorage` is true the configured value is raised to at least 128;
 * otherwise it is returned unchanged.
 */
def workerCommitThreads: Int = {
  val configuredThreads = get(WORKER_COMMIT_THREADS)
  if (hasHDFSStorage) Math.max(128, configuredThreads)
  else configuredThreads
}
// Commit / clean settings.
def workerCommitFilesCheckInterval: Long =
  get(WORKER_COMMIT_FILES_CHECK_INTERVAL)
def workerCleanThreads: Int =
  get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long =
  get(WORKER_SHUFFLE_COMMIT_TIMEOUT)

/** Reads ESTIMATED_PARTITION_SIZE_MAX_SIZE; when it yields None, uses twice `partitionSplitMaximumSize`. */
def maxPartitionSizeToEstimate: Long =
  get(ESTIMATED_PARTITION_SIZE_MAX_SIZE)
    .getOrElse(partitionSplitMaximumSize * 2)
def minPartitionSizeToEstimate: Long =
  get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)

// Partition sorter settings.
def workerPartitionSorterSortPartitionTimeout: Long =
  get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
def workerPartitionSorterPrefetchEnabled: Boolean =
  get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
def workerPartitionSorterShuffleBlockCompactionFactor: Double =
  get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
def workerPartitionSorterReservedMemoryPerPartition: Long =
  get(WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION)
/** Falls back to the JVM's available-processor count when the entry yields None. */
def workerPartitionSorterThreads: Int =
  get(WORKER_PARTITION_SORTER_THREADS)
    .getOrElse(Runtime.getRuntime.availableProcessors)
def workerPartitionSorterIndexCacheMaxWeight: Long =
  get(WORKER_PARTITION_SORTER_INDEX_CACHE_MAX_WEIGHT)
def workerPartitionSorterIndexExpire: Long =
  get(WORKER_PARTITION_SORTER_INDEX_CACHE_EXPIRE)
// Push / fetch channel settings.
def workerPushHeartbeatEnabled: Boolean =
  get(WORKER_PUSH_HEARTBEAT_ENABLED)
def workerPushMaxComponents: Int =
  get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
def workerFetchHeartbeatEnabled: Boolean =
  get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean =
  get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] =
  get(WORKER_ACTIVE_CONNECTION_MAX)

// JVM profiler settings.
def workerJvmProfilerEnabled: Boolean =
  get(WORKER_JVM_PROFILER_ENABLED)
def workerJvmProfilerOptions: String =
  get(WORKER_JVM_PROFILER_OPTIONS)
def workerJvmProfilerLocalDir: String =
  get(WORKER_JVM_PROFILER_LOCAL_DIR)

// JVM quake settings.
def workerJvmQuakeEnabled: Boolean =
  get(WORKER_JVM_QUAKE_ENABLED)
def workerJvmQuakeCheckInterval: Long =
  get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
def workerJvmQuakeRuntimeWeight: Double =
  get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
def workerJvmQuakeDumpEnabled: Boolean =
  get(WORKER_JVM_QUAKE_DUMP_ENABLED)
def workerJvmQuakeDumpPath: String =
  get(WORKER_JVM_QUAKE_DUMP_PATH)
/**
 * Heap-dump threshold for the JVM quake monitor, read from
 * `WORKER_JVM_QUAKE_DUMP_THRESHOLD` (falling back to its default value string).
 *
 * `getTimeAsMs` returns a count of milliseconds, so the value is wrapped with
 * `.millisecond`. Wrapping it with `.microsecond` would mislabel the unit and make the
 * returned duration 1000x shorter than the configured threshold.
 *
 * NOTE(review): earlier consumers of this accessor received a duration 1000x too short.
 * Confirm none of them compensated for that.
 */
def workerJvmQuakeDumpThreshold: Duration =
  getTimeAsMs(
    WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
    WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).millisecond

/**
 * Kill threshold for the JVM quake monitor, read from
 * `WORKER_JVM_QUAKE_KILL_THRESHOLD` (falling back to its default value string).
 *
 * Like the dump threshold above, the millisecond count from `getTimeAsMs` must be
 * wrapped as milliseconds, not microseconds.
 */
def workerJvmQuakeKillThreshold: Duration =
  getTimeAsMs(
    WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
    WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).millisecond

/** Exit code read from `WORKER_JVM_QUAKE_EXIT_CODE`. */
def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)
// //////////////////////////////////////////////////////
// Metrics System //
// //////////////////////////////////////////////////////
def metricsConf: Option[String] =
  get(METRICS_CONF)
def metricsSystemEnable: Boolean =
  get(METRICS_ENABLED)
def metricsSampleRate: Double =
  get(METRICS_SAMPLE_RATE)
def metricsSlidingWindowSize: Int =
  get(METRICS_SLIDING_WINDOW_SIZE)
def metricsCollectCriticalEnabled: Boolean =
  get(METRICS_COLLECT_CRITICAL_ENABLED)
def metricsCapacity: Int =
  get(METRICS_CAPACITY)

/** Each METRICS_EXTRA_LABELS entry parsed by `Utils.parseMetricLabels`, collected into a map. */
def metricsExtraLabels: Map[String, String] = {
  val parsedLabels = get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels)
  parsedLabels.toMap
}

// Worker-side metric settings.
def metricsWorkerAppTopResourceConsumptionCount: Int =
  get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
  get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_BYTES_WRITTEN_THRESHOLD)
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
  get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean =
  get(METRICS_JSON_PRETTY_ENABLED)
// //////////////////////////////////////////////////////
// Quota //
// //////////////////////////////////////////////////////
def quotaEnabled: Boolean =
  get(QUOTA_ENABLED)

// Per-scope quota switches.
def clusterQuotaEnabled: Boolean =
  get(CLUSTER_QUOTA_ENABLED)
def tenantQuotaEnabled: Boolean =
  get(TENANT_QUOTA_ENABLED)
def userQuotaEnabled: Boolean =
  get(USER_QUOTA_ENABLED)

def quotaInterruptShuffleEnabled: Boolean =
  get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)
// //////////////////////////////////////////////////////
// Identity //
// //////////////////////////////////////////////////////
def identityProviderClass: String =
  get(IDENTITY_PROVIDER)

// USER_SPECIFIC_* entries.
def userSpecificTenant: String =
  get(USER_SPECIFIC_TENANT)
def userSpecificUserName: String =
  get(USER_SPECIFIC_USERNAME)
// //////////////////////////////////////////////////////
// Client //
// //////////////////////////////////////////////////////
// Connection and shuffle-registration settings.
def clientCloseIdleConnections: Boolean =
  get(CLIENT_CLOSE_IDLE_CONNECTIONS)
def clientRegisterShuffleMaxRetry: Int =
  get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
def clientRegisterShuffleRetryWaitMs: Long =
  get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)

// Slot reservation and commit-files settings.
def clientReserveSlotsRackAwareEnabled: Boolean =
  get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
def clientReserveSlotsMaxRetries: Int =
  get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
def clientReserveSlotsRetryWait: Long =
  get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
def clientRequestCommitFilesMaxRetries: Int =
  get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
def clientCommitFilesIgnoreExcludedWorkers: Boolean =
  get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)

// Dynamic shuffle resource settings.
def clientShuffleDynamicResourceEnabled: Boolean =
  get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED)
def clientShuffleDynamicResourceFactor: Double =
  get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)

// Application heartbeat / unregister settings.
def appHeartbeatTimeoutMs: Long =
  get(APPLICATION_HEARTBEAT_TIMEOUT)
def appHeartbeatIntervalMs: Long =
  get(APPLICATION_HEARTBEAT_INTERVAL)
def applicationUnregisterEnabled: Boolean =
  get(APPLICATION_UNREGISTER_ENABLED)

// HDFS / DFS directory expiry timeouts.
def hdfsExpireDirsTimeoutMS: Long =
  get(HDFS_EXPIRE_DIRS_TIMEOUT)
def dfsExpireDirsTimeoutMS: Long =
  get(DFS_EXPIRE_DIRS_TIMEOUT)

// Worker allocation / exclusion settings.
def clientCheckedUseAllocatedWorkers: Boolean =
  get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long =
  get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
def clientExcludeReplicaOnFailureEnabled: Boolean =
  get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)

// MapReduce push limit and application-id suffix switch.
def clientMrMaxPushData: Long =
  get(CLIENT_MR_PUSH_DATA_MAX)
def clientApplicationUUIDSuffixEnabled: Boolean =
  get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
/**
 * Returns `appId` unchanged when `clientApplicationUUIDSuffixEnabled` is false.
 *
 * Otherwise returns `appId` followed by "-" and a freshly generated random UUID with
 * its dashes removed. Each call draws a new UUID.
 */
def appUniqueIdWithUUIDSuffix(appId: String): String =
  if (!clientApplicationUUIDSuffixEnabled) {
    appId
  } else {
    val uuidWithoutDashes = UUID.randomUUID().toString.replace("-", "")
    s"$appId-$uuidWithoutDashes"
  }