in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1083:1180]
// --- Shuffle sizing / lifecycle (each accessor reads one CelebornConf entry) ---
def shuffleFallbackPartitionThreshold: Long = get(SPARK_SHUFFLE_FALLBACK_PARTITION_THRESHOLD)
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
def dfsReadChunkSize: Long = get(CLIENT_FETCH_DFS_READ_CHUNK_SIZE)
// Partition split policy: mode (enum parsed from config) and size threshold.
def shufflePartitionSplitMode: PartitionSplitMode =
  PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
def shufflePartitionSplitThreshold: Long = get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
// --- Batched handling of change-partition requests ---
def batchHandleChangePartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED)
def batchHandleChangePartitionBuckets: Int =
  get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_BUCKETS)
def batchHandleChangePartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
def batchHandleChangePartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
def batchHandleRemoveExpiredShufflesEnabled: Boolean =
  get(CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE_ENABLED)
// --- Batched handling of commit-partition requests ---
def batchHandleCommitPartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_ENABLED)
def batchHandleCommitPartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_COMMIT_PARTITION_THREADS)
def batchHandleCommitPartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLED_COMMIT_PARTITION_INTERVAL)
// --- Batched handling of release-partition requests ---
def batchHandleReleasePartitionEnabled: Boolean =
  get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_ENABLED)
def batchHandleReleasePartitionNumThreads: Int =
  get(CLIENT_BATCH_HANDLE_RELEASE_PARTITION_THREADS)
def batchHandleReleasePartitionRequestInterval: Long =
  get(CLIENT_BATCH_HANDLED_RELEASE_PARTITION_INTERVAL)
// --- Local shuffle-file reading and read-stream creation ---
def enableReadLocalShuffleFile: Boolean = get(READ_LOCAL_SHUFFLE_FILE)
def readLocalShuffleThreads: Int = get(READ_LOCAL_SHUFFLE_THREADS)
def readStreamCreatorPoolThreads: Int = get(READ_STREAM_CREATOR_POOL_THREADS)
// --- Registration-time worker filtering and lost-shuffle revision ---
def registerShuffleFilterExcludedWorkerEnabled: Boolean =
  get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)
// //////////////////////////////////////////////////////
// Worker //
// //////////////////////////////////////////////////////
/**
 * Parses the worker storage directory configuration into per-directory settings.
 *
 * Each configured entry has the form
 * `dir[:capacity=<size>][:disktype=<HDD|SSD>][:flushthread=<n>]`. Attribute
 * order is free; an explicit `flushthread=` always wins over the
 * disk-type-derived flusher thread default regardless of ordering.
 *
 * @return one tuple per directory: (workingDir, usable space, flusher thread count, disk type);
 *         check more details at CONFIGURATION_GUIDE.md
 * @throws IllegalArgumentException for an empty dir spec or an unknown attribute
 * @throws IOException when `disktype=MEMORY` is configured (not a valid base dir type)
 */
def workerBaseDirs: Seq[(String, Long, Int, Type)] = {
  // I assume there is no disk is bigger than 1 PB in recent days.
  val defaultMaxCapacity = Utils.byteStringAsBytes("1PB")
  get(WORKER_STORAGE_DIRS).map { storageDirs: Seq[String] =>
    storageDirs.map { str =>
      var maxCapacity = defaultMaxCapacity
      var diskType = HDD
      var flushThread = get(WORKER_FLUSHER_THREADS)
      val (dir, attributes) = str.split(":").toList match {
        case _dir :: tail => (_dir, tail)
        // FIX: the original `case nil` was a lowercase variable pattern (it bound
        // the empty list rather than matching the Nil constructor), so the error
        // message printed the useless "List()". Match Nil and report the input.
        case Nil => throw new IllegalArgumentException(s"Illegal storage dir: $str")
      }
      // Tracks whether flushthread= was set explicitly, so a later disktype=
      // attribute does not overwrite the user's choice.
      var flushThreadsDefined = false
      attributes.foreach {
        case capacityStr if capacityStr.toLowerCase.startsWith("capacity=") =>
          maxCapacity = Utils.byteStringAsBytes(capacityStr.split("=")(1))
        case diskTypeStr if diskTypeStr.toLowerCase.startsWith("disktype=") =>
          diskType = Type.valueOf(diskTypeStr.split("=")(1))
          if (diskType == Type.MEMORY) {
            // MEMORY is a valid Type elsewhere but not for an on-disk base dir.
            throw new IOException(s"Invalid diskType: $diskType")
          }
          // Derive the flusher thread count from the disk type only when the
          // user has not pinned it explicitly.
          if (!flushThreadsDefined) {
            flushThread = diskType match {
              case HDD => workerHddFlusherThreads
              case SSD => workerSsdFlusherThreads
              case _ => flushThread
            }
          }
        case threadCountStr if threadCountStr.toLowerCase.startsWith("flushthread=") =>
          flushThread = threadCountStr.split("=")(1).toInt
          flushThreadsDefined = true
        case illegal =>
          throw new IllegalArgumentException(s"Illegal attribute: $illegal")
      }
      (dir, maxCapacity, flushThread, diskType)
    }
  }.getOrElse {
    // No dirs configured: synthesize <prefix>1..<prefix>N local dirs, unless a
    // remote tier (HDFS/S3/OSS) is in use, in which case no local dirs at all.
    if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
      val prefix = workerStorageBaseDirPrefix
      val number = workerStorageBaseDirNumber
      val diskType = Type.valueOf(workerStorageBaseDirDiskType)
      (1 to number).map { i =>
        (
          s"$prefix$i",
          defaultMaxCapacity,
          diskType match {
            case SSD => workerSsdFlusherThreads
            case _ => workerHddFlusherThreads
          },
          diskType)
      }
    } else {
      Seq.empty
    }
  }
}