in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [784:877]
def clientPushLimitInFlightSleepDeltaMs: Long = get(CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
def clientPushSplitPartitionThreads: Int = get(CLIENT_PUSH_SPLIT_PARTITION_THREADS)
def clientPushTakeTaskWaitIntervalMs: Long = get(CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL)
def clientPushTakeTaskMaxWaitAttempts: Int = get(CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS)
def clientPushSendBufferPoolExpireTimeout: Long = get(CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT)
def clientPushSendBufferPoolExpireCheckInterval: Long =
  get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
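// Illustrative sketch (not upstream Celeborn code): how the two send-buffer-pool
// settings above could drive idle eviction of pooled push buffers. The wiring
// (`lastUsedMs`, `releaseAll`) is hypothetical.
private[celeborn] def startSendBufferPoolExpirySketch(
    lastUsedMs: java.util.concurrent.atomic.AtomicLong,
    releaseAll: () => Unit): Unit = {
  import java.util.concurrent.{Executors, TimeUnit}
  val timer = Executors.newSingleThreadScheduledExecutor()
  // Check at the configured interval; release buffers once the pool has been idle
  // longer than the configured expire timeout.
  timer.scheduleWithFixedDelay(
    new Runnable {
      override def run(): Unit = {
        if (System.currentTimeMillis() - lastUsedMs.get() > clientPushSendBufferPoolExpireTimeout) {
          releaseAll()
        }
      }
    },
    clientPushSendBufferPoolExpireCheckInterval,
    clientPushSendBufferPoolExpireCheckInterval,
    TimeUnit.MILLISECONDS)
}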
// //////////////////////////////////////////////////////
// Client Shuffle //
// //////////////////////////////////////////////////////
def shuffleWriterMode: ShuffleMode = ShuffleMode.valueOf(get(SPARK_SHUFFLE_WRITER_MODE))
def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
def shuffleForceFallbackPartitionThreshold: Long =
  get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
def shufflePartitionSplitMode: PartitionSplitMode =
  PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
def shufflePartitionSplitThreshold: Long = get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
def batchHandleChangePartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED)
def batchHandleChangePartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
def batchHandleChangePartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
def batchHandleCommitPartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED)
def batchHandleCommitPartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_THREADS)
def batchHandleCommitPartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLED_COMMIT_PARTITION_INTERVAL)
def batchHandleReleasePartitionEnabled: Boolean =
  get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_ENABLED)
def batchHandleReleasePartitionNumThreads: Int =
  get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_THREADS)
def batchHandleReleasePartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLED_RELEASE_PARTITION_INTERVAL)
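// Usage sketch (illustrative): the enum-valued getters above require the raw config
// string to name an enum constant. The key below is an assumption based on the
// ConfigEntry definitions elsewhere in this file; see CONFIGURATION_GUIDE.md for
// the exact keys.
private[celeborn] def clientShuffleExample(): Unit = {
  val conf = new CelebornConf()
    .set("celeborn.client.spark.shuffle.writer", "SORT") // assumed key for SPARK_SHUFFLE_WRITER_MODE
  assert(conf.shuffleWriterMode == ShuffleMode.SORT)
  // Getters without an explicitly set value fall back to their ConfigEntry defaults.
  println(conf.shufflePartitionSplitMode)
}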
// //////////////////////////////////////////////////////
// Worker //
// //////////////////////////////////////////////////////
/**
 * @return a sequence of (workingDir, usable space, flusher thread count, disk type) tuples;
 *         see CONFIGURATION_GUIDE.md for more details
 */
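// A storage dir entry is a ':'-separated spec (format inferred from the parsing
// below; see CONFIGURATION_GUIDE.md for the authoritative description):
//   <dir>[:capacity=<size>][:disktype=<HDD|SSD>][:flushthread=<n>]
// e.g. "/mnt/disk1:disktype=SSD:capacity=10TB:flushthread=4" yields
// ("/mnt/disk1", bytes of 10TB, 4, SSD).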
def workerBaseDirs: Seq[(String, Long, Int, Type)] = {
  // Assume no disk is larger than 1 PB these days.
  val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")
  get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
    storageDirs.map { str =>
      var maxCapacity = defaultMaxCapacity
      var diskType = HDD
      var flushThread = get(WORKER_FLUSHER_THREADS)
      val (dir, attributes) = str.split(":").toList match {
        case _dir :: tail => (_dir, tail)
        case Nil => throw new IllegalArgumentException(s"Illegal storage dir: $str")
      }
      var flushThreadsDefined = false
      // Parse the optional ':'-separated key=value attributes: capacity, disktype, flushthread.
      attributes.foreach {
        case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
          maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
        case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") =>
          diskType = Type.valueOf(diskTypeStr.split("=")(1))
          if (diskType == Type.MEMORY) {
            throw new IOException(s"Invalid diskType: $diskType")
          }
          // Unless an explicit flushthread attribute was given, pick the
          // flusher-thread default that matches the disk type.
          if (!flushThreadsDefined) {
            flushThread = diskType match {
              case HDD => workerHddFlusherThreads
              case SSD => workerSsdFlusherThreads
              case _ => flushThread
            }
          }
        case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
          flushThread = threadCountStr.split("=")(1).toInt
          flushThreadsDefined = true
        case illegal =>
          throw new IllegalArgumentException(s"Illegal attribute: $illegal")
      }
      (dir, maxCapacity, flushThread, diskType)
    }
  }.getOrElse {
    // No dirs configured: fall back to <prefix>1..<prefix>N defaults unless HDFS storage is used.
    if (!hasHDFSStorage) {
      val prefix = workerStorageBaseDirPrefix
      val number = workerStorageBaseDirNumber
      (1 to number).map { i =>
        (s"$prefix$i", defaultMaxCapacity, workerHddFlusherThreads, HDD)
      }
    } else {
      Seq.empty
    }
  }
}
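// Minimal usage sketch (illustrative): reading the parsed tuples off a conf with an
// assumed storage-dirs value; "celeborn.worker.storage.dirs" is taken from the docs
// and should match WORKER_STORAGE_DIRS' key.
private[celeborn] def workerBaseDirsExample(): Unit = {
  val conf = new CelebornConf()
    .set("celeborn.worker.storage.dirs", "/mnt/disk1:disktype=SSD:capacity=10TB:flushthread=4")
  conf.workerBaseDirs.foreach { case (dir, capacity, flushThreads, diskType) =>
    // Expect ("/mnt/disk1", bytes of 10TB, 4, SSD) for the single entry above.
    println(s"$dir capacity=$capacity flushThreads=$flushThreads diskType=$diskType")
  }
}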