in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1209:3124]
// Entry point used by every config definition below: creates a builder for `key`
// and installs `register` as the builder's onCreate callback.
private def buildConf(key: String): ConfigBuilder = {
  val builder = ConfigBuilder(key)
  builder.onCreate(register)
}
// Boolean switch (default true) choosing between binding by IP address and by FQDN.
// Per the doc string it only matters when no bind hostname is configured explicitly.
val NETWORK_BIND_PREFER_IP: ConfigEntry[Boolean] =
buildConf("celeborn.network.bind.preferIpAddress")
.categories("network")
.version("0.3.0")
.doc("When `true`, prefer to use IP address, otherwise FQDN. This configuration only " +
"takes effect when the bind hostname is not set explicitly, in such case, Celeborn " +
"will find the first non-loopback address to bind.")
.booleanConf
.createWithDefault(true)
// Default timeout for network operations; declared with timeConf(MILLISECONDS) and
// default string "240s". NETWORK_IO_CONNECTION_TIMEOUT falls back to this entry.
val NETWORK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.network.timeout")
.categories("network")
.version("0.2.0")
.doc("Default timeout for network operations.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("240s")
// Default socket connect timeout; declared with timeConf(MILLISECONDS) and default
// string "10s". NETWORK_IO_CONNECT_TIMEOUT falls back to this entry.
val NETWORK_CONNECT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.network.connect.timeout")
.categories("network")
.doc("Default socket connect timeout.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
// Internal boolean (default true): whether the memory allocator is shared.
val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")
.internal
.version("0.3.0")
.doc("Whether to share memory allocator.")
.booleanConf
.createWithDefault(true)
// Optional arena count for the pooled memory allocator. The entry itself stores no
// default (createOptional); the effective default and minimum are described in the doc string.
val NETWORK_MEMORY_ALLOCATOR_ARENAS: OptionalConfigEntry[Int] =
buildConf("celeborn.network.memory.allocator.numArenas")
.categories("network")
.version("0.3.0")
.doc("Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2.")
.intConf
.createOptional
// Boolean switch (default false) enabling verbose metrics for the pooled allocator.
val NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.verbose.metric")
.categories("network")
.version("0.3.0")
.doc("Whether to enable verbose metric for pooled allocator.")
.booleanConf
.createWithDefault(false)
// Maximum number of retries when a port is occupied; default 1.
val PORT_MAX_RETRY: ConfigEntry[Int] =
buildConf("celeborn.port.maxRetries")
.categories("network")
.doc("When port is occupied, we will retry for max retry times.")
.version("0.2.0")
.intConf
.createWithDefault(1)
// Optional Netty IO thread count for NettyRpcEnv. No default is stored on the entry
// (createOptional); the doc string states what is used when it is unset.
val RPC_IO_THREAD: OptionalConfigEntry[Int] =
buildConf("celeborn.rpc.io.threads")
.categories("network")
.doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
"The default threads number is the number of runtime available processors.")
.version("0.2.0")
.intConf
.createOptional
// Integer setting with default 64. This entry has no .doc string.
// NOTE(review): presumably the size of the RPC client connect thread pool — confirm at the
// usage site and consider adding a .doc so it is described in generated documentation.
val RPC_CONNECT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.connect.threads")
.categories("network")
.version("0.2.0")
.intConf
.createWithDefault(64)
// Timeout for RPC lookup operations; timeConf(MILLISECONDS), default string "30s".
val RPC_LOOKUP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.rpc.lookupTimeout")
.categories("network")
.version("0.2.0")
.doc("Timeout for RPC lookup operations.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Timeout for RPC ask operations; timeConf(MILLISECONDS), default string "60s".
// MASTER_CLIENT_RPC_ASK_TIMEOUT falls back to this entry.
val RPC_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.rpc.askTimeout")
.categories("network")
.version("0.2.0")
.doc("Timeout for RPC ask operations. " +
"It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// Optional thread count for the message dispatcher event loop; no default stored (createOptional).
// Registers "celeborn.rpc.dispatcher.numThreads" as an alternative key.
val RPC_DISPATCHER_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.rpc.dispatcher.threads")
.withAlternative("celeborn.rpc.dispatcher.numThreads")
.categories("network")
.doc("Threads number of message dispatcher event loop")
.version("0.3.0")
.intConf
.createOptional
// Netty EventLoopGroup backend. The value is upper-cased and then restricted to
// "NIO" or "EPOLL"; default "NIO". No .version is set on this entry.
// NOTE(review): the key contains a literal `<module>` placeholder — presumably replaced
// with a transport module name when the setting is read; confirm against the accessor.
val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
.doc("Netty EventLoopGroup backend, available options: NIO, EPOLL.")
.stringConf
.transform(_.toUpperCase)
.checkValues(Set("NIO", "EPOLL"))
.createWithDefault("NIO")
// Per-`<module>` boolean (default true): prefer off-heap byte buffers within Netty.
// No .version is set on this entry.
val NETWORK_IO_PREFER_DIRECT_BUFS: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.preferDirectBufs")
.categories("network")
.doc("If true, we will prefer allocating off-heap byte buffers within Netty.")
.booleanConf
.createWithDefault(true)
// Per-`<module>` socket connect timeout, declared as a fallback conf over NETWORK_CONNECT_TIMEOUT.
val NETWORK_IO_CONNECT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.connectTimeout")
.categories("network")
.doc("Socket connect timeout.")
.fallbackConf(NETWORK_CONNECT_TIMEOUT)
// Per-`<module>` connection active timeout, declared as a fallback conf over NETWORK_TIMEOUT.
val NETWORK_IO_CONNECTION_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.connectionTimeout")
.categories("network")
.doc("Connection active timeout.")
.fallbackConf(NETWORK_TIMEOUT)
// Per-`<module>` number of concurrent connections between two nodes; default 2.
val NETWORK_IO_NUM_CONNECTIONS_PER_PEER: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.numConnectionsPerPeer")
.categories("network")
.doc("Number of concurrent connections between two nodes.")
.intConf
.createWithDefault(2)
// Per-`<module>` requested maximum length of the incoming-connection queue; default 0,
// which the doc string describes as "no backlog".
val NETWORK_IO_BACKLOG: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.backLog")
.categories("network")
.doc(
"Requested maximum length of the queue of incoming connections. Default 0 for no backlog.")
.intConf
.createWithDefault(0)
// Per-`<module>` server thread pool size; default 0, which the doc string says means 2x#cores.
val NETWORK_IO_SERVER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.serverThreads")
.categories("network")
.doc("Number of threads used in the server thread pool. Default to 0, which is 2x#cores.")
.intConf
.createWithDefault(0)
// Per-`<module>` client thread pool size; default 0, which the doc string says means 2x#cores.
val NETWORK_IO_CLIENT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.clientThreads")
.categories("network")
.doc("Number of threads used in the client thread pool. Default to 0, which is 2x#cores.")
.intConf
.createWithDefault(0)
// Per-`<module>` receive buffer size (SO_RCVBUF); bytesConf(BYTE) with default 0.
// NOTE(review): 0 presumably means "leave the socket default" — confirm where it is applied.
val NETWORK_IO_RECEIVE_BUFFER: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.receiveBuffer")
.categories("network")
.doc("Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer " +
"should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps " +
"buffer size should be ~ 1.25MB.")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)
// Per-`<module>` send buffer size (SO_SNDBUF); bytesConf(BYTE) with default 0.
val NETWORK_IO_SEND_BUFFER: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.sendBuffer")
.categories("network")
.doc("Send buffer size (SO_SNDBUF).")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)
// Per-`<module>` retry budget for IO exceptions per request; default 3, 0 disables retries
// (per the doc string).
val NETWORK_IO_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.maxRetries")
.categories("network")
.doc(
"Max number of times we will try IO exceptions (such as connection timeouts) per request. " +
"If set to 0, we will not do any retries.")
.intConf
.createWithDefault(3)
// Per-`<module>` wait before retrying after an IOException; timeConf(MILLISECONDS),
// default string "5s".
val NETWORK_IO_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.retryWait")
.categories("network")
.doc("Time that we will wait in order to perform a retry after an IOException. " +
"Only relevant if maxIORetries > 0.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Per-`<module>` boolean (default true): initialize FileDescriptor lazily.
val NETWORK_IO_LAZY_FD: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.lazyFD")
.categories("network")
.doc("Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only " +
"when data is going to be transferred. This can reduce the number of open files.")
.booleanConf
.createWithDefault(true)
// Per-`<module>` boolean (default false): track detailed Netty memory metrics.
val NETWORK_VERBOSE_METRICS: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.enableVerboseMetrics")
.categories("network")
.doc("Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty " +
"PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked.")
.booleanConf
.createWithDefault(false)
// Internal per-`<module>` threshold: minimum block size at which memory mapping is used
// instead of normal IO. bytesConf(BYTE), default string "2m".
// Registers "celeborn.storage.memoryMapThreshold" as an alternative key.
val NETWORK_IO_STORAGE_MEMORY_MAP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.<module>.storage.memoryMapThreshold")
.withAlternative("celeborn.storage.memoryMapThreshold")
.categories("network")
.internal
.doc("Minimum size of a block that we should start using memory map rather than reading in through " +
"normal IO operations. This prevents Celeborn from memory mapping very small blocks. In general, " +
"memory mapping has high overhead for blocks close to or below the page size of the OS.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2m")
// Optional cap on chunks transferred concurrently on the shuffle service; no default is
// stored on the entry (createOptional).
val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] =
buildConf("celeborn.shuffle.io.maxChunksBeingTransferred")
.categories("network")
.doc("The max number of chunks allowed to be transferred at the same time on shuffle service. Note " +
"that new incoming connections will be closed when the max number is hit. The client will retry " +
"according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and " +
"`celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure.")
.version("0.2.0")
.longConf
.createOptional
// Per-`<module>` interval for checking push-data timeouts; timeConf(MILLISECONDS),
// default string "5s". The doc string interpolates the DATA and REPLICATE module names
// from TransportModuleConstants.
val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.push.timeoutCheck.interval")
.categories("network")
.doc("Interval for checking push data timeout. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push data and should be configured on client side. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for worker replicate data to peer worker and should be configured on worker side.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Per-`<module>` number of threads used for checking push-data timeouts; default 4.
val PUSH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.push.timeoutCheck.threads")
.categories("network")
.doc("Threads num for checking push data timeout. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push data and should be configured on client side. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for worker replicate data to peer worker and should be configured on worker side.")
.version("0.3.0")
.intConf
.createWithDefault(4)
// Per-`<module>` interval for checking fetch-data timeouts; timeConf(MILLISECONDS),
// default string "5s". The doc string states only the DATA module is supported.
val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
.categories("network")
.doc("Interval for checking fetch data timeout. " +
s"It only supports setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data and should be configured on client side.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Per-`<module>` number of threads used for checking fetch-data timeouts; default 4.
// The doc string states only the DATA module is supported.
val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.fetch.timeoutCheck.threads")
.categories("network")
.doc("Threads num for checking fetch data timeout. " +
s"It only supports setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data and should be configured on client side.")
.version("0.3.0")
.intConf
.createWithDefault(4)
// Per-`<module>` heartbeat interval between worker and client; timeConf(MILLISECONDS),
// default string "60s". Registers "celeborn.client.heartbeat.interval" as an alternative key.
val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.heartbeat.interval")
.withAlternative("celeborn.client.heartbeat.interval")
.categories("network")
.version("0.3.0")
.doc("The heartbeat interval between worker and client. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data and should be configured on client side. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for worker replicate data to peer worker and should be configured on worker side.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// Comma-separated list of master endpoints, exposed as Seq[String]; default string
// "<localhost>:9097". Each element is validated by attempting Utils.parseHostPort on it.
val MASTER_ENDPOINTS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.endpoints")
.categories("client", "worker")
.doc("Endpoints of master nodes for celeborn client to connect, allowed pattern " +
"is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. " +
"If the port is omitted, 9097 will be used.")
.version("0.2.0")
.stringConf
.toSequence
.checkValue(
// The previous form `map(_ => Try(Utils.parseHostPort(_)))` discarded each endpoint and
// wrapped a function value in Try, which always succeeds, so nothing was ever validated.
// Parse every endpoint explicitly so a malformed host:port is rejected.
endpoints => endpoints.forall(endpoint => Try(Utils.parseHostPort(endpoint)).isSuccess),
"Allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`")
.createWithDefaultString(s"<localhost>:9097")
// Internal timeout for HA client RPC ask operations, declared as a fallback conf over
// RPC_ASK_TIMEOUT. Registers "celeborn.rpc.haClient.askTimeout" as an alternative key.
val MASTER_CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.masterClient.rpc.askTimeout")
.withAlternative("celeborn.rpc.haClient.askTimeout")
.internal
.categories("client", "worker")
.version("0.3.0")
.doc("Timeout for HA client RPC ask operations.")
.fallbackConf(RPC_ASK_TIMEOUT)
// Internal max retry count for a client connecting to a master endpoint; default 15.
// Registers "celeborn.client.maxRetries" as an alternative key.
val MASTER_CLIENT_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.masterClient.maxRetries")
.withAlternative("celeborn.client.maxRetries")
.internal
.categories("client", "worker")
.doc("Max retry times for client to connect master endpoint")
.version("0.3.0")
.intConf
.createWithDefault(15)
// Application heartbeat timeout on the master; timeConf(MILLISECONDS), default string "300s".
// Registers "celeborn.application.heartbeat.timeout" as an alternative key.
val APPLICATION_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.application.timeout")
.withAlternative("celeborn.application.heartbeat.timeout")
.categories("master")
.version("0.3.0")
.doc("Application heartbeat timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")
// Timeout for deleting expired directories on HDFS; timeConf(MILLISECONDS), default string "1h".
val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.hdfs.expireDirs.timeout")
.categories("master")
.version("0.3.0")
.doc("The timeout for an expired dir to be deleted on HDFS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
// Worker heartbeat timeout on the master; timeConf(MILLISECONDS), default string "120s".
// Registers "celeborn.worker.heartbeat.timeout" as an alternative key.
val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
.categories("master")
.version("0.3.0")
.doc("Worker heartbeat timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
// Hostname for the master to bind; default string is the literal "<localhost>".
// NOTE(review): "<localhost>" looks like a placeholder resolved by the accessor — confirm.
val MASTER_HOST: ConfigEntry[String] =
buildConf("celeborn.master.host")
.categories("master")
.version("0.2.0")
.doc("Hostname for master to bind.")
.stringConf
.createWithDefaultString("<localhost>")
// Port for the master to bind; default 9097. The check accepts 1024 through 65534.
// NOTE(review): 65535 is a valid TCP port but is rejected by `p < 65535` — confirm intended.
val MASTER_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.port")
.categories("master")
.version("0.2.0")
.doc("Port for master to bind.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9097)
// Boolean switch (default false): run master nodes in Raft cluster mode.
// Registers "celeborn.ha.enabled" as an alternative key.
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
.categories("ha")
.version("0.3.0")
.doc("When true, master nodes run as Raft cluster mode.")
.booleanConf
.createWithDefault(false)
// Optional node id for the master raft cluster in HA mode; no default is stored on the
// entry (createOptional). Registers "celeborn.ha.master.node.id" as an alternative key.
// The "ha" category is declared so this entry is grouped with the other HA settings in this
// file, all of which carry it.
val HA_MASTER_NODE_ID: OptionalConfigEntry[String] =
buildConf("celeborn.master.ha.node.id")
.withAlternative("celeborn.ha.master.node.id")
.categories("ha")
.doc("Node id for master raft cluster in HA mode, if not defined, " +
"will be inferred by hostname.")
.version("0.3.0")
.stringConf
.createOptional
// Host to bind for master node `<id>` in HA mode; default string is the literal "<required>".
// The key (and its alternative) contain a literal `<id>` placeholder.
val HA_MASTER_NODE_HOST: ConfigEntry[String] =
buildConf("celeborn.master.ha.node.<id>.host")
.withAlternative("celeborn.ha.master.node.<id>.host")
.categories("ha")
.doc("Host to bind of master node <id> in HA mode.")
.version("0.3.0")
.stringConf
.createWithDefaultString("<required>")
// Port to bind for master node `<id>` in HA mode; default 9097. The check accepts 1024 through 65534.
// NOTE(review): 65535 is rejected by `p < 65535` — confirm intended.
val HA_MASTER_NODE_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.ha.node.<id>.port")
.withAlternative("celeborn.ha.master.node.<id>.port")
.categories("ha")
.doc("Port to bind of master node <id> in HA mode.")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9097)
// Internal, optional Ratis bind host for master node `<id>` in HA mode; no default stored
// (createOptional). The doc string interpolates HA_MASTER_NODE_HOST's key as the fallback.
val HA_MASTER_NODE_RATIS_HOST: OptionalConfigEntry[String] =
buildConf("celeborn.master.ha.node.<id>.ratis.host")
.withAlternative("celeborn.ha.master.node.<id>.ratis.host")
.internal
.categories("ha")
.doc("Ratis host to bind of master node <id> in HA mode. If not provided, " +
s"fallback to ${HA_MASTER_NODE_HOST.key}.")
.version("0.3.0")
.stringConf
.createOptional
// Ratis port to bind for master node `<id>` in HA mode; default 9872. The check accepts
// 1024 through 65534.
// NOTE(review): 65535 is rejected by `p < 65535` — confirm intended.
val HA_MASTER_NODE_RATIS_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.ha.node.<id>.ratis.port")
.withAlternative("celeborn.ha.master.node.<id>.ratis.port")
.categories("ha")
.doc("Ratis port to bind of master node <id> in HA mode.")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9872)
// RPC type for Ratis. The value is lower-cased and then restricted to "netty" or "grpc";
// default "netty".
val HA_MASTER_RATIS_RPC_TYPE: ConfigEntry[String] =
buildConf("celeborn.master.ha.ratis.raft.rpc.type")
.withAlternative("celeborn.ha.master.ratis.raft.rpc.type")
.categories("ha")
.doc("RPC type for Ratis, available options: netty, grpc.")
.version("0.3.0")
.stringConf
.transform(_.toLowerCase)
.checkValues(Set("netty", "grpc"))
.createWithDefault("netty")
// String setting with default "/tmp/ratis". This entry has no .doc string.
// NOTE(review): by its key this is presumably the Ratis raft server storage directory —
// confirm at the usage site and consider adding a .doc.
val HA_MASTER_RATIS_STORAGE_DIR: ConfigEntry[String] =
buildConf("celeborn.master.ha.ratis.raft.server.storage.dir")
.withAlternative("celeborn.ha.master.ratis.raft.server.storage.dir")
.categories("ha")
.version("0.3.0")
.stringConf
.createWithDefault("/tmp/ratis")
// Internal HA setting without a .doc string; bytesConf(BYTE), default string "4MB".
// NOTE(review): the key mirrors a Ratis raft server property name — presumably forwarded
// to Ratis; confirm at the usage site.
val HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.segment.size.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.segment.size.max")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
// Internal HA setting without a .doc string; bytesConf(BYTE), default string "4MB".
// NOTE(review): presumably forwarded to the matching Ratis raft server property; confirm.
val HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.preallocated.size")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.preallocated.size")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
// Internal HA setting without a .doc string; integer, default 1024.
// NOTE(review): presumably forwarded to the matching Ratis raft server property; confirm.
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.element-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.element-limit")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(1024)
// Internal HA setting without a .doc string; bytesConf(BYTE), default string "32MB".
// NOTE(review): presumably forwarded to the matching Ratis raft server property; confirm.
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.byte-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.byte-limit")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("32MB")
// Internal HA setting without a .doc string; boolean, default true.
// NOTE(review): presumably forwarded to the matching Ratis raft server property; confirm.
val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
.internal
.categories("ha")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
// Internal HA setting without a .doc string; integer, default 1000000.
// NOTE(review): presumably forwarded to the matching Ratis raft server property; confirm.
val HA_MASTER_RATIS_LOG_PURGE_GAP: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.log.purge.gap")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.purge.gap")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(1000000)
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "3s".
// Note the unit is SECONDS here, unlike most time settings in this file.
val HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.request.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.request.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "600s".
val HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.retrycache.expirytime")
.withAlternative("celeborn.ha.master.ratis.raft.server.retrycache.expirytime")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("600s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "3s".
// Paired with HA_MASTER_RATIS_RPC_TIMEOUT_MAX (default "5s").
val HA_MASTER_RATIS_RPC_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.min")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.min")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "5s".
// Paired with HA_MASTER_RATIS_RPC_TIMEOUT_MIN (default "3s").
val HA_MASTER_RATIS_RPC_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.max")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "3s".
// Paired with HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX (default "5s").
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "5s".
// Paired with HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN (default "3s").
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.max")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.max")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "30s".
val HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.notification.no-leader.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30s")
// Internal HA setting without a .doc string; timeConf(SECONDS), default string "120s".
val HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.slowness.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.slowness.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
// Internal HA setting without a .doc string; timeConf(MILLISECONDS), default string "1s".
// Unlike the neighbouring Ratis timeouts, this one is declared in MILLISECONDS.
val HA_MASTER_RATIS_ROLE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.role.check.interval")
.withAlternative("celeborn.ha.master.ratis.raft.server.role.check.interval")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
// Internal HA setting without a .doc string; boolean, default true.
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled")
.internal
.categories("ha")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
// Internal HA setting without a .doc string; long, default 200000.
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.threshold")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold")
.internal
.categories("ha")
.version("0.3.0")
.longConf
.createWithDefault(200000L)
// Internal HA setting without a .doc string; integer, default 3.
val HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.retention.file.num")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.retention.file.num")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(3)
// Slot assignment policy for the master. The value is upper-cased with Locale.ROOT and must
// equal the name of SlotsAssignPolicy.ROUNDROBIN or SlotsAssignPolicy.LOADAWARE;
// default is ROUNDROBIN's name.
val MASTER_SLOT_ASSIGN_POLICY: ConfigEntry[String] =
buildConf("celeborn.master.slot.assign.policy")
.withAlternative("celeborn.slots.assign.policy")
.categories("master")
.version("0.3.0")
.doc("Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. " +
"Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes`")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(
SlotsAssignPolicy.ROUNDROBIN.name,
SlotsAssignPolicy.LOADAWARE.name))
.createWithDefault(SlotsAssignPolicy.ROUNDROBIN.name)
// Number of disk groups created by the load-aware slot allocation algorithm; default 5.
// Registers "celeborn.slots.assign.loadAware.numDiskGroups" as an alternative key.
val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.loadAware.numDiskGroups")
.withAlternative("celeborn.slots.assign.loadAware.numDiskGroups")
.categories("master")
.doc("This configuration is a guidance for load-aware slot allocation algorithm. " +
"This value controls how many disk groups will be created.")
.version("0.3.0")
.intConf
.createWithDefault(5)
// How much more workload a faster disk group receives than a slower one (per the doc
// string); double, default 0.1.
val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.diskGroupGradient")
.withAlternative("celeborn.slots.assign.loadAware.diskGroupGradient")
.categories("master")
.doc("This value means how many more workload will be placed into a faster disk group " +
"than a slower group.")
.version("0.3.0")
.doubleConf
.createWithDefault(0.1)
// Weight of average flush time in the load-aware ordering; double, default 0.
val MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.flushTimeWeight")
.withAlternative("celeborn.slots.assign.loadAware.flushTimeWeight")
.categories("master")
.doc(
"Weight of average flush time when calculating ordering in load-aware assignment strategy")
.version("0.3.0")
.doubleConf
.createWithDefault(0)
// Weight of average fetch time in the load-aware ordering; double, default 1.
val MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.fetchTimeWeight")
.withAlternative("celeborn.slots.assign.loadAware.fetchTimeWeight")
.categories("master")
.doc(
"Weight of average fetch time when calculating ordering in load-aware assignment strategy")
.version("0.3.0")
.doubleConf
.createWithDefault(1)
// Extra slots number used when the master assigns slots; default 2.
// Registers "celeborn.slots.assign.extraSlots" as an alternative key.
val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.extraSlots")
.withAlternative("celeborn.slots.assign.extraSlots")
.categories("master")
.version("0.3.0")
.doc("Extra slots number when master assign slots.")
.intConf
.createWithDefault(2)
// Master-side cap on how many workers one shuffle's slots may be spread over; default 10000.
// The doc string points at the client-side counterpart `celeborn.client.slot.assign.maxWorkers`.
val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.maxWorkers")
.categories("master")
.version("0.3.1")
.doc("Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one " +
s"from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`.")
.intConf
.createWithDefault(10000)
// Initial partition size used for estimation; bytesConf(BYTE), default string "64mb".
// Registers "celeborn.shuffle.initialEstimatedPartitionSize" as an alternative key.
val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.initialSize")
.withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
.categories("master")
.doc("Initial partition size for estimation, it will change according to runtime stats.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64mb")
// Partition sizes below this value are ignored for estimation; bytesConf(BYTE),
// default string "8mb".
// NOTE(review): the key lives under `celeborn.master.*` but the category is "worker",
// unlike the sibling estimatedPartitionSize entries — presumably because the value is
// read on the worker side; confirm before changing.
val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.minSize")
.withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
.categories("worker")
.doc(
"Ignore partition size smaller than this configuration of partition size for estimation.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8mb")
// Initial delay before partition-size estimation updates start; timeConf(MILLISECONDS),
// default string "5min".
val ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.update.initialDelay")
.withAlternative("celeborn.shuffle.estimatedPartitionSize.update.initialDelay")
.categories("master")
.doc("Initial delay time before start updating partition size for estimation.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5min")
// Interval between partition-size estimation updates; timeConf(MILLISECONDS),
// default string "10min".
val ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.update.interval")
.withAlternative("celeborn.shuffle.estimatedPartitionSize.update.interval")
.categories("master")
.doc("Interval of updating partition size for estimation.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10min")
// Window length for computing user resource consumption; timeConf(MILLISECONDS),
// default string "30s".
val MASTER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.update.interval")
.categories("master")
.doc("Time length for a window about compute user resource consumption.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Max chunk size of a reducer's merged shuffle data; bytesConf(BYTE), default string "8m".
// The example in the doc string follows from that default: 128M / 8m = 16 chunk requests.
val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.shuffle.chunk.size")
.categories("client", "worker")
.version("0.2.0")
.doc("Max chunk size of reducer's merged shuffle data. For example, if a reducer's " +
"shuffle data is 128M, the data will need 16 fetch chunk requests to fetch.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8m")
// Boolean switch (default true) enabling partition split on the worker side.
// Registers "celeborn.worker.partition.split.enabled" as an alternative key.
val WORKER_PARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.shuffle.partitionSplit.enabled")
.withAlternative("celeborn.worker.partition.split.enabled")
.categories("worker")
.version("0.3.0")
.doc("enable the partition split on worker side")
.booleanConf
.createWithDefault(true)
// Minimum size for a partition to split; bytesConf(BYTE), default string "1m".
val WORKER_PARTITION_SPLIT_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.partitionSplit.min")
.withAlternative("celeborn.shuffle.partitionSplit.min")
.categories("worker")
.doc("Min size for a partition to split")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")
// Maximum partition size for splitting; bytesConf(BYTE), default string "2g".
val WORKER_PARTITION_SPLIT_MAX_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.partitionSplit.max")
.categories("worker")
.doc("Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2g")
// Optional list of directories for shuffle data, exposed as Seq[String]; no default is
// stored on the entry (createOptional). The per-directory option syntax is given in the
// doc string. WORKER_STORAGE_BASE_DIR_PREFIX / _COUNT document what applies when unset.
val WORKER_STORAGE_DIRS: OptionalConfigEntry[Seq[String]] =
buildConf("celeborn.worker.storage.dirs")
.categories("worker")
.version("0.2.0")
.doc("Directory list to store shuffle data. It's recommended to configure one directory " +
"on each disk. Storage size limit can be set for each directory. For the sake of " +
"performance, there should be no more than 2 flush threads " +
"on the same disk partition if you are using HDD, and should be 8 or more flush threads " +
"on the same disk partition if you are using SSD. For example: " +
"`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]`")
.stringConf
.toSequence
.createOptional
// Worker's working dir path name; default "celeborn-worker/shuffle_data".
// Registers "celeborn.worker.workingDir" as an alternative key.
val WORKER_WORKING_DIR: ConfigEntry[String] =
buildConf("celeborn.worker.storage.workingDir")
.withAlternative("celeborn.worker.workingDir")
.categories("worker")
.doc("Worker's working dir path name.")
.version("0.3.0")
.stringConf
.createWithDefault("celeborn-worker/shuffle_data")
// Internal base directory prefix used when WORKER_STORAGE_DIRS is not set; default "/mnt/disk".
// The doc string interpolates WORKER_STORAGE_DIRS's key.
val WORKER_STORAGE_BASE_DIR_PREFIX: ConfigEntry[String] =
buildConf("celeborn.worker.storage.baseDir.prefix")
.internal
.categories("worker")
.version("0.2.0")
.doc("Base directory for Celeborn worker to write if " +
s"`${WORKER_STORAGE_DIRS.key}` is not set.")
.stringConf
.createWithDefault("/mnt/disk")
// Internal count of base directories used when WORKER_STORAGE_DIRS is not set; default 16.
// Per the doc string, directory names combine WORKER_STORAGE_BASE_DIR_PREFIX with 1..count.
val WORKER_STORAGE_BASE_DIR_COUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.baseDir.number")
.internal
.categories("worker")
.version("0.2.0")
.doc(s"How many directories will be used if `${WORKER_STORAGE_DIRS.key}` is not set. " +
s"The directory name is a combination of `${WORKER_STORAGE_BASE_DIR_PREFIX.key}` " +
"and from one(inclusive) to `celeborn.worker.storage.baseDir.number`(inclusive) " +
"step by one.")
.intConf
.createWithDefault(16)
// Optional HDFS base directory for shuffle data; no default stored (createOptional).
// Listed under the worker, master and client categories.
val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.categories("worker", "master", "client")
.version("0.2.0")
.doc("HDFS base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional
// Space the worker reserves on each disk; bytesConf(BYTE), default string "5G".
// Registers "celeborn.worker.disk.reserve.size" as an alternative key.
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
.categories("worker")
.doc("Celeborn worker reserved space for each disk.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("5G")
// Number of retries for the worker's "working directory is cleaned up" check before it
// registers with the master; default 3.
val WORKER_CHECK_FILE_CLEAN_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.checkDirsEmpty.maxRetries")
.withAlternative("celeborn.worker.disk.checkFileClean.maxRetries")
.categories("worker")
.doc("The number of retries for a worker to check if the working directory is cleaned up before registering with the master.")
.version("0.3.0")
.intConf
.createWithDefault(3)
// Wait time per retry of the worker's "working directory is cleaned up" check;
// timeConf(MILLISECONDS), default string "1000ms".
val WORKER_CHECK_FILE_CLEAN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.checkDirsEmpty.timeout")
.withAlternative("celeborn.worker.disk.checkFileClean.timeout")
.categories("worker")
.doc("The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1000ms")
// Server port on which the worker receives RPC requests; default 0.
// NOTE(review): 0 presumably means "pick any free port" — confirm where the server binds.
val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
.doc("Server port for Worker to receive RPC request.")
.version("0.2.0")
.intConf
.createWithDefault(0)
// Server port on which the worker receives push-data requests from ShuffleClient; default 0.
val WORKER_PUSH_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.push.port")
.categories("worker")
.doc("Server port for Worker to receive push data request from ShuffleClient.")
.version("0.2.0")
.intConf
.createWithDefault(0)
// Server port on which the worker receives fetch-data requests from ShuffleClient; default 0.
val WORKER_FETCH_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.fetch.port")
.categories("worker")
.doc("Server port for Worker to receive fetch data request from ShuffleClient.")
.version("0.2.0")
.intConf
.createWithDefault(0)
// Server port on which the worker receives replicate-data requests from other workers; default 0.
val WORKER_REPLICATE_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.replicate.port")
.categories("worker")
.doc("Server port for Worker to receive replicate data request from other Workers.")
.version("0.2.0")
.intConf
.createWithDefault(0)
// Optional Netty IO thread count for handling client push data; no default is stored on
// the entry (createOptional) — the doc string states what is used when unset.
val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.push.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client push data. " +
s"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
.createOptional
// Optional Netty IO thread count for handling client fetch data; no default is
// registered here (the doc text describes the fallback).
val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
  buildConf("celeborn.worker.fetch.io.threads")
    .categories("worker")
    .version("0.2.0")
    .doc("Netty IO thread number of worker to handle client fetch data. " +
      "The default threads number is the number of flush thread.")
    .intConf
    .createOptional
// Optional Netty IO thread count for replicating shuffle data; no default is
// registered here (the doc text describes the fallback).
val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
  buildConf("celeborn.worker.replicate.io.threads")
    .categories("worker")
    .version("0.2.0")
    .doc("Netty IO thread number of worker to replicate shuffle data. " +
      "The default threads number is the number of flush thread.")
    .intConf
    .createOptional
// Worker registration timeout, parsed as milliseconds (default "180s").
val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.register.timeout")
    .categories("worker")
    .version("0.2.0")
    .doc("Worker register timeout.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("180s")
// Toggle for closing idle connections on the worker (default false).
val WORKER_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.closeIdleConnections")
    .categories("worker")
    .version("0.2.0")
    .doc("Whether worker will close idle connections.")
    .booleanConf
    .createWithDefault(false)
// Duration (milliseconds, default "60s") after which an unanswered replicate
// request is marked as failed.
val WORKER_REPLICATE_FAST_FAIL_DURATION: ConfigEntry[Long] =
  buildConf("celeborn.worker.replicate.fastFail.duration")
    .categories("worker")
    // The first literal ends with a space so the two sentences do not run together.
    .doc("If a replicate request not replied during the duration, " +
      "worker will mark the replicate data request as failed. " +
      "It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
    .version("0.2.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("60s")
// Toggle (default true) for using random connections to the peer when replicating data.
val WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.replicate.randomConnection.enabled")
    .categories("worker")
    .version("0.2.1")
    .doc("Whether worker will create random connection to peer when replicate data. " +
      "When false, worker tend to reuse the same cached TransportClient to a specific replicate worker; " +
      "when true, worker tend to use different cached TransportClient. " +
      "Netty will use the same thread to serve the same connection, so with more connections " +
      "replicate server can leverage more netty threads")
    .booleanConf
    .createWithDefault(true)
// Number of worker threads used to replicate shuffle data (default 64).
val WORKER_REPLICATE_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.replicate.threads")
    .categories("worker")
    .doc("Thread number of worker to replicate shuffle data.")
    .version("0.2.0")
    .intConf
    .createWithDefault(64)
// Number of worker threads that commit shuffle data files asynchronously (default 32).
val WORKER_COMMIT_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.commitFiles.threads")
    .withAlternative("celeborn.worker.commit.threads")
    .categories("worker")
    .doc(
      "Thread number of worker to commit shuffle data files asynchronously. " +
        "It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
    .version("0.3.0")
    .intConf
    .createWithDefault(32)
// Timeout (milliseconds, default "120s") for a worker to commit the files of a shuffle.
val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.commitFiles.timeout")
    .withAlternative("celeborn.worker.shuffle.commit.timeout")
    .categories("worker")
    .version("0.3.0")
    .doc(
      "Timeout for a Celeborn worker to commit files of a shuffle. " +
        "It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("120s")
// Timeout (milliseconds, default "220s") for sorting a single shuffle file.
val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.sortPartition.timeout")
    .withAlternative("celeborn.worker.partitionSorter.sort.timeout")
    .categories("worker")
    .version("0.3.0")
    .doc("Timeout for a shuffle file to sort.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("220s")
// Optional thread count for the PartitionSorter; no default is registered here.
val PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
  buildConf("celeborn.worker.sortPartition.threads")
    .withAlternative("celeborn.worker.partitionSorter.threads")
    .categories("worker")
    .version("0.3.0")
    .doc(
      "PartitionSorter's thread counts. " +
        "It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
    .intConf
    .createOptional
// Bytes reserved when sorting a shuffle file off-heap (default "1mb").
// The value is validated to be strictly below Int.MaxValue.
val WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY: ConfigEntry[Long] =
  buildConf("celeborn.worker.sortPartition.reservedMemoryPerPartition")
    .withAlternative("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
    .categories("worker")
    .version("0.3.0")
    .doc("Reserved memory when sorting a shuffle file off-heap.")
    .bytesConf(ByteUnit.BYTE)
    .checkValue(
      reserved => reserved < Int.MaxValue,
      "Reserved memory per partition must be less than 2GB.")
    .createWithDefaultString("1mb")
// Buffer size in bytes used by a single flusher (default "256k").
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
  buildConf("celeborn.worker.flusher.buffer.size")
    .categories("worker")
    .doc("Size of buffer used by a single flusher.")
    .version("0.2.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("256k")
// Buffer size in bytes used by a HDFS flusher (default "4m").
val WORKER_HDFS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
  buildConf("celeborn.worker.flusher.hdfs.buffer.size")
    .categories("worker")
    .doc("Size of buffer used by a HDFS flusher.")
    .version("0.3.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("4m")
// Timeout (milliseconds, default "120s") for closing a file writer.
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.writer.close.timeout")
    .categories("worker")
    .version("0.2.0")
    .doc("Timeout for a file writer to close")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("120s")
// Flusher thread count per disk for disks whose type is unknown (default 16).
val WORKER_FLUSHER_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.flusher.threads")
    .categories("worker")
    .doc("Flusher's thread count per disk for unknown-type disks.")
    .version("0.2.0")
    .intConf
    .createWithDefault(16)
// Flusher thread count per disk for writing to HDD disks (default 1).
val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.flusher.hdd.threads")
    .categories("worker")
    .version("0.2.0")
    .doc("Flusher's thread count per disk used for write data to HDD disks.")
    .intConf
    .createWithDefault(1)
// Flusher thread count per disk for writing to SSD disks (default 16).
val WORKER_FLUSHER_SSD_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.flusher.ssd.threads")
    .categories("worker")
    .version("0.2.0")
    .doc("Flusher's thread count per disk used for write data to SSD disks.")
    .intConf
    .createWithDefault(16)
// Flusher thread count for writing data to HDFS (default 8).
val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.worker.flusher.hdfs.threads")
    .categories("worker")
    .version("0.2.0")
    .doc("Flusher's thread count used for write data to HDFS.")
    .intConf
    .createWithDefault(8)
// Timeout (milliseconds, default "3s") for shutting down a flusher.
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.flusher.shutdownTimeout")
    .categories("worker")
    .version("0.2.0")
    .doc("Timeout for a flusher to shutdown.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("3s")
// Interval (milliseconds, default "5s") at which committed file infos are flushed to Level DB.
val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval")
    .categories("worker")
    .version("0.3.1")
    .doc("Interval for a Celeborn worker to flush committed file infos into Level DB.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("5s")
// Toggle (default false) for using the sync method when saving committed file infos to Level DB.
val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync")
    .categories("worker")
    .version("0.3.1")
    .doc("Whether to call sync method to save committed file infos " +
      "into Level DB to handle OS crash.")
    .booleanConf
    .createWithDefault(false)
// Size of the sliding windows used for flush time/count statistics (default 20).
val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
  buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
    .withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
    .categories("worker")
    .version("0.3.0")
    .doc("The size of sliding windows used to calculate statistics " +
      "about flushed time and count.")
    .intConf
    .createWithDefault(20)
// Internal entry: minimum flush count required to enter a sliding window (default 500).
val WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT: ConfigEntry[Int] =
  buildConf("celeborn.worker.diskTime.slidingWindow.minFlushCount")
    .withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.minCount")
    .internal
    .categories("worker")
    .version("0.3.0")
    .doc("The minimum flush count to enter a sliding window " +
      "to calculate statistics about flushed time and count.")
    .intConf
    .createWithDefault(500)
// Internal entry: minimum fetch count required to enter a sliding window (default 100).
// NOTE(review): the doc text speaks of "flushed time and count" for a fetch counter —
// confirm whether that wording is intended.
val WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT: ConfigEntry[Int] =
  buildConf("celeborn.worker.diskTime.slidingWindow.minFetchCount")
    .internal
    .categories("worker")
    .version("0.2.1")
    .doc("The minimum fetch count to enter a sliding window " +
      "to calculate statistics about flushed time and count.")
    .intConf
    .createWithDefault(100)
// Interval (milliseconds, default "10ms") between worker direct memory checks.
val WORKER_DIRECT_MEMORY_CHECK_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.memory.check.interval")
    .withAlternative("celeborn.worker.memory.checkInterval")
    .categories("worker")
    .version("0.3.0")
    .doc("Interval of worker direct memory checking.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10ms")
// Interval at which the direct memory tracker reports to the log.
// Unlike most neighbouring time entries this one is parsed in SECONDS (default "10s").
val WORKER_DIRECT_MEMORY_REPORT_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.memory.report.interval")
    .withAlternative("celeborn.worker.memory.reportInterval")
    .categories("worker")
    .version("0.3.0")
    .doc("Interval of worker direct memory tracker reporting to log.")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefaultString("10s")
// Wait time (milliseconds, default "1s") after the worker triggers a channel to trim its cache.
val WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.memory.trimChannelWaitInterval")
    .categories("worker")
    .version("0.3.0")
    .doc("Wait time after worker trigger channel to trim cache.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("1s")
// Wait time (milliseconds, default "1s") after the worker triggers StorageManager to flush data.
val WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.memory.trimFlushWaitInterval")
    .categories("worker")
    .doc("Wait time after worker trigger StorageManager to flush data.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("1s")
// Toggle (default true) for the worker monitoring devices and reporting to the master.
val WORKER_DISK_MONITOR_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.monitor.disk.enabled")
    .categories("worker")
    .doc("When true, worker will monitor device and report to master.")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(true)
// Timeout (milliseconds, default "30s") for the worker's device status check.
val WORKER_DEVICE_STATUS_CHECK_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.disk.check.timeout")
    .withAlternative("celeborn.worker.disk.check.timeout")
    .categories("worker")
    .version("0.3.0")
    .doc("Timeout time for worker check device status.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("30s")
// Comma-separated list of disk monitor types (default "readwrite,diskusage").
// Items are lower-cased before being stored.
val WORKER_DISK_MONITOR_CHECKLIST: ConfigEntry[Seq[String]] =
  buildConf("celeborn.worker.monitor.disk.checklist")
    .categories("worker")
    .version("0.2.0")
    .doc("Monitor type for disk, available items are: " +
      "iohang, readwrite and diskusage.")
    .stringConf
    // Locale.ROOT keeps the normalisation independent of the JVM default locale
    // (e.g. under a Turkish locale "IOHANG" would otherwise lower-case to a dotless i).
    .transform(_.toLowerCase(Locale.ROOT))
    .toSequence
    .createWithDefaultString("readwrite,diskusage")
// Interval (milliseconds, default "60s") between device monitor disk checks.
val WORKER_DISK_MONITOR_CHECK_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.disk.check.interval")
    .withAlternative("celeborn.worker.monitor.disk.checkInterval")
    .categories("worker")
    .doc("Intervals between device monitor to check disk.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("60s")
// Directory holding linux file block information (default "/sys/block").
val WORKER_DISK_MONITOR_SYS_BLOCK_DIR: ConfigEntry[String] =
  buildConf("celeborn.worker.monitor.disk.sys.block.dir")
    .categories("worker")
    .doc("The directory where linux file block information is stored.")
    .version("0.2.0")
    .stringConf
    .createWithDefault("/sys/block")
// Threshold (default 64) of accumulated non-critical errors referenced by the device monitor.
val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
  buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
    .categories("worker")
    .doc("Device monitor will only notify critical error once the accumulated valid " +
      "non-critical error number exceeding this threshold.")
    .version("0.3.0")
    .intConf
    .createWithDefault(64)
// Expire timeout (milliseconds, default "10m") for non-critical device errors.
val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
    .categories("worker")
    .doc("The expire timeout of non-critical device error. " +
      "Only notify critical error when the number of non-critical " +
      "errors for a period of time exceeds threshold.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10m")
// Retry count (default 3) for creating a file writer after a failed creation.
val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
  buildConf("celeborn.worker.writer.create.maxAttempts")
    .categories("worker")
    .doc("Retry count for a file writer to create if its creation was failed.")
    .version("0.2.0")
    .intConf
    .createWithDefault(3)
// Max ratio of memory the partition sorter may use for sorting (default 0.1).
// The value is validated to lie within [0.0, 1.0].
val PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD: ConfigEntry[Double] =
  buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
    .categories("worker")
    .version("0.2.0")
    .doc("Max ratio of partition sorter's memory for sorting, " +
      "when reserved memory is higher than max partition sorter memory, " +
      "partition sorter will stop sorting.")
    .doubleConf
    .checkValue(ratio => ratio >= 0.0 && ratio <= 1.0, "Should be in [0.0, 1.0].")
    .createWithDefault(0.1)
// Max ratio of direct memory usable for the read buffer (default 0.1).
val WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER: ConfigEntry[Double] =
  buildConf("celeborn.worker.directMemoryRatioForReadBuffer")
    .categories("worker")
    .version("0.2.0")
    .doc("Max ratio of direct memory for read buffer")
    .doubleConf
    .createWithDefault(0.1)
// Max ratio of direct memory usable to store shuffle data (default 0.0).
val WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE: ConfigEntry[Double] =
  buildConf("celeborn.worker.directMemoryRatioForMemoryShuffleStorage")
    .categories("worker")
    .version("0.2.0")
    .doc("Max ratio of direct memory to store shuffle data")
    .doubleConf
    .createWithDefault(0.0)
// Direct memory usage ratio (default 0.85) at which the worker stops receiving
// data from shuffle clients.
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE: ConfigEntry[Double] =
  buildConf("celeborn.worker.directMemoryRatioToPauseReceive")
    .categories("worker")
    .version("0.2.0")
    .doc("If direct memory usage reaches this limit, " +
      "the worker will stop to receive data from Celeborn shuffle clients.")
    .doubleConf
    .createWithDefault(0.85)
// Direct memory usage ratio (default 0.95) at which the worker stops receiving
// replication data from other workers.
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
  buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
    .categories("worker")
    .version("0.2.0")
    .doc("If direct memory usage reaches this limit, " +
      "the worker will stop to receive replication data from other workers.")
    .doubleConf
    .createWithDefault(0.95)
// Direct memory usage ratio (default 0.5) below which the worker resumes.
val WORKER_DIRECT_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
  buildConf("celeborn.worker.directMemoryRatioToResume")
    .categories("worker")
    .version("0.2.0")
    .doc("If direct memory usage is less than this limit, worker will resume.")
    .doubleConf
    .createWithDefault(0.5)
// Toggle (default false) for worker-side congestion control.
val WORKER_CONGESTION_CONTROL_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.congestionControl.enabled")
    .categories("worker")
    .version("0.3.0")
    .doc("Whether to enable congestion control or not.")
    .booleanConf
    .createWithDefault(false)
// Sample time window for users' produce/consume rate; parsed in SECONDS (default "10s").
val WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW: ConfigEntry[Long] =
  buildConf("celeborn.worker.congestionControl.sample.time.window")
    .categories("worker")
    .version("0.3.0")
    .doc("The worker holds a time sliding list to calculate users' produce/consume rate")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefaultString("10s")
// Optional byte threshold: congestion stops once pending disk-buffer bytes drop below it.
val WORKER_CONGESTION_CONTROL_LOW_WATERMARK: OptionalConfigEntry[Long] =
  buildConf("celeborn.worker.congestionControl.low.watermark")
    .categories("worker")
    .version("0.3.0")
    .doc("Will stop congest users if the total pending bytes of disk buffer " +
      "is lower than this configuration")
    .bytesConf(ByteUnit.BYTE)
    .createOptional
// Optional byte threshold: congestion starts once total disk-buffer bytes exceed it.
val WORKER_CONGESTION_CONTROL_HIGH_WATERMARK: OptionalConfigEntry[Long] =
  buildConf("celeborn.worker.congestionControl.high.watermark")
    .categories("worker")
    // Each literal ends with a space so words do not run together when concatenated.
    .doc("If the total bytes in disk buffer exceeds this configure, will start to congest " +
      "users whose produce rate is higher than the potential average consume rate. " +
      "The congestion will stop if the produce rate is lower or equal to the " +
      "average consume rate, or the total pending bytes lower than " +
      s"${WORKER_CONGESTION_CONTROL_LOW_WATERMARK.key}")
    .version("0.3.0")
    .bytesConf(ByteUnit.BYTE)
    .createOptional
// Time (milliseconds, default "10min") without data after which a user is considered inactive.
val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.congestionControl.user.inactive.interval")
    .categories("worker")
    .version("0.3.0")
    .doc("How long will consider this user is inactive if it doesn't send data")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10min")
// Interval (milliseconds, default "30s") between checks for whether all shuffles
// have expired during worker decommission.
val WORKER_DECOMMISSION_CHECK_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.decommission.checkInterval")
    .categories("worker")
    .doc(
      "The wait interval of checking whether all the shuffle expired during worker decommission")
    .version("0.4.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("30s")
// Wait time (milliseconds, default "6h") for all shuffles to expire during worker decommission.
val WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.decommission.forceExitTimeout")
    .categories("worker")
    .version("0.4.0")
    .doc("The wait time of waiting for all the shuffle expire during worker decommission.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("6h")
// Toggle (default false) for graceful worker shutdown.
val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.graceful.shutdown.enabled")
    .categories("worker")
    .version("0.2.0")
    .doc("When true, during worker shutdown, the worker will wait for " +
      "all released slots to be committed or destroyed.")
    .booleanConf
    .createWithDefault(false)
// Overall graceful shutdown timeout for the worker (milliseconds, default "600s").
val WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.graceful.shutdown.timeout")
    .categories("worker")
    .version("0.2.0")
    .doc("The worker's graceful shutdown timeout time.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("600s")
// Polling interval (milliseconds, default "1s") for the released-slots check
// during graceful shutdown.
val WORKER_CHECK_SLOTS_FINISHED_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.interval")
    .categories("worker")
    .version("0.2.0")
    .doc("The wait interval of checking whether all released slots to be committed " +
      "or destroyed during worker graceful shutdown")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("1s")
// Max wait (milliseconds, default "480s") for released slots to be committed or
// destroyed during graceful shutdown.
val WORKER_CHECK_SLOTS_FINISHED_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout")
    .categories("worker")
    .version("0.2.0")
    .doc("The wait time of waiting for the released slots to be committed " +
      "or destroyed during worker graceful shutdown.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("480s")
// Path used to store levelDB (default "<tmp>/recover").
// Any "<tmp>" in the value is replaced with the `java.io.tmpdir` system property.
val WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH: ConfigEntry[String] =
  buildConf("celeborn.worker.graceful.shutdown.recoverPath")
    .categories("worker")
    .version("0.2.0")
    .doc("The path to store levelDB.")
    .stringConf
    .transform(path => path.replace("<tmp>", System.getProperty("java.io.tmpdir")))
    .createWithDefault("<tmp>/recover")
// Max wait (milliseconds, default "120s") for partition file sorting during graceful shutdown.
val WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
    .categories("worker")
    .version("0.2.0")
    .doc("The wait time of waiting for sorting partition files " +
      "during worker graceful shutdown.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("120s")
// Minimum number of initial read buffers (default 1).
val WORKER_PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
  buildConf("celeborn.worker.partition.initial.readBuffersMin")
    .categories("worker")
    .doc("Min number of initial read buffers")
    .version("0.3.0")
    .intConf
    .createWithDefault(1)
// Maximum number of initial read buffers (default 1024).
val WORKER_PARTITION_READ_BUFFERS_MAX: ConfigEntry[Int] =
  buildConf("celeborn.worker.partition.initial.readBuffersMax")
    .categories("worker")
    .doc("Max number of initial read buffers")
    .version("0.3.0")
    .intConf
    .createWithDefault(1024)
// Read-buffer thread count per mount point (default 8).
val WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT: ConfigEntry[Int] =
  buildConf("celeborn.worker.bufferStream.threadsPerMountpoint")
    .categories("worker")
    .doc("Threads count for read buffer per mount point.")
    .version("0.3.0")
    .intConf
    .createWithDefault(8)
// Wait time (milliseconds, default "50ms") when the buffer dispatcher cannot allocate a buffer.
val WORKER_READBUFFER_ALLOCATIONWAIT: ConfigEntry[Long] =
  buildConf("celeborn.worker.readBuffer.allocationWait")
    .categories("worker")
    .doc("The time to wait when buffer dispatcher can not allocate a buffer.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("50ms")
// Target ratio for the read-ahead buffer's memory usage (default 0.9).
val WORKER_READBUFFER_TARGET_RATIO: ConfigEntry[Double] =
  buildConf("celeborn.worker.readBuffer.target.ratio")
    .categories("worker")
    .doc("The target ratio for read ahead buffer's memory usage.")
    .version("0.3.0")
    .doubleConf
    .createWithDefault(0.9)
// Interval (milliseconds, default "100ms") at which the memory manager recalculates
// the read buffer's target memory.
val WORKER_READBUFFER_TARGET_UPDATE_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.worker.readBuffer.target.updateInterval")
    .categories("worker")
    .doc("The interval for memory manager to calculate new read buffer's target memory.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("100ms")
// Byte-sized entry keyed `...readBuffer.target.changeThreshold` (default "1mb").
// NOTE(review): the doc text describes a "target ratio" although this is a byte
// threshold — it looks copy-pasted; confirm the intended wording.
val WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD: ConfigEntry[Long] =
  buildConf("celeborn.worker.readBuffer.target.changeThreshold")
    .categories("worker")
    .doc("The target ratio for pre read memory usage.")
    .version("0.3.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1mb")
// Minimum buffer count for a map data partition to trigger a read (default 32).
val WORKER_READBUFFERS_TOTRIGGERREAD_MIN: ConfigEntry[Int] =
  buildConf("celeborn.worker.readBuffer.toTriggerReadMin")
    .categories("worker")
    .doc("Min buffers count for map data partition to trigger read.")
    .version("0.3.0")
    .intConf
    .createWithDefault(32)
// Toggle (default false) for worker-to-client heartbeats while pushing data.
val WORKER_PUSH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.push.heartbeat.enabled")
    .categories("worker")
    .doc("enable the heartbeat from worker to client when pushing data")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(false)
// Internal entry: max components of the Netty `CompositeByteBuf` used as
// `FileWriter`'s `flushBuffer` (default 128).
val WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS: ConfigEntry[Int] =
  buildConf("celeborn.worker.push.compositeBuffer.maxComponents")
    .categories("worker")
    .internal
    .doc("Max components of Netty `CompositeByteBuf` in `FileWriter`'s `flushBuffer`. " +
      "When this value is too big, i.e. 256, there will be many memory fragments in Netty's memory pool, " +
      "and total direct memory can be significantly larger than the disk buffer. " +
      "When set to 1, Netty's direct memory is close to disk buffer, " +
      "but performance might decrease due to frequent memory copy during compaction.")
    .version("0.3.0")
    .intConf
    .createWithDefault(128)
// Toggle (default false) for worker-to-client heartbeats while fetching data.
val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.worker.fetch.heartbeat.enabled")
    .categories("worker")
    .doc("enable the heartbeat from worker to client when fetching data")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(false)
// Interval (milliseconds, default "10s") at which the client sends heartbeats to the master.
val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.client.application.heartbeatInterval")
    .withAlternative("celeborn.application.heartbeatInterval")
    .categories("client")
    .doc("Interval for client to send heartbeat message to master.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("10s")
// Toggle (default true) for excluding a partition's peer worker when pushing to the replica fails.
val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
    .categories("client")
    .doc("When true, Celeborn will exclude partition's peer worker " +
      "on failure when push data to replica failed.")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(true)
// Timeout (milliseconds, default "180s") after which LifecycleManager clears
// reserved excluded workers.
val CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.client.excludedWorker.expireTimeout")
    .withAlternative("celeborn.worker.excluded.expireTimeout")
    .categories("client")
    .version("0.3.0")
    // The middle literal ends with a space so the back-ticked key and "to cover" stay separated.
    .doc("Timeout time for LifecycleManager to clear reserved excluded worker. " +
      "Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout` " +
      "to cover worker heartbeat timeout check period")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("180s")
// Internal toggle (default false) selecting which worker set is used as the
// candidates for the "being checked" workers.
val CLIENT_CHECKED_USE_ALLOCATED_WORKERS: ConfigEntry[Boolean] =
  buildConf("celeborn.client.checked.useAllocatedWorkers")
    .internal
    .categories("client")
    .version("0.3.0")
    // Literals carry their own separating spaces; the parenthetical is balanced.
    .doc("When true, Celeborn will use local allocated workers as candidate being checked workers " +
      "(check the workers whether unKnown in master), " +
      "this may be more useful for map partition to regenerate the lost data, " +
      "otherwise use local black list as candidate being checked workers.")
    .booleanConf
    .createWithDefault(false)
// Internal test-only toggle (default false) that fails commitFile requests.
val TEST_CLIENT_RETRY_COMMIT_FILE: ConfigEntry[Boolean] =
  buildConf("celeborn.test.client.retryCommitFiles")
    .withAlternative("celeborn.test.retryCommitFiles")
    .categories("test", "client")
    .internal
    .version("0.3.0")
    .doc("Fail commitFile request for test")
    .booleanConf
    .createWithDefault(false)
// Toggle (default false) for replicating pushed shuffle data to a second worker.
val CLIENT_PUSH_REPLICATE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.push.replicate.enabled")
    .withAlternative("celeborn.push.replicate.enabled")
    .categories("client")
    .version("0.3.0")
    .doc(
      "When true, Celeborn worker will replicate shuffle data to another Celeborn worker " +
        "asynchronously to ensure the pushed shuffle data won't be lost after the node failure. " +
        "It's recommended to set `false` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
    .booleanConf
    .createWithDefault(false)
// Byte-sized entry keyed `celeborn.client.push.buffer.initial.size` (default "8k").
// NOTE(review): no `.doc(...)` is set on this entry — consider adding one.
val CLIENT_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
  buildConf("celeborn.client.push.buffer.initial.size")
    .withAlternative("celeborn.push.buffer.initial.size")
    .version("0.3.0")
    .categories("client")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("8k")
// Max size in bytes of the reducer partition buffer for the shuffle hash writer (default "64k").
val CLIENT_PUSH_BUFFER_MAX_SIZE: ConfigEntry[Long] =
  buildConf("celeborn.client.push.buffer.max.size")
    .withAlternative("celeborn.push.buffer.max.size")
    .categories("client")
    .doc("Max size of reducer partition buffer memory for shuffle hash writer. " +
      "The pushed data will be buffered in memory before sending to Celeborn worker. " +
      "For performance consideration keep this buffer size higher than 32K. " +
      "Example: If reducer amount is 2000, buffer size is 64K, " +
      "then each task will consume up to `64KiB * 2000 = 125MiB` heap memory.")
    .version("0.3.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("64k")
// Push buffer queue size per task (default 512).
val CLIENT_PUSH_QUEUE_CAPACITY: ConfigEntry[Int] =
  buildConf("celeborn.client.push.queue.capacity")
    .withAlternative("celeborn.push.queue.capacity")
    .categories("client")
    .doc("Push buffer queue size for a task. " +
      "The maximum memory is `celeborn.client.push.buffer.max.size` * " +
      "`celeborn.client.push.queue.capacity`, default: 64KiB * 512 = 32MiB")
    .version("0.3.0")
    .intConf
    .createWithDefault(512)
// Total number of Netty in-flight push requests (default 256).
val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_TOTAL: ConfigEntry[Int] =
  buildConf("celeborn.client.push.maxReqsInFlight.total")
    .withAlternative("celeborn.push.maxReqsInFlight")
    .categories("client")
    .doc("Amount of total Netty in-flight requests. " +
      "The maximum memory is `celeborn.client.push.maxReqsInFlight.total` * " +
      "`celeborn.client.push.buffer.max.size` * compression ratio(1 in worst case): " +
      "64KiB * 256 = 16MiB")
    .version("0.3.0")
    .intConf
    .createWithDefault(256)
// Number of Netty in-flight push requests allowed per worker (default 32).
val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER: ConfigEntry[Int] =
  buildConf("celeborn.client.push.maxReqsInFlight.perWorker")
    .categories("client")
    .version("0.3.0")
    // Exactly one space at each literal boundary (the original joined "requests " with
    // " per worker", producing a double space).
    .doc(
      "Amount of Netty in-flight requests per worker. Default max memory of in flight requests " +
        "per worker is `celeborn.client.push.maxReqsInFlight.perWorker` * `celeborn.client.push.buffer.max.size` " +
        "* compression ratio(1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will " +
        "not exceed `celeborn.client.push.maxReqsInFlight.total`.")
    .intConf
    .createWithDefault(32)
// Max retry times for reviving after a failed push (default 5).
val CLIENT_PUSH_MAX_REVIVE_TIMES: ConfigEntry[Int] =
  buildConf("celeborn.client.push.revive.maxRetries")
    .categories("client")
    .doc("Max retry times for reviving when celeborn push data failed.")
    .version("0.3.0")
    .intConf
    .createWithDefault(5)
// Interval (milliseconds, default "100ms") at which the client triggers Revive to LifecycleManager.
val CLIENT_PUSH_REVIVE_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.client.push.revive.interval")
    .categories("client")
    .doc("Interval for client to trigger Revive to LifecycleManager. " +
      "The number of partitions in one Revive " +
      "request is `celeborn.client.push.revive.batchSize`.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("100ms")
// Max number of partitions carried by one Revive request (default 2048).
val CLIENT_PUSH_REVIVE_BATCHSIZE: ConfigEntry[Int] =
  buildConf("celeborn.client.push.revive.batchSize")
    .categories("client")
    .doc("Max number of partitions in one Revive request.")
    .version("0.3.0")
    .intConf
    .createWithDefault(2048)
// Toggle (default false) for client-side exclusion of workers on push failures.
val CLIENT_PUSH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.push.excludeWorkerOnFailure.enabled")
    .categories("client")
    .version("0.3.0")
    .doc("Whether to enable shuffle client-side push exclude workers on failures.")
    .booleanConf
    .createWithDefault(false)
// Push speed control strategy (default "SIMPLE"). The value is upper-cased with
// Locale.ROOT and must then be one of SIMPLE or SLOWSTART.
val CLIENT_PUSH_LIMIT_STRATEGY: ConfigEntry[String] =
  buildConf("celeborn.client.push.limit.strategy")
    .categories("client")
    .version("0.3.0")
    .doc("The strategy used to control the push speed. " +
      "Valid strategies are SIMPLE and SLOWSTART. " +
      "The SLOWSTART strategy usually works with " +
      "congestion control mechanism on the worker side.")
    .stringConf
    .transform(strategy => strategy.toUpperCase(Locale.ROOT))
    .checkValues(Set("SIMPLE", "SLOWSTART"))
    .createWithDefaultString("SIMPLE")
// Initial sleep time (milliseconds, default "500ms") when the current max in-flight requests is 0.
val CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME: ConfigEntry[Long] =
  buildConf("celeborn.client.push.slowStart.initialSleepTime")
    .categories("client")
    .doc("The initial sleep time if the current max in flight requests is 0")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("500ms")
// Max sleep time (milliseconds, default "2s") used by the SLOWSTART push strategy.
val CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME: ConfigEntry[Long] =
  buildConf("celeborn.client.push.slowStart.maxSleepTime")
    .categories("client")
    .doc(s"If ${CLIENT_PUSH_LIMIT_STRATEGY.key} is set to SLOWSTART, " +
      "push side will take a sleep strategy for each batch of requests, " +
      "this controls the max sleep time if the max in flight requests limit is 1 for a long time")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("2s")
// Timeout (milliseconds, default "120s") for a task's push-data rpc message.
// The value is validated to be strictly positive.
val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.client.push.timeout")
    .withAlternative("celeborn.push.data.timeout")
    .categories("client")
    .doc("Timeout for a task to push data rpc message. " +
      s"This value should better be more than twice of `${PUSH_TIMEOUT_CHECK_INTERVAL.key}`")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .checkValue(timeout => timeout > 0, "Value must be positive!")
    .createWithDefaultString("120s")
// Internal test-only toggle (default false) for exercising push-primary-data timeouts.
val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
  buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
    .withAlternative("celeborn.test.pushMasterDataTimeout")
    .categories("test", "worker")
    .internal
    .doc("Whether to test push primary data timeout")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(false)
// Internal test-only toggle (default false) for exercising push-replica-data timeouts.
val TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT: ConfigEntry[Boolean] =
  buildConf("celeborn.test.worker.pushReplicaDataTimeout")
    .categories("test", "worker")
    .internal
    .doc("Whether to test push replica data timeout")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(false)
// Optional timeout (milliseconds) for Netty in-flight requests to complete;
// no default is registered here (the doc text describes the fallback).
val CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT: OptionalConfigEntry[Long] =
  buildConf("celeborn.client.push.limit.inFlight.timeout")
    .withAlternative("celeborn.push.limit.inFlight.timeout")
    .categories("client")
    // The first literal ends with a space so the two sentences do not run together.
    .doc("Timeout for netty in-flight requests to be done. " +
      s"Default value should be `${CLIENT_PUSH_DATA_TIMEOUT.key} * 2`.")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createOptional
// Sleep interval (milliseconds, default "50ms") while waiting for Netty in-flight requests.
val CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.client.push.limit.inFlight.sleepInterval")
    .withAlternative("celeborn.push.limit.inFlight.sleepInterval")
    .categories("client")
    .version("0.3.0")
    .doc("Sleep interval when check netty in-flight requests to be done.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("50ms")
// Toggle (default false) for randomizing partitionId in the push sorter.
val CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.push.sort.randomizePartitionId.enabled")
    .withAlternative("celeborn.push.sort.randomizePartitionId.enabled")
    .categories("client")
    .version("0.3.0")
    .doc("Whether to randomize partitionId in push sorter. " +
      "If true, partitionId will be randomized " +
      "when sort data to avoid skew when push to worker")
    .booleanConf
    .createWithDefault(false)
// Thread count for processing re-sent push data requests (default 8).
val CLIENT_PUSH_RETRY_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.client.push.retry.threads")
    .withAlternative("celeborn.push.retry.threads")
    .categories("client")
    .version("0.3.0")
    .doc("Thread number to process shuffle re-send push data requests.")
    .intConf
    .createWithDefault(8)
// Thread count for processing shuffle split requests in the shuffle client (default 8).
val CLIENT_PUSH_SPLIT_PARTITION_THREADS: ConfigEntry[Int] =
  buildConf("celeborn.client.push.splitPartition.threads")
    .withAlternative("celeborn.push.splitPartition.threads")
    .categories("client")
    .version("0.3.0")
    .doc("Thread number to process shuffle split request in shuffle client.")
    .intConf
    .createWithDefault(8)
// Wait interval (milliseconds, default "50ms") when no task is available to push to a worker.
val CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.client.push.takeTaskWaitInterval")
    .categories("client")
    .version("0.3.0")
    .doc("Wait interval if no task available to push to worker.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("50ms")
// Max number of waits when no task is available to push to a worker (default 1).
val CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS: ConfigEntry[Int] =
  buildConf("celeborn.client.push.takeTaskMaxWaitAttempts")
    .categories("client")
    .version("0.3.0")
    .doc("Max wait times if no task available to push to worker.")
    .intConf
    .createWithDefault(1)
// Idle timeout after which the SendBufferPool is cleaned up (time entry, default "60s").
val CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.client.push.sendBufferPool.expireTimeout")
    .version("0.3.1")
    .categories("client")
    .doc("Timeout before clean up SendBufferPool. " +
      "If SendBufferPool is idle for more than this time, " +
      "the send buffers and push tasks will be cleaned up.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("60s")
// Interval between expiry checks of the send buffer pool (time entry, default "30s").
// The doc text embeds the key of CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT.
val CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL: ConfigEntry[Long] = {
  val expireTimeoutKey = CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT.key
  buildConf("celeborn.client.push.sendBufferPool.checkExpireInterval")
    .version("0.3.1")
    .categories("client")
    .doc("Interval to check expire for send buffer pool. " +
      s"If the pool has been idle for more than `${expireTimeoutKey}`, " +
      "the pooled send buffers and push tasks will be cleaned up.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("30s")
}
// Test-only boolean switch (default false), marked internal.
// `celeborn.test.retryRevive` is registered as an alternative key.
val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
  buildConf("celeborn.test.client.retryRevive")
    .withAlternative("celeborn.test.retryRevive")
    .categories("test", "client")
    .internal
    .version("0.3.0")
    .doc("Fail push data and request for test")
    .booleanConf
    .createWithDefault(false)
// Timeout for opening a stream and fetching a chunk (time entry, default "600s").
// `celeborn.fetch.timeout` is registered as an alternative key.
val CLIENT_FETCH_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.client.fetch.timeout")
    .withAlternative("celeborn.fetch.timeout")
    .doc("Timeout for a task to open stream and fetch chunk.")
    .categories("client")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("600s")
// Number of in-flight chunk fetch requests (int entry, default 3).
// `celeborn.fetch.maxReqsInFlight` is registered as an alternative key.
val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
  buildConf("celeborn.client.fetch.maxReqsInFlight")
    .withAlternative("celeborn.fetch.maxReqsInFlight")
    .doc("Amount of in-flight chunk fetch request.")
    .categories("client")
    .version("0.3.0")
    .intConf
    .createWithDefault(3)
// Per-replica retry limit for chunk fetches (int entry, default 3).
// Two alternative keys are registered, in the order listed below.
val CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA: ConfigEntry[Int] =
  buildConf("celeborn.client.fetch.maxRetriesForEachReplica")
    .withAlternative("celeborn.fetch.maxRetriesForEachReplica")
    .withAlternative("celeborn.fetch.maxRetries")
    .doc("Max retry times of fetch chunk on each replica")
    .categories("client")
    .version("0.3.0")
    .intConf
    .createWithDefault(3)
// Boolean switch (default false) for client-side exclusion of workers after fetch failures.
val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
    .version("0.3.0")
    .categories("client")
    .doc("Whether to enable shuffle client-side fetch exclude workers on failure.")
    .booleanConf
    .createWithDefault(false)
// Expire time for workers excluded on the fetch path. No default of its own is declared:
// the entry falls back to CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT.
// The doc text previously rendered as "Executor,We give" because the two literals were
// concatenated without a separating space.
val CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
  buildConf("celeborn.client.fetch.excludedWorker.expireTimeout")
    .categories("client")
    .doc("ShuffleClient is a static object, it will be used in the whole lifecycle of " +
      "Executor, We give a expire time for excluded workers to avoid a transient worker issues.")
    .version("0.3.0")
    .fallbackConf(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
// Test-only boolean switch (default false), marked internal.
// `celeborn.test.fetchFailure` is registered as an alternative key.
val TEST_CLIENT_FETCH_FAILURE: ConfigEntry[Boolean] =
  buildConf("celeborn.test.client.fetchFailure")
    .withAlternative("celeborn.test.fetchFailure")
    .categories("test", "client")
    .internal
    .doc("Whether to test fetch chunk failure")
    .version("0.3.0")
    .booleanConf
    .createWithDefault(false)
// Boolean switch (default false) for the range read filter.
// `celeborn.shuffle.rangeReadFilter.enabled` is registered as an alternative key.
val SHUFFLE_RANGE_READ_FILTER_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.shuffle.rangeReadFilter.enabled")
    .withAlternative("celeborn.shuffle.rangeReadFilter.enabled")
    .doc("If a spark application have skewed partition, " +
      "this value can set to true to improve performance.")
    .categories("client")
    .version("0.2.0")
    .booleanConf
    .createWithDefault(false)
// Shuffle partition type. The supplied value is upper-cased with Locale.ROOT and then
// checked against the PartitionType names listed below; the default is REDUCE.
// `celeborn.shuffle.partition.type` is registered as an alternative key.
val SHUFFLE_PARTITION_TYPE: ConfigEntry[String] = {
  val supportedTypes = Set(
    PartitionType.REDUCE.name,
    PartitionType.MAP.name,
    PartitionType.MAPGROUP.name)
  buildConf("celeborn.client.shuffle.partition.type")
    .withAlternative("celeborn.shuffle.partition.type")
    .version("0.3.0")
    .categories("client")
    .doc("Type of shuffle's partition.")
    .stringConf
    .transform(raw => raw.toUpperCase(Locale.ROOT))
    .checkValues(supportedTypes)
    .createWithDefault(PartitionType.REDUCE.name)
}
// File-size threshold that triggers a partition split (bytes entry, default "1G").
// `celeborn.shuffle.partitionSplit.threshold` is registered as an alternative key.
val SHUFFLE_PARTITION_SPLIT_THRESHOLD: ConfigEntry[Long] =
  buildConf("celeborn.client.shuffle.partitionSplit.threshold")
    .withAlternative("celeborn.shuffle.partitionSplit.threshold")
    .version("0.3.0")
    .categories("client")
    .doc("Shuffle file size threshold, if file size exceeds this, trigger split.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1G")
// Partition split mode. The supplied value is upper-cased with Locale.ROOT and must match
// PartitionSplitMode.SOFT or PartitionSplitMode.HARD; the default is SOFT.
// `celeborn.shuffle.partitionSplit.mode` is registered as an alternative key.
val SHUFFLE_PARTITION_SPLIT_MODE: ConfigEntry[String] = {
  val supportedModes = Set(PartitionSplitMode.SOFT.name, PartitionSplitMode.HARD.name)
  buildConf("celeborn.client.shuffle.partitionSplit.mode")
    .withAlternative("celeborn.shuffle.partitionSplit.mode")
    .version("0.3.0")
    .categories("client")
    .doc(
      "soft: the shuffle file size might be larger than split threshold. " +
        "hard: the shuffle file size will be limited to split threshold.")
    .stringConf
    .transform(raw => raw.toUpperCase(Locale.ROOT))
    .checkValues(supportedModes)
    .createWithDefault(PartitionSplitMode.SOFT.name)
}
// Shuffle compression codec. The supplied value is upper-cased with Locale.ROOT and then
// checked against the CompressionCodec names listed below; the default is LZ4.
// Two alternative keys are registered, in the order listed below.
val SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = {
  val supportedCodecs = Set(
    CompressionCodec.LZ4.name,
    CompressionCodec.ZSTD.name,
    CompressionCodec.NONE.name)
  buildConf("celeborn.client.shuffle.compression.codec")
    .withAlternative("celeborn.shuffle.compression.codec")
    .withAlternative("remote-shuffle.job.compression.codec")
    .version("0.3.0")
    .categories("client")
    .doc("The codec used to compress shuffle data. " +
      "By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`.")
    .stringConf
    .transform(raw => raw.toUpperCase(Locale.ROOT))
    .checkValues(supportedCodecs)
    .createWithDefault(CompressionCodec.LZ4.name)
}
// Zstd compression level (int entry, default 1). Values outside the inclusive range
// [-5, 22] are rejected by the checkValue predicate.
// `celeborn.shuffle.compression.zstd.level` is registered as an alternative key.
val SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
  buildConf("celeborn.client.shuffle.compression.zstd.level")
    .withAlternative("celeborn.shuffle.compression.zstd.level")
    .version("0.3.0")
    .categories("client")
    .doc("Compression level for Zstd compression codec, " +
      "its value should be an integer between -5 and 22. " +
      "Increasing the compression level will result in better compression " +
      "at the expense of more CPU and memory.")
    .intConf
    .checkValue(
      level => -5 <= level && level <= 22,
      "Compression level for Zstd compression codec should be an integer between -5 and 22.")
    .createWithDefault(1)
// Interval at which the client checks for expired shuffles (time entry, default "60s").
// `celeborn.shuffle.expired.checkInterval` is registered as an alternative key.
val SHUFFLE_EXPIRED_CHECK_INTERVAL: ConfigEntry[Long] =
  buildConf("celeborn.client.shuffle.expired.checkInterval")
    .withAlternative("celeborn.shuffle.expired.checkInterval")
    .doc("Interval for client to check expired shuffles.")
    .categories("client")
    .version("0.3.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("60s")
val CLIENT_SHUFFLE_MANAGER_PORT: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.manager.port")
.withAlternative("celeborn.shuffle.manager.port")
.categories("client")
.version("0.3.0")
.doc("Port used by the LifecycleManager on the Driver.")
.intConf
.checkValue(
(port: Int) => {
if (port != 0) {
logWarning(
"The user specifies the port used by the LifecycleManager on the Driver, and its" +
s" values is $port, which may cause port conflicts and startup failure.")
}
true
},