in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1812:4976]
// Creates a ConfigBuilder for `key` and hooks `register` as the onCreate callback,
// so every ConfigEntry defined below is registered with this object's config registry.
private def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
// ---------------------------------------------------------------------------
// Network: bind-address preference, base timeouts, allocator caching.
// ---------------------------------------------------------------------------

// Whether to bind by IP rather than FQDN when no bind host is configured explicitly.
val NETWORK_BIND_PREFER_IP: ConfigEntry[Boolean] =
buildConf("celeborn.network.bind.preferIpAddress")
.categories("network")
.version("0.3.0")
.doc("When `true`, prefer to use IP address, otherwise FQDN. This configuration only " +
"takes effects when the bind hostname is not set explicitly, in such case, Celeborn " +
"will find the first non-loopback address to bind.")
.booleanConf
.createWithDefault(true)
// Base network timeout; several module-level timeouts below fall back to this entry.
val NETWORK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.network.timeout")
.categories("network")
.version("0.2.0")
.doc("Default timeout for network operations.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("240s")
// Base socket connect timeout; module-level connect timeouts fall back to this entry.
val NETWORK_CONNECT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.network.connect.timeout")
.categories("network")
.doc("Default socket connect timeout.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
// Internal knob: thread-local caching in the shared PooledByteBufAllocator (off by default).
val NETWORK_MEMORY_ALLOCATOR_ALLOW_CACHE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.allowCache")
.categories("network")
.internal
.version("0.3.1")
.doc("When false, globally disable thread-local cache in the shared PooledByteBufAllocator.")
.booleanConf
.createWithDefault(false)
// Internal knob: switch between pooled and unpooled Netty allocators.
// Doc text fixed: "triming" -> "trimming", "performace degression" -> "performance degradation".
val NETWORK_MEMORY_ALLOCATOR_POOLED: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.pooled")
.categories("network")
.internal
.version("0.6.0")
.doc("If disabled, always use UnpooledByteBufAllocator for aggressive memory reclamation, " +
"this is helpful for cases that worker has high memory usage even after trimming. " +
"Disabling would cause performance degradation and higher CPU usage.")
.booleanConf
.createWithDefault(true)
// Internal knob: share a single memory allocator across transport contexts.
val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.share")
.categories("network")
.internal
.version("0.3.0")
.doc("Whether to share memory allocator.")
.booleanConf
.createWithDefault(true)
// Optional arena count for the pooled allocator; unset means availableProcessors (min 2).
val NETWORK_MEMORY_ALLOCATOR_ARENAS: OptionalConfigEntry[Int] =
buildConf("celeborn.network.memory.allocator.numArenas")
.categories("network")
.version("0.3.0")
.doc("Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2.")
.intConf
.createOptional
// Bind to the wildcard address (0.0.0.0 / ::0) for dual-stack deployments;
// the advertise address is still controlled by NETWORK_ADVERTISE_PREFER_IP.
val NETWORK_WILDCARD_ADDRESS_BIND: ConfigEntry[Boolean] =
buildConf("celeborn.network.bind.wildcardAddress")
.categories("network")
.version("0.6.0")
.doc("When `true`, the bind address will be set to a wildcard address, while the advertise address will " +
"remain as whatever is set by `celeborn.network.advertise.preferIpAddress`. The wildcard address is a special " +
"local IP address, and usually refers to 'any' and can only be used for bind operations. In the case of IPv4, " +
"this is 0.0.0.0 and in the case of IPv6 this is ::0. This is helpful in dual-stack environments, where the " +
"service must listen to both IPv4 and IPv6 clients.")
.booleanConf
.createWithDefault(false)
// Advertise-address IP preference; defaults to the bind preference via fallbackConf.
val NETWORK_ADVERTISE_PREFER_IP: ConfigEntry[Boolean] =
buildConf("celeborn.network.advertise.preferIpAddress")
.categories("network")
.version("0.6.0")
.doc("When `true`, prefer to use IP address, otherwise FQDN for advertise address.")
.fallbackConf(NETWORK_BIND_PREFER_IP)
// Per-arena verbose metrics for the pooled allocator.
val NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC: ConfigEntry[Boolean] =
buildConf("celeborn.network.memory.allocator.verbose.metric")
.categories("network")
.version("0.3.0")
.doc("Whether to enable verbose metric for pooled allocator.")
.booleanConf
.createWithDefault(false)
// ---------------------------------------------------------------------------
// Port retry and RPC framework settings.
// ---------------------------------------------------------------------------

// Number of retries when the requested port is already occupied.
val PORT_MAX_RETRY: ConfigEntry[Int] =
buildConf("celeborn.port.maxRetries")
.categories("network")
.doc("When port is occupied, we will retry for max retry times.")
.version("0.2.0")
.intConf
.createWithDefault(1)
// Netty IO thread count for NettyRpcEnv; unset means availableProcessors.
val RPC_IO_THREAD: OptionalConfigEntry[Int] =
buildConf("celeborn.rpc.io.threads")
.categories("network")
.doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
"The default threads number is the number of runtime available processors.")
.version("0.2.0")
.intConf
.createOptional
// Size of the RPC connect thread pool (no .doc: undocumented/internal-facing entry).
val RPC_CONNECT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.connect.threads")
.categories("network")
.version("0.2.0")
.intConf
.createWithDefault(64)
// Timeout for RPC endpoint lookup.
val RPC_LOOKUP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.rpc.lookupTimeout")
.categories("network")
.version("0.2.0")
.doc("Timeout for RPC lookup operations.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Timeout for RPC ask operations; HA-client ask timeout falls back to this entry.
val RPC_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.rpc.askTimeout")
.categories("network")
.version("0.2.0")
.doc("Timeout for RPC ask operations. " +
"It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// Delay before retrying after an RpcTimeoutException.
val RPC_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.rpc.retryWait")
.categories("network")
.version("0.5.4")
.doc("Time to wait before next retry on RpcTimeoutException.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
// Dispatcher event-loop thread count; 0 means use the available core count.
val RPC_DISPATCHER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.dispatcher.threads")
.withAlternative("celeborn.rpc.dispatcher.numThreads")
.categories("network")
.doc("Threads number of message dispatcher event loop. Default to 0, which is availableCore.")
.version("0.3.0")
.intConf
.createWithDefault(0)
// Bounded capacity of the in-memory RPC inbox; 0 disables the bound.
val RPC_INBOX_CAPACITY: ConfigEntry[Int] =
buildConf("celeborn.rpc.inbox.capacity")
.categories("network")
.doc("Specifies size of the in memory bounded capacity.")
.version("0.5.0")
.intConf
.checkValue(
v => v >= 0,
"the capacity of inbox must be no less than 0, 0 means no limitation")
.createWithDefault(0)
// NOTE(review): val name misspells "DISPATCHER"; kept as-is for source compatibility.
// Per-role (<role>) dispatcher thread count, falling back to RPC_DISPATCHER_THREADS.
val RPC_ROLE_DISPATHER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<role>.rpc.dispatcher.threads")
.categories("network")
.doc("Threads number of message dispatcher event loop for roles")
.fallbackConf(RPC_DISPATCHER_THREADS)
// Duration above which an RPC is considered slow and logged (stored in nanoseconds).
val RPC_SLOW_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.rpc.slow.threshold")
.categories("network")
.doc("threshold for RPC framework to log slow RPC")
.version("0.6.0")
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("1s")
// Optional minimum interval between slow-RPC log lines.
val RPC_SLOW_INTERVAL: OptionalConfigEntry[Long] =
buildConf("celeborn.rpc.slow.interval")
.categories("network")
.doc("min interval (ms) for RPC framework to log slow RPC")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
// Minimum interval between RPC performance-summary dumps.
val RPC_SUMMARY_DUMP_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.rpc.dump.interval")
.categories("network")
.doc("min interval (ms) for RPC framework to dump performance summary")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// ---------------------------------------------------------------------------
// Per-<module> transport settings. The `<module>` placeholder in the key is
// substituted at lookup time with a concrete transport module name
// (rpc_app, rpc_service, data, push, replicate, fetch).
// ---------------------------------------------------------------------------

// Netty event-loop backend; values are normalized to upper case before validation.
val NETWORK_IO_MODE: OptionalConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
.doc("Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO.")
.stringConf
.transform(_.toUpperCase)
.checkValues(Set(IOMode.NIO.name(), IOMode.EPOLL.name()))
.createOptional
// Prefer direct (off-heap) Netty buffers per module.
val NETWORK_IO_PREFER_DIRECT_BUFS: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.preferDirectBufs")
.categories("network")
.doc("If true, we will prefer allocating off-heap byte buffers within Netty. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server or client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.booleanConf
.createWithDefault(true)
// Per-module socket connect timeout; falls back to NETWORK_CONNECT_TIMEOUT.
val NETWORK_IO_CONNECT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.connectTimeout")
.categories("network")
.doc("Socket connect timeout. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for the replicate client of worker replicating data to peer worker.")
.fallbackConf(NETWORK_CONNECT_TIMEOUT)
// Per-module idle-connection timeout; falls back to NETWORK_TIMEOUT.
val NETWORK_IO_CONNECTION_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.connectionTimeout")
.categories("network")
.doc("Connection active timeout. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server or client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.fallbackConf(NETWORK_TIMEOUT)
// Number of concurrent connections maintained between two peers.
val NETWORK_IO_NUM_CONNECTIONS_PER_PEER: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.numConnectionsPerPeer")
.categories("network")
.doc("Number of concurrent connections between two nodes. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.intConf
.createWithDefault(1)
// Listen backlog (SO_BACKLOG) for server-side modules; 0 means no explicit backlog.
val NETWORK_IO_BACKLOG: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.backLog")
.categories("network")
.doc(
"Requested maximum length of the queue of incoming connections. Default 0 for no backlog. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.intConf
.createWithDefault(0)
// Server-side thread pool size per module; 0 means 2x core count.
val NETWORK_IO_SERVER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.serverThreads")
.categories("network")
.doc("Number of threads used in the server thread pool. Default to 0, which is 2x#cores. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.intConf
.createWithDefault(0)
// Client-side thread pool size per module; 0 means 2x core count.
val NETWORK_IO_CLIENT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.clientThreads")
.categories("network")
.doc("Number of threads used in the client thread pool. Default to 0, which is 2x#cores. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.intConf
.createWithDefault(0)
// Opt-in event-executor chooser that avoids thread conflicts in the client pool.
val NETWORK_IO_CLIENT_CONFLICT_AVOID_CHOOSER_ENABLE: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.conflictAvoidChooser.enable")
.categories("network")
.version("0.5.4")
.doc("Whether to use conflict avoid event executor chooser in the client thread pool. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.booleanConf
.createWithDefault(false)
// SO_RCVBUF per module; 0 leaves the OS default in place.
val NETWORK_IO_RECEIVE_BUFFER: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.receiveBuffer")
.categories("network")
.doc("Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer " +
"should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps " +
"buffer size should be ~ 1.25MB. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server or client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)
// SO_SNDBUF per module; 0 leaves the OS default in place.
val NETWORK_IO_SEND_BUFFER: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.sendBuffer")
.categories("network")
.doc("Send buffer size (SO_SNDBUF). " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for worker receiving push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate server or client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)
// IO retry count per request; 0 disables retries entirely.
val NETWORK_IO_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.<module>.io.maxRetries")
.categories("network")
.doc(
"Max number of times we will try IO exceptions (such as connection timeouts) per request. " +
"If set to 0, we will not do any retries. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data.")
.intConf
.createWithDefault(3)
// Backoff between IO retries; only meaningful when maxRetries > 0.
val NETWORK_IO_RETRY_WAIT: ConfigEntry[Long] =
buildConf("celeborn.<module>.io.retryWait")
.categories("network")
.doc("Time that we will wait in order to perform a retry after an IOException. " +
"Only relevant if maxIORetries > 0. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Defer opening file descriptors until data transfer actually starts.
val NETWORK_IO_LAZY_FD: ConfigEntry[Boolean] =
buildConf("celeborn.<module>.io.lazyFD")
.categories("network")
.doc("Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only " +
"when data is going to be transferred. This can reduce the number of open files. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.booleanConf
.createWithDefault(true)
// Minimum block size before mmap is used instead of normal IO (internal).
val NETWORK_IO_STORAGE_MEMORY_MAP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.<module>.storage.memoryMapThreshold")
.withAlternative("celeborn.storage.memoryMapThreshold")
.categories("network")
.internal
.doc("Minimum size of a block that we should start using memory map rather than reading in through " +
"normal IO operations. This prevents Celeborn from memory mapping very small blocks. In general, " +
"memory mapping has high overhead for blocks close to or below the page size of the OS. " +
s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " +
s"it works for worker fetch server.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2m")
// Optional cap on concurrently transferred chunks; excess connections are closed.
val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] =
buildConf("celeborn.shuffle.io.maxChunksBeingTransferred")
.categories("network")
.doc("The max number of chunks allowed to be transferred at the same time on shuffle service. Note " +
"that new incoming connections will be closed when the max number is hit. The client will retry " +
"according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and " +
"`celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure.")
.version("0.2.0")
.longConf
.createOptional
// Interval of the periodic push-data timeout checker.
val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.push.timeoutCheck.interval")
.categories("network")
.doc("Interval for checking push data timeout. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Thread count of the push-data timeout checker.
val PUSH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.push.timeoutCheck.threads")
.categories("network")
.doc("Threads num for checking push data timeout. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push data. " +
s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
s"it works for Flink shuffle client push data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker.")
.version("0.3.0")
.intConf
.createWithDefault(4)
// Interval of the periodic fetch-data timeout checker (data module only).
val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
.categories("network")
.doc("Interval for checking fetch data timeout. " +
s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Thread count of the fetch-data timeout checker (data module only).
val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.fetch.timeoutCheck.threads")
.categories("network")
.doc("Threads num for checking fetch data timeout. " +
s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data.")
.version("0.3.0")
.intConf
.createWithDefault(4)
// Worker<->client channel heartbeat interval; legacy client key kept as alternative.
val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.heartbeat.interval")
.withAlternative("celeborn.client.heartbeat.interval")
.categories("network")
.version("0.3.0")
.doc("The heartbeat interval between worker and client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, " +
s"works for shuffle client. " +
s"If setting <module> to `${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
s"works for master or worker. " +
s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
s"it works for shuffle client push and fetch data. " +
s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
s"it works for replicate client of worker replicating data to peer worker. " +
"If you are using the \"celeborn.client.heartbeat.interval\", " +
"please use the new configs for each module according to your needs or " +
"replace it with \"celeborn.rpc.heartbeat.interval\", " +
"\"celeborn.data.heartbeat.interval\" and " +
"\"celeborn.replicate.heartbeat.interval\". ")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// ---------------------------------------------------------------------------
// Master endpoint discovery and master-client settings.
// ---------------------------------------------------------------------------

// Pluggable resolver class for master endpoints; validated to be loadable at config time.
val MASTER_ENDPOINTS_RESOLVER: ConfigEntry[String] =
buildConf("celeborn.master.endpoints.resolver")
.categories("client", "worker")
.doc("Resolver class that can be used for discovering and updating the master endpoints. This allows " +
"users to provide a custom master endpoint resolver implementation. This is useful in environments " +
"where the master nodes might change due to scaling operations or infrastructure updates. Clients " +
"need to ensure that provided resolver class should be present in the classpath.")
.version("0.5.2")
.stringConf
.checkValue(
resolver => Utils.classIsLoadable(resolver),
"Resolver class was not found in the classpath. Please check the class name " +
"and ensure that it is present in classpath")
.createWithDefault("org.apache.celeborn.common.client.StaticMasterEndpointResolver")
// Static master endpoint list consumed by the default resolver.
// `<localhost>` is a placeholder substituted with the local hostname at load time.
// Fix: removed the redundant `s` interpolator on the default value (no interpolation).
val MASTER_ENDPOINTS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.endpoints")
.categories("client", "worker")
.doc("Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided by " +
s"${MASTER_ENDPOINTS_RESOLVER.key} to resolve the master endpoints. By default Celeborn uses " +
"`org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints " +
"as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. " +
"If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass " +
s"custom resolver implementation to discover master endpoints actively using ${MASTER_ENDPOINTS_RESOLVER.key}.")
.version("0.2.0")
.stringConf
.toSequence
.createWithDefaultString("<localhost>:9097")
// Ask timeout for the master (HA) client; falls back to the generic RPC ask timeout.
val MASTER_CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.masterClient.rpc.askTimeout")
.withAlternative("celeborn.rpc.haClient.askTimeout")
.internal
.categories("client", "worker")
.version("0.3.0")
.doc("Timeout for HA client RPC ask operations.")
.fallbackConf(RPC_ASK_TIMEOUT)
// Retry budget for connecting to master endpoints (at least one try per endpoint).
val MASTER_CLIENT_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.masterClient.maxRetries")
.withAlternative("celeborn.client.maxRetries")
.internal
.categories("client", "worker")
.doc("Max retry times for client to connect master endpoint. The number of retries will be at least equal to the number of master endpoints.")
.version("0.3.0")
.intConf
.createWithDefault(15)
// Master-side timeout after which an application without heartbeats is expired.
val APPLICATION_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.application.timeout")
.withAlternative("celeborn.application.heartbeat.timeout")
.categories("master")
.version("0.3.0")
.doc("Application heartbeat timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")
// Timeout for deleting expired directories on HDFS (doc grammar fixed).
val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.hdfs.expireDirs.timeout")
.categories("master")
.version("0.3.0")
.doc("The timeout for expired dirs to be deleted on HDFS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
// Timeout for deleting expired directories on S3/HDFS (doc grammar fixed).
val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.dfs.expireDirs.timeout")
.categories("master")
.version("0.6.0")
.doc("The timeout for expired dirs to be deleted on S3 or HDFS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
// ---------------------------------------------------------------------------
// Master-side worker management and HTTP server settings.
// ---------------------------------------------------------------------------

// Timeout after which a worker without heartbeats is considered lost.
val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
.categories("master")
.version("0.3.0")
.doc("Worker heartbeat timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
// Optional allow-list pattern for worker registration hosts.
val ALLOW_WORKER_HOST_PATTERN: OptionalConfigEntry[Regex] =
buildConf("celeborn.master.allowWorkerHostPattern")
.categories("master")
.version("0.6.0")
.doc("Pattern of worker host that allowed to register with the master." +
" If not set, all workers are allowed to register.")
.regexConf
.createOptional
// Optional deny-list pattern for worker registration hosts.
val DENY_WORKER_HOST_PATTERN: OptionalConfigEntry[Regex] =
buildConf("celeborn.master.denyWorkerHostPattern")
.categories("master")
.version("0.6.0")
.doc("Pattern of worker host that denied to register with the master." +
" If not set, no workers are denied to register.")
.regexConf
.createOptional
// Retention period for worker-unavailable records; -1 keeps them forever.
val WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
.categories("master")
.version("0.3.1")
.doc("Worker unavailable info would be cleared when the retention period is expired." +
" Set -1 to disable the expiration.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1800s")
// Master RPC bind host; `<localhost>` placeholder resolved at load time.
val MASTER_HOST: ConfigEntry[String] =
buildConf("celeborn.master.host")
.categories("master")
.version("0.2.0")
.doc("Hostname for master to bind.")
.stringConf
.createWithDefaultString("<localhost>")
// Master HTTP bind host; legacy prometheus keys kept as alternatives.
val MASTER_HTTP_HOST: ConfigEntry[String] =
buildConf("celeborn.master.http.host")
.withAlternative("celeborn.metrics.master.prometheus.host")
.withAlternative("celeborn.master.metrics.prometheus.host")
.categories("master")
.version("0.4.0")
.doc("Master's http host.")
.stringConf
.createWithDefaultString("<localhost>")
// Master RPC port; restricted to the non-privileged range.
val MASTER_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.port")
.categories("master")
.version("0.2.0")
.doc("Port for master to bind.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9097)
// Master HTTP port; legacy prometheus keys kept as alternatives.
val MASTER_HTTP_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.http.port")
.withAlternative("celeborn.metrics.master.prometheus.port")
.withAlternative("celeborn.master.metrics.prometheus.port")
.categories("master")
.version("0.4.0")
.doc("Master's http port.")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9098)
// Upper bound of the HTTP worker thread pool.
val MASTER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.master.http.maxWorkerThreads")
.categories("master")
.version("0.5.0")
.doc("Maximum number of threads in the master http worker thread pool.")
.intConf
.checkValue(_ > 0, "Must be positive.")
.createWithDefault(200)
// Graceful-stop timeout of the HTTP server.
val MASTER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.http.stopTimeout")
.categories("master")
.version("0.5.0")
.doc("Master http server stop timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
// Idle-connection timeout of the HTTP server.
val MASTER_HTTP_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.http.idleTimeout")
.categories("master")
.version("0.5.0")
.doc("Master http server idle timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Enabled HTTP authentication schemes; empty (default) disables HTTP auth.
// Doc text fixed: "concreted implementation" -> "concrete implementation".
val MASTER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.auth.supportedSchemes")
.categories("master")
.version("0.6.0")
.doc("A comma-separated list of master http auth supported schemes." +
"<ul>" +
" <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
" <li>BASIC: User-defined password authentication, the concrete implementation is" +
" configurable via `celeborn.master.http.auth.basic.provider`.</li>" +
" <li>BEARER: User-defined bearer token authentication, the concrete implementation is" +
" configurable via `celeborn.master.http.auth.bearer.provider`.</li>" +
"</ul>")
.stringConf
.toSequence
.createWithDefault(Nil)
// Keytab used for SPNEGO HTTP authentication.
val MASTER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.spnego.keytab")
.categories("master")
.version("0.6.0")
.doc("The keytab file for SPNego authentication.")
.stringConf
.createOptional
// SPNEGO service principal, required only when SPNEGO auth is enabled.
val MASTER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.spnego.principal")
.categories("master")
.version("0.6.0")
.doc("SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM." +
" SPNego service principal would be used when celeborn http authentication is enabled." +
" This needs to be set only if SPNEGO is to be used in authentication.")
.stringConf
.createOptional
// Header carrying the real client IP behind a proxy; informational only, never used for auth.
val MASTER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
buildConf("celeborn.master.http.proxy.client.ip.header")
.categories("master")
.doc("The HTTP header to record the real client IP address. If your server is behind a load" +
" balancer or other proxy, the server will see this load balancer or proxy IP address as" +
" the client IP address, to get around this common issue, most load balancers or proxies" +
" offer the ability to record the real remote IP address in an HTTP header that will be" +
" added to the request for other devices to use. Note that, because the header value can" +
" be specified to any IP address, so it will not be used for authentication.")
.version("0.6.0")
.stringConf
.createWithDefault("X-Real-IP")
// Pluggable BASIC auth provider; anonymous provider by default.
val MASTER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.master.http.auth.basic.provider")
.categories("master")
.version("0.6.0")
.doc("User-defined password authentication implementation of " +
"org.apache.celeborn.spi.authentication.PasswdAuthenticationProvider")
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
// Pluggable BEARER auth provider; anonymous provider by default.
val MASTER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.master.http.auth.bearer.provider")
.categories("master")
.version("0.6.0")
.doc("User-defined token authentication implementation of " +
"org.apache.celeborn.spi.authentication.TokenAuthenticationProvider")
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
// Admin users for the HTTP API; everyone is admin when auth schemes are unset.
val MASTER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.auth.administers")
.categories("master")
.version("0.6.0")
.doc("A comma-separated list of users who have admin privileges," +
s" Note, when ${MASTER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
s" everyone is treated as administrator.")
.stringConf
.toSequence
.createWithDefault(Seq.empty)
// --- Master HTTP SSL settings ---
val MASTER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.http.ssl.enabled")
.categories("master")
.version("0.6.0")
.doc("Set this to true for using SSL encryption in http server.")
.booleanConf
.createWithDefault(false)
val MASTER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.path")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore location.")
.stringConf
.createOptional
val MASTER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.password")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore password.")
.stringConf
.createOptional
val MASTER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.type")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore type.")
.stringConf
.createOptional
val MASTER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.algorithm")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore algorithm.")
.stringConf
.createOptional
// Disallowed legacy SSL protocol versions.
val MASTER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.ssl.disallowed.protocols")
.categories("master")
.version("0.6.0")
.doc("SSL versions to disallow.")
.stringConf
.toSequence
.createWithDefault(Seq("SSLv2", "SSLv3"))
// Explicitly included cipher suites; empty list means server defaults.
val MASTER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.ssl.include.ciphersuites")
.categories("master")
.version("0.6.0")
.doc("A comma-separated list of include SSL cipher suite names.")
.stringConf
.toSequence
.createWithDefault(Nil)
// Master high availability: when true the master nodes run as a Raft cluster.
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
.categories("ha")
.version("0.3.0")
.doc("When true, master nodes run as Raft cluster mode.")
.booleanConf
.createWithDefault(false)
// Node id identifying this master in the Raft cluster; inferred from the
// hostname when not set explicitly.
// Fix: added the missing `.categories("ha")` — this was the only HA config
// entry without a category, so it was dropped from the generated HA docs
// section; also corrected doc grammar ("if not define" -> "if not defined").
val HA_MASTER_NODE_ID: OptionalConfigEntry[String] =
buildConf("celeborn.master.ha.node.id")
.withAlternative("celeborn.ha.master.node.id")
.categories("ha")
.doc("Node id for master raft cluster in HA mode, if not defined, " +
"will be inferred by hostname.")
.version("0.3.0")
.stringConf
.createOptional
// --- Per-node addresses for HA mode; "<id>" in the key is substituted with
// --- the concrete node id at lookup time. ---
val HA_MASTER_NODE_HOST: ConfigEntry[String] =
buildConf("celeborn.master.ha.node.<id>.host")
.withAlternative("celeborn.ha.master.node.<id>.host")
.categories("ha")
.doc("Host to bind of master node <id> in HA mode.")
.version("0.3.0")
.stringConf
.createWithDefaultString("<required>")
// NOTE(review): `p < 65535` excludes 65535, which is a valid TCP port —
// confirm the upper bound is intentional (same pattern is used file-wide).
val HA_MASTER_NODE_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.ha.node.<id>.port")
.withAlternative("celeborn.ha.master.node.<id>.port")
.categories("ha")
.doc("Port to bind of master node <id> in HA mode.")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9097)
// Optional dedicated Ratis bind host; falls back to the node host above.
val HA_MASTER_NODE_RATIS_HOST: OptionalConfigEntry[String] =
buildConf("celeborn.master.ha.node.<id>.ratis.host")
.withAlternative("celeborn.ha.master.node.<id>.ratis.host")
.internal
.categories("ha")
.doc("Ratis host to bind of master node <id> in HA mode. If not provided, " +
s"fallback to ${HA_MASTER_NODE_HOST.key}.")
.version("0.3.0")
.stringConf
.createOptional
val HA_MASTER_NODE_RATIS_PORT: ConfigEntry[Int] =
buildConf("celeborn.master.ha.node.<id>.ratis.port")
.withAlternative("celeborn.ha.master.node.<id>.ratis.port")
.categories("ha")
.doc("Ratis port to bind of master node <id> in HA mode.")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9872)
// Ratis RPC transport; normalized to lower case before validation.
val HA_MASTER_RATIS_RPC_TYPE: ConfigEntry[String] =
buildConf("celeborn.master.ha.ratis.raft.rpc.type")
.withAlternative("celeborn.ha.master.ratis.raft.rpc.type")
.categories("ha")
.doc("RPC type for Ratis, available options: netty, grpc.")
.version("0.3.0")
.stringConf
.transform(_.toLowerCase)
.checkValues(Set("netty", "grpc"))
.createWithDefault("netty")
// NOTE(review): /tmp is volatile on many systems; a production deployment
// should override this default for durable Raft state.
val HA_MASTER_RATIS_STORAGE_DIR: ConfigEntry[String] =
buildConf("celeborn.master.ha.ratis.raft.server.storage.dir")
.withAlternative("celeborn.ha.master.ratis.raft.server.storage.dir")
.categories("ha")
.doc("Root storage directory to hold RaftServer data.")
.version("0.3.0")
.stringConf
.createWithDefault("/tmp/ratis")
// RECOVER reuses existing Raft storage; FORMAT re-initializes it.
val HA_MASTER_RATIS_STORAGE_STARTUP_OPTION: ConfigEntry[String] =
buildConf("celeborn.master.ha.ratis.raft.server.storage.startup.option")
.categories("ha")
.doc("Startup option of RaftServer storage. Available options: RECOVER, FORMAT.")
.version("0.5.0")
.stringConf
.checkValues(Set("RECOVER", "FORMAT"))
.createWithDefault("RECOVER")
// --- Internal tuning knobs forwarded to the embedded Ratis RaftServer.  ---
// --- All are marked `.internal` and intentionally undocumented in the    ---
// --- public config reference; defaults mirror Ratis conventions.         ---
val HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.segment.size.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.segment.size.max")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
val HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.preallocated.size")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.preallocated.size")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
val HA_MASTER_RATIS_LOG_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.write.buffer.size")
.internal
.categories("ha")
.version("0.5.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("36MB")
// Appender queue bounds: element count and total byte limits.
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.element-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.element-limit")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(1024)
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.byte-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.byte-limit")
.internal
.categories("ha")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("32MB")
val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
.internal
.categories("ha")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
val HA_MASTER_RATIS_LOG_PURGE_GAP: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.log.purge.gap")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.purge.gap")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(1000000)
// RPC/retry-cache/election timeouts, all expressed in seconds.
val HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.request.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.request.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
val HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.retrycache.expirytime")
.withAlternative("celeborn.ha.master.ratis.raft.server.retrycache.expirytime")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("600s")
// Election timeout window: a random timeout is chosen between min and max.
val HA_MASTER_RATIS_RPC_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.min")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.min")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
val HA_MASTER_RATIS_RPC_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.max")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
val HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.client.rpc.timeout")
.internal
.categories("ha")
.version("0.3.2")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")
val HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.client.rpc.watch.timeout")
.internal
.categories("ha")
.version("0.3.2")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("20s")
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.max")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.max")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
val HA_MASTER_RATIS_LEADER_ELECTION_MEMBER_MAJORITY_ADD: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.leader.election.member.majority.add")
.internal
.categories("ha")
.version("0.6.0")
.booleanConf
.createWithDefault(true)
val HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.notification.no-leader.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30s")
val HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.rpc.slowness.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.slowness.timeout")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
// Note: interval is in milliseconds even though the default reads "1s".
val HA_MASTER_RATIS_ROLE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.role.check.interval")
.withAlternative("celeborn.ha.master.ratis.raft.server.role.check.interval")
.internal
.categories("ha")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
// Snapshotting: auto trigger, its threshold, and retained snapshot count.
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled")
.internal
.categories("ha")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.threshold")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold")
.internal
.categories("ha")
.version("0.3.0")
.longConf
.createWithDefault(200000L)
val HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM: ConfigEntry[Int] =
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.retention.file.num")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.retention.file.num")
.internal
.categories("ha")
.version("0.3.0")
.intConf
.createWithDefault(3)
// NOTE(review): this public (non-internal) entry has no `.doc()` — every
// other non-internal config here documents itself; consider adding one
// describing what worker "network location" persistence covers.
val MASTER_PERSIST_WORKER_NETWORK_LOCATION: ConfigEntry[Boolean] =
buildConf("celeborn.master.persist.workerNetworkLocation")
.categories("master")
.version("0.6.0")
.booleanConf
.createWithDefault(false)
// --- Slot assignment policy and its load-aware tuning parameters. ---
val MASTER_SLOT_ASSIGN_POLICY: ConfigEntry[String] =
buildConf("celeborn.master.slot.assign.policy")
.withAlternative("celeborn.slots.assign.policy")
.categories("master")
.version("0.3.0")
.doc("Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. " +
"Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes`")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(
SlotsAssignPolicy.ROUNDROBIN.name,
SlotsAssignPolicy.LOADAWARE.name))
.createWithDefault(SlotsAssignPolicy.ROUNDROBIN.name)
val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.loadAware.numDiskGroups")
.withAlternative("celeborn.slots.assign.loadAware.numDiskGroups")
.categories("master")
.doc("This configuration is a guidance for load-aware slot allocation algorithm. " +
"This value is control how many disk groups will be created.")
.version("0.3.0")
.intConf
.createWithDefault(5)
val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.diskGroupGradient")
.withAlternative("celeborn.slots.assign.loadAware.diskGroupGradient")
.categories("master")
.doc("This value means how many more workload will be placed into a faster disk group " +
"than a slower group.")
.version("0.3.0")
.doubleConf
.createWithDefault(0.1)
// Defaults weight fetch time fully (1) and flush time not at all (0).
val MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.flushTimeWeight")
.withAlternative("celeborn.slots.assign.loadAware.flushTimeWeight")
.categories("master")
.doc(
"Weight of average flush time when calculating ordering in load-aware assignment strategy")
.version("0.3.0")
.doubleConf
.createWithDefault(0)
val MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.master.slot.assign.loadAware.fetchTimeWeight")
.withAlternative("celeborn.slots.assign.loadAware.fetchTimeWeight")
.categories("master")
.doc(
"Weight of average fetch time when calculating ordering in load-aware assignment strategy")
.version("0.3.0")
.doubleConf
.createWithDefault(1)
val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.extraSlots")
.withAlternative("celeborn.slots.assign.extraSlots")
.categories("master")
.version("0.3.0")
.doc("Extra slots number when master assign slots.")
.intConf
.createWithDefault(100)
// Effective cap is the smaller positive value of this and the client-side
// `celeborn.client.slot.assign.maxWorkers`.
val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.maxWorkers")
.categories("master")
.version("0.3.1")
.doc("Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one " +
s"from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`.")
.intConf
.createWithDefault(10000)
// --- Runtime partition-size estimation: seed value, bounds, and refresh. ---
val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.initialSize")
.withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
.categories("master")
.doc("Initial partition size for estimation, it will change according to runtime stats.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64mb")
val ESTIMATED_PARTITION_SIZE_MAX_SIZE: OptionalConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.maxSize")
.categories("master")
.doc("Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2.")
.version("0.4.1")
.bytesConf(ByteUnit.BYTE)
.createOptional
val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.minSize")
.withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
.categories("master", "worker")
.doc(
"Ignore partition size smaller than this configuration of partition size for estimation.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8mb")
val ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.update.initialDelay")
.withAlternative("celeborn.shuffle.estimatedPartitionSize.update.initialDelay")
.categories("master")
.doc("Initial delay time before start updating partition size for estimation.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5min")
val ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.update.interval")
.withAlternative("celeborn.shuffle.estimatedPartitionSize.update.interval")
.categories("master")
.doc("Interval of updating partition size for estimation.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10min")
val MASTER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.update.interval")
.categories("master")
.doc("Time length for a window about compute user resource consumption.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Logical cluster name shared by masters and workers.
val CLUSTER_NAME: ConfigEntry[String] =
buildConf("celeborn.cluster.name")
.categories("master", "worker")
.version("0.5.0")
.doc("Celeborn cluster name.")
.stringConf
.createWithDefaultString("default")
// Chunk granularity for serving merged shuffle data to reducers. Must fit in
// a signed Int (chunks are addressed with int offsets downstream).
val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.shuffle.chunk.size")
.categories("worker")
.version("0.2.0")
.doc("Max chunk size of reducer's merged shuffle data. For example, if a reducer's " +
"shuffle data is 128M and the data will need 16 fetch chunk requests to fetch.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v < Integer.MAX_VALUE, "Chunk size can not be larger than 2GB")
.createWithDefaultString("8m")
val CLIENT_FETCH_DFS_READ_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.fetch.dfsReadChunkSize")
.categories("client")
.version("0.3.1")
.doc("Max chunk size for DfsPartitionReader.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8m")
// --- Worker-side partition splitting: enablement and size thresholds. ---
val WORKER_PARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.shuffle.partitionSplit.enabled")
.withAlternative("celeborn.worker.partition.split.enabled")
.categories("worker")
.version("0.3.0")
.doc("enable the partition split on worker side")
.booleanConf
.createWithDefault(true)
val WORKER_PARTITION_SPLIT_MIN_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.partitionSplit.min")
.withAlternative("celeborn.shuffle.partitionSplit.min")
.categories("worker")
.doc("Min size for a partition to split")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")
val WORKER_PARTITION_SPLIT_MAX_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.shuffle.partitionSplit.max")
.categories("worker")
.doc("Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2g")
// Primary worker storage layout. When unset, the baseDir.prefix/number pair
// below synthesizes default directories.
val WORKER_STORAGE_DIRS: OptionalConfigEntry[Seq[String]] =
buildConf("celeborn.worker.storage.dirs")
.categories("worker")
.version("0.2.0")
.doc("Directory list to store shuffle data. It's recommended to configure one directory " +
"on each disk. Storage size limit can be set for each directory. For the sake of " +
"performance, there should be no more than 2 flush threads " +
"on the same disk partition if you are using HDD, and should be 8 or more flush threads " +
"on the same disk partition if you are using SSD. For example: " +
"`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]`")
.stringConf
.toSequence
.createOptional
val WORKER_WORKING_DIR: ConfigEntry[String] =
buildConf("celeborn.worker.storage.workingDir")
.withAlternative("celeborn.worker.workingDir")
.categories("worker")
.doc("Worker's working dir path name.")
.version("0.3.0")
.stringConf
.createWithDefault("celeborn-worker/shuffle_data")
// --- Fallback directory synthesis when WORKER_STORAGE_DIRS is unset:
// --- prefix + 1..number, with a uniform disk type. ---
val WORKER_STORAGE_BASE_DIR_PREFIX: ConfigEntry[String] =
buildConf("celeborn.worker.storage.baseDir.prefix")
.internal
.categories("worker")
.version("0.2.0")
.doc("Base directory for Celeborn worker to write if " +
s"`${WORKER_STORAGE_DIRS.key}` is not set.")
.stringConf
.createWithDefault("/mnt/disk")
val WORKER_STORAGE_BASE_DIR_COUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.baseDir.number")
.internal
.categories("worker")
.version("0.2.0")
.doc(s"How many directories will be used if `${WORKER_STORAGE_DIRS.key}` is not set. " +
s"The directory name is a combination of `${WORKER_STORAGE_BASE_DIR_PREFIX.key}` " +
"and from one(inclusive) to `celeborn.worker.storage.baseDir.number`(inclusive) " +
"step by one.")
.intConf
.createWithDefault(16)
val WORKER_STORAGE_BASE_DIR_DISK_TYPE: ConfigEntry[String] =
buildConf("celeborn.worker.storage.baseDir.diskType")
.internal
.categories("worker")
.version("0.6.0")
.doc(s"The disk type of base directory for worker to write if `${WORKER_STORAGE_DIRS.key}` is not set. " +
s"Available options: ${StorageInfo.Type.HDD.name}, ${StorageInfo.Type.SSD.name}.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(StorageInfo.Type.HDD.name, StorageInfo.Type.SSD.name))
.createWithDefault(StorageInfo.Type.HDD.name)
// Grace period before expired shuffle directories are physically deleted
// from disk.
// Fix: corrected doc grammar ("a expire dirs" -> "expired dirs") and removed
// the `s` interpolator prefix on a string with no interpolation.
val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.expireDirs.timeout")
.categories("worker")
.version("0.3.2")
.doc("The timeout for expired dirs to be deleted on disk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
// --- Remote storage roots and credentials (HDFS / S3). These are visible
// --- to worker, master and client processes. ---
val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.categories("worker", "master", "client")
.version("0.2.0")
.doc("HDFS base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional
val S3_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.dir")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional
// NOTE(review): secret/access keys travel through plain config; consider
// documenting a credential-provider alternative if one exists.
val S3_SECRET_KEY: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.secret.key")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 secret key for Celeborn to store shuffle data.")
.stringConf
.createOptional
val S3_ACCESS_KEY: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.access.key")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 access key for Celeborn to store shuffle data.")
.stringConf
.createOptional
// AWS region of the S3 endpoint used for shuffle storage.
// Fix: doc previously said "S3 endpoint", but the key
// (`celeborn.storage.s3.endpoint.region`) configures the endpoint *region*.
val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint.region")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 endpoint region for Celeborn to store shuffle data.")
.stringConf
.createOptional
// Retry budget for S3 multipart-upload (MPU) part uploads.
val S3_MPU_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.storage.s3.mpu.maxRetries")
.categories("worker")
.version("0.6.0")
.doc("S3 MPU upload max retries.")
.intConf
.createWithDefault(5)
// --- Aliyun OSS storage root and credentials. ---
val OSS_ENDPOINT: OptionalConfigEntry[String] =
buildConf("celeborn.storage.oss.endpoint")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("OSS endpoint for Celeborn to store shuffle data.")
.stringConf
.createOptional
val OSS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.oss.dir")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("OSS base directory for Celeborn to store shuffle data.")
.stringConf
.createOptional
val OSS_SECRET_KEY: OptionalConfigEntry[String] =
buildConf("celeborn.storage.oss.secret.key")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("OSS secret key for Celeborn to store shuffle data.")
.stringConf
.createOptional
// When true (default), OSS credential configuration is skipped; disable to
// let an external SDK (Jindo) supply credentials.
// Fix: removed the stray space before the period in the doc string and
// capitalized "OSS"/"Jindo SDK".
val OSS_IGNORE_CREDENTIALS: ConfigEntry[Boolean] =
buildConf("celeborn.storage.oss.ignore.credentials")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("Whether to skip OSS credentials, disable this config to support Jindo SDK.")
.booleanConf
.createWithDefault(true)
// OSS access-key counterpart of OSS_SECRET_KEY.
val OSS_ACCESS_KEY: OptionalConfigEntry[String] =
buildConf("celeborn.storage.oss.access.key")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("OSS access key for Celeborn to store shuffle data.")
.stringConf
.createOptional
// --- Per-disk reserved space: absolute size and/or ratio; the effective
// --- reserve is the max of the two (see the ratio doc below). ---
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
.categories("worker")
.doc("Celeborn worker reserved space for each disk.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("5G")
val WORKER_DISK_RESERVE_RATIO: OptionalConfigEntry[Double] =
buildConf("celeborn.worker.storage.disk.reserve.ratio")
.categories("worker")
.doc("Celeborn worker reserved ratio for each disk. The minimum usable size for each disk is the max space " +
"between the reserved space and the space calculate via reserved ratio.")
.version("0.3.2")
.doubleConf
.checkValue(v => v > 0.0 && v < 1.0, "Should be in (0.0, 1.0).")
.createOptional
val WORKER_DISK_CLEAN_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.disk.clean.threads")
.categories("worker")
.version("0.3.2")
.doc("Thread number of worker to clean up directories of expired shuffle keys on disk.")
.intConf
.createWithDefault(4)
// Pre-registration check that working dirs are empty: retry count and the
// per-retry wait below.
val WORKER_CHECK_FILE_CLEAN_MAX_RETRIES: ConfigEntry[Int] =
buildConf("celeborn.worker.storage.checkDirsEmpty.maxRetries")
.withAlternative("celeborn.worker.disk.checkFileClean.maxRetries")
.categories("worker")
.doc("The number of retries for a worker to check if the working directory is cleaned up before registering with the master.")
.version("0.3.0")
.intConf
.createWithDefault(3)
val WORKER_CHECK_FILE_CLEAN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.checkDirsEmpty.timeout")
.withAlternative("celeborn.worker.disk.checkFileClean.timeout")
.categories("worker")
.doc("The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1000ms")
// NOTE(review): the checkValue failure messages below read like fallback
// notices ("Will use default ... order") although checkValue normally
// rejects the value — confirm the intended wording/behavior.
val WORKER_STORAGE_CREATE_FILE_POLICY: OptionalConfigEntry[String] =
buildConf("celeborn.worker.storage.storagePolicy.createFilePolicy")
.categories("worker")
.doc("This defined the order for creating files across available storages." +
" Available storages options are: MEMORY,SSD,HDD,HDFS,OSS")
.version("0.5.1")
.stringConf
.checkValue(
_.split(",").map(str => StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p =>
p),
"Will use default create file order. Default order: MEMORY,SSD,HDD,HDFS,OSS")
.createOptional
val WORKER_STORAGE_EVICT_POLICY: OptionalConfigEntry[String] =
buildConf("celeborn.worker.storage.storagePolicy.evictPolicy")
.categories("worker")
.doc("This define the order of evict files if the storages are available." +
" Available storages: MEMORY,SSD,HDD,HDFS. " +
"Definition: StorageTypes|StorageTypes|StorageTypes. " +
"Example: MEMORY,SSD|SSD,HDFS." +
" The example means that a MEMORY shuffle file can be evicted to SSD " +
"and a SSD shuffle file can be evicted to HDFS.")
.version("0.5.1")
.stringConf
.checkValue(
_.replace("|", ",").split(",").map(str =>
StorageInfo.typeNames.contains(str.trim.toUpperCase)).forall(p => p),
"Will use default evict order. Default order: MEMORY,SSD,HDD,HDFS,OSS")
.createOptional
// --- Worker embedded HTTP server: bind address, thread pool, timeouts.
// --- Alternatives preserve the pre-0.4 prometheus-specific key names. ---
val WORKER_HTTP_HOST: ConfigEntry[String] =
buildConf("celeborn.worker.http.host")
.withAlternative("celeborn.metrics.worker.prometheus.host")
.withAlternative("celeborn.worker.metrics.prometheus.host")
.categories("worker")
.doc("Worker's http host.")
.version("0.4.0")
.stringConf
.createWithDefault("<localhost>")
// NOTE(review): `p < 65535` excludes 65535, a valid TCP port — confirm the
// bound is intentional (same pattern is used file-wide).
val WORKER_HTTP_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.http.port")
.withAlternative("celeborn.metrics.worker.prometheus.port")
.withAlternative("celeborn.worker.metrics.prometheus.port")
.categories("worker")
.doc("Worker's http port.")
.version("0.4.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9096)
val WORKER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.http.maxWorkerThreads")
.categories("worker")
.version("0.5.0")
.doc("Maximum number of threads in the worker http worker thread pool.")
.intConf
.checkValue(_ > 0, "Must be positive.")
.createWithDefault(200)
val WORKER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.http.stopTimeout")
.categories("worker")
.version("0.5.0")
.doc("Worker http server stop timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val WORKER_HTTP_IDLE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.http.idleTimeout")
.categories("worker")
.version("0.5.0")
.doc("Worker http server idle timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Auth schemes accepted by the worker HTTP server; empty default = no auth.
val WORKER_HTTP_AUTH_SUPPORTED_SCHEMES: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.auth.supportedSchemes")
.categories("worker")
.version("0.6.0")
.doc("A comma-separated list of worker http auth supported schemes." +
"<ul>" +
" <li>SPNEGO: Kerberos/GSSAPI authentication.</li>" +
" <li>BASIC: User-defined password authentication, the concreted implementation is" +
" configurable via `celeborn.worker.http.auth.basic.provider`.</li>" +
" <li>BEARER: User-defined bearer token authentication, the concreted implementation is" +
" configurable via `celeborn.worker.http.auth.bearer.provider`.</li>" +
"</ul>")
.stringConf
.toSequence
.createWithDefault(Nil)
// SPNEGO (Kerberos) keytab + principal; only needed when SPNEGO is in the
// supported schemes above.
val WORKER_HTTP_SPNEGO_KEYTAB: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.spnego.keytab")
.categories("worker")
.version("0.6.0")
.doc("The keytab file for SPNego authentication.")
.stringConf
.createOptional
val WORKER_HTTP_SPNEGO_PRINCIPAL: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.spnego.principal")
.categories("worker")
.version("0.6.0")
.doc("SPNego service principal, typical value would look like HTTP/_HOST@EXAMPLE.COM." +
" SPNego service principal would be used when celeborn http authentication is enabled." +
" This needs to be set only if SPNEGO is to be used in authentication.")
.stringConf
.createOptional
// Header consulted for the real client IP behind a proxy; per the doc, it is
// informational only and never used for authentication.
val WORKER_HTTP_PROXY_CLIENT_IP_HEADER: ConfigEntry[String] =
buildConf("celeborn.worker.http.proxy.client.ip.header")
.categories("worker")
.doc("The HTTP header to record the real client IP address. If your server is behind a load" +
" balancer or other proxy, the server will see this load balancer or proxy IP address as" +
" the client IP address, to get around this common issue, most load balancers or proxies" +
" offer the ability to record the real remote IP address in an HTTP header that will be" +
" added to the request for other devices to use. Note that, because the header value can" +
" be specified to any IP address, so it will not be used for authentication.")
.version("0.6.0")
.stringConf
.createWithDefault("X-Real-IP")
// Pluggable BASIC/BEARER providers for the worker HTTP endpoint.
// NOTE(review): these docs reference `org.apache.celeborn.common.authentication.*`
// while the master-side equivalents reference `org.apache.celeborn.spi.authentication.*`
// — confirm which package name is correct and align the two.
val WORKER_HTTP_AUTH_BASIC_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.worker.http.auth.basic.provider")
.categories("worker")
.version("0.6.0")
.doc("User-defined password authentication implementation of " +
"org.apache.celeborn.common.authentication.PasswdAuthenticationProvider")
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
val WORKER_HTTP_AUTH_BEARER_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.worker.http.auth.bearer.provider")
.categories("worker")
.version("0.6.0")
.doc("User-defined token authentication implementation of " +
"org.apache.celeborn.common.authentication.TokenAuthenticationProvider")
.stringConf
.createWithDefault(classOf[AnonymousAuthenticationProviderImpl].getName)
// Admin users for the worker HTTP API; everyone is admin when no auth
// scheme is configured (per the doc string).
val WORKER_HTTP_AUTH_ADMINISTERS: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.auth.administers")
.categories("worker")
.version("0.6.0")
.doc("A comma-separated list of users who have admin privileges," +
s" Note, when ${WORKER_HTTP_AUTH_SUPPORTED_SCHEMES.key} is not set," +
s" everyone is treated as administrator.")
.stringConf
.toSequence
.createWithDefault(Seq.empty)
// --- TLS settings for the Worker HTTP server; mirrors the master-side
// --- MASTER_HTTP_SSL_* entries. ---
val WORKER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.http.ssl.enabled")
.categories("worker")
.version("0.6.0")
.doc("Set this to true for using SSL encryption in http server.")
.booleanConf
.createWithDefault(false)
val WORKER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.path")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore location.")
.stringConf
.createOptional
val WORKER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.password")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore password.")
.stringConf
.createOptional
val WORKER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.type")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore type.")
.stringConf
.createOptional
val WORKER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.algorithm")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore algorithm.")
.stringConf
.createOptional
// Legacy SSLv2/SSLv3 protocols are rejected by default.
val WORKER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.ssl.disallowed.protocols")
.categories("worker")
.version("0.6.0")
.doc("SSL versions to disallow.")
.stringConf
.toSequence
.createWithDefault(Seq("SSLv2", "SSLv3"))
val WORKER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.ssl.include.ciphersuites")
.categories("worker")
.version("0.6.0")
.doc("A comma-separated list of include SSL cipher suite names.")
.stringConf
.toSequence
.createWithDefault(Nil)
// --- Worker server ports ---
// Ports for the four worker servers (RPC, push, fetch, replicate).
// The default of 0 lets the OS pick an ephemeral port.
val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
.doc("Server port for Worker to receive RPC request.")
.version("0.2.0")
.intConf
.createWithDefault(0)
val WORKER_PUSH_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.push.port")
.categories("worker")
.doc("Server port for Worker to receive push data request from ShuffleClient.")
.version("0.2.0")
.intConf
.createWithDefault(0)
val WORKER_FETCH_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.fetch.port")
.categories("worker")
.doc("Server port for Worker to receive fetch data request from ShuffleClient.")
.version("0.2.0")
.intConf
.createWithDefault(0)
val WORKER_REPLICATE_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.replicate.port")
.categories("worker")
.doc("Server port for Worker to receive replicate data request from other Workers.")
.version("0.2.0")
.intConf
.createWithDefault(0)
// --- Worker Netty IO thread pools ---
// Optional pool sizes for the push/fetch/replicate transport servers; the doc
// strings state the fallback used when unset.
// Fix: dropped the `s` interpolator from string literals that contain no
// interpolation (it was a no-op and misleading).
val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.push.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client push data. " +
"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
.createOptional
val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.fetch.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client fetch data. " +
"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
.createOptional
val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.replicate.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to replicate shuffle data. " +
"The default threads number is the number of flush thread.")
.version("0.2.0")
.intConf
.createOptional
// --- Worker registration and replication behavior ---
val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.register.timeout")
.categories("worker")
.doc("Worker register timeout.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("180s")
val WORKER_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
buildConf("celeborn.worker.closeIdleConnections")
.categories("worker")
.doc("Whether worker will close idle connections.")
.version("0.2.0")
.booleanConf
.createWithDefault(false)
val WORKER_REPLICATE_FAST_FAIL_DURATION: ConfigEntry[Long] =
buildConf("celeborn.worker.replicate.fastFail.duration")
.categories("worker")
.doc("If a replicate request not replied during the duration, worker will mark the replicate data request as failed. " +
"It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
val WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.replicate.randomConnection.enabled")
.categories("worker")
.doc("Whether worker will create random connection to peer when replicate data. When false, worker tend to " +
"reuse the same cached TransportClient to a specific replicate worker; when true, worker tend to use " +
"different cached TransportClient. Netty will use the same thread to serve the same connection, so " +
"with more connections replicate server can leverage more netty threads")
.version("0.2.1")
.booleanConf
.createWithDefault(true)
// Thread pool sizes for replication and asynchronous commit of shuffle files.
val WORKER_REPLICATE_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.replicate.threads")
.categories("worker")
.version("0.2.0")
.doc("Thread number of worker to replicate shuffle data.")
.intConf
.createWithDefault(64)
val WORKER_COMMIT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.commitFiles.threads")
// Old key kept for backward compatibility.
.withAlternative("celeborn.worker.commit.threads")
.categories("worker")
.version("0.3.0")
.doc("Thread number of worker to commit shuffle data files asynchronously. " +
"It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.availableTypes`.")
.intConf
.createWithDefault(32)
// Polling interval for checking whether an asynchronous commit-files round has
// finished.
val WORKER_COMMIT_FILES_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.commitFiles.check.interval")
.categories("worker")
.version("0.6.0")
.doc("Time length for a window about checking whether commit shuffle data files finished.")
.timeConf(TimeUnit.MILLISECONDS)
// Fix: state the unit explicitly. The previous bare "100" parsed as 100 ms
// (the conf's default unit), so the value is unchanged — but every other time
// conf in this file carries an explicit unit suffix.
.createWithDefaultString("100ms")
// Cleanup threads, commit timeout, and partition-sorter timeout/threads.
val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.clean.threads")
.categories("worker")
.version("0.3.2")
.doc("Thread number of worker to clean up expired shuffle keys.")
.intConf
.createWithDefault(64)
val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.commitFiles.timeout")
// Old key kept for backward compatibility.
.withAlternative("celeborn.worker.shuffle.commit.timeout")
.categories("worker")
.doc("Timeout for a Celeborn worker to commit files of a shuffle. " +
"It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
val WORKER_PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.timeout")
.withAlternative("celeborn.worker.partitionSorter.sort.timeout")
.categories("worker")
.doc("Timeout for a shuffle file to sort.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("220s")
val WORKER_PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.worker.sortPartition.threads")
.withAlternative("celeborn.worker.partitionSorter.threads")
.categories("worker")
.doc("PartitionSorter's thread counts. " +
"It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.availableTypes`.")
.version("0.3.0")
.intConf
.createOptional
// Partition-sorter tuning: index cache, reserved memory, prefetch, and the
// sorted-block compaction factor.
val WORKER_PARTITION_SORTER_INDEX_CACHE_MAX_WEIGHT: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.indexCache.maxWeight")
.categories("worker")
.doc("PartitionSorter's cache max weight for index buffer.")
.version("0.4.0")
.longConf
.createWithDefault(100000)
val WORKER_PARTITION_SORTER_INDEX_CACHE_EXPIRE: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.indexCache.expire")
.categories("worker")
.doc("PartitionSorter's cache item expire time.")
.version("0.4.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("180s")
val WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION: ConfigEntry[Long] =
buildConf("celeborn.worker.sortPartition.reservedMemoryPerPartition")
.withAlternative("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
.categories("worker")
.doc("Reserved memory when sorting a shuffle file off-heap.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
// Must fit in an Int: the value is used to size a single memory block.
.checkValue(v => v < Int.MaxValue, "Reserved memory per partition must be less than 2GB.")
.createWithDefaultString("1mb")
val WORKER_PARTITION_SORTER_PREFETCH_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.sortPartition.prefetch.enabled")
.categories("worker")
.doc("When true, partition sorter will prefetch the original partition files to page cache " +
s"and reserve memory configured by `${WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION.key}` " +
"to allocate a block of memory for prefetching while sorting a shuffle file off-heap with page cache for non-hdfs files. " +
"Otherwise, partition sorter seeks to position of each block and does not prefetch for non-hdfs files.")
.version("0.5.0")
.booleanConf
.createWithDefault(true)
val WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR: ConfigEntry[Double] =
buildConf("celeborn.shuffle.sortPartition.block.compactionFactor")
.categories("worker")
.version("0.4.2")
.doc("Combine sorted shuffle blocks such that size of compacted shuffle block does not " +
s"exceed compactionFactor * ${SHUFFLE_CHUNK_SIZE.key}")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
.createWithDefault(0.25)
// Flusher buffer sizes for local-disk and HDFS storage backends.
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.buffer.size")
.categories("worker")
.version("0.2.0")
.doc("Size of buffer used by a single flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("256k")
val WORKER_HDFS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.hdfs.buffer.size")
.categories("worker")
.version("0.3.0")
.doc("Size of buffer used by a HDFS flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4m")
// Flusher buffer sizes for the S3 and OSS object-store backends.
// Fix: corrected the articles in the user-facing doc strings ("a S3" -> "an S3",
// "a OSS" -> "an OSS"); these strings are rendered into the generated config docs.
val WORKER_S3_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.s3.buffer.size")
.categories("worker")
.version("0.6.0")
.doc("Size of buffer used by an S3 flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("6m")
val WORKER_OSS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.oss.buffer.size")
.categories("worker")
.version("0.6.0")
.doc("Size of buffer used by an OSS flusher.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("6m")
// Upper bound on how long closing a single file writer may take.
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")
.categories("worker")
.doc("Timeout for a file writer to close")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
// --- Flusher thread counts per storage type ---
// Per-disk pools for unknown/HDD/SSD disks and shared pools for HDFS/S3/OSS,
// plus the flusher shutdown timeout.
val WORKER_FLUSHER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.threads")
.categories("worker")
.doc("Flusher's thread count per disk for unknown-type disks.")
.version("0.2.0")
.intConf
.createWithDefault(16)
val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdd.threads")
.categories("worker")
.doc("Flusher's thread count per disk used for write data to HDD disks.")
.version("0.2.0")
.intConf
// Single thread per HDD avoids seek-heavy concurrent writes on spinning disks.
.createWithDefault(1)
val WORKER_FLUSHER_SSD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.ssd.threads")
.categories("worker")
.doc("Flusher's thread count per disk used for write data to SSD disks.")
.version("0.2.0")
.intConf
.createWithDefault(16)
val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdfs.threads")
.categories("worker")
.doc("Flusher's thread count used for write data to HDFS.")
.version("0.2.0")
.intConf
.createWithDefault(8)
val WORKER_FLUSHER_S3_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.s3.threads")
.categories("worker")
.doc("Flusher's thread count used for write data to S3.")
.version("0.6.0")
.intConf
.createWithDefault(8)
val WORKER_FLUSHER_OSS_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.oss.threads")
.categories("worker")
.doc("Flusher's thread count used for write data to OSS.")
.version("0.6.0")
.intConf
.createWithDefault(8)
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.shutdownTimeout")
.categories("worker")
.doc("Timeout for a flusher to shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
// Persistence of committed file infos into the local DB for graceful shutdown
// recovery: flush interval, and whether to fsync for OS-crash safety.
val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval")
.categories("worker")
.doc("Interval for a Celeborn worker to flush committed file infos into Level DB.")
.version("0.3.1")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync")
.categories("worker")
.doc(
"Whether to call sync method to save committed file infos into Level DB to handle OS crash.")
.version("0.3.1")
.booleanConf
// Off by default: sync trades throughput for crash safety.
.createWithDefault(false)
// Sliding-window statistics for per-disk flush and fetch latency.
val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
.withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
.categories("worker")
.doc("The size of sliding windows used to calculate statistics about flushed time and count.")
.version("0.3.0")
.intConf
.createWithDefault(20)
val WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.diskTime.slidingWindow.minFlushCount")
.withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.minCount")
.categories("worker")
.doc("The minimum flush count to enter a sliding window" +
" to calculate statistics about flushed time and count.")
.version("0.3.0")
.internal
.intConf
.createWithDefault(500)
val WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT: ConfigEntry[Int] =
buildConf("celeborn.worker.diskTime.slidingWindow.minFetchCount")
.categories("worker")
.doc("The minimum fetch count to enter a sliding window" +
" to calculate statistics about fetched time and count.")
.version("0.2.1")
.internal
.intConf
.createWithDefault(100)
// How often the MemoryManager samples direct memory usage.
val WORKER_DIRECT_MEMORY_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.memory.check.interval")
.withAlternative("celeborn.worker.memory.checkInterval")
.categories("worker")
.doc("Interval of worker direct memory checking.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10ms")
// Whether the MemoryManager considers pinned (netty-held) memory when deciding
// to resume a paused worker.
val WORKER_PINNED_MEMORY_CHECK_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.monitor.pinnedMemory.check.enabled")
.categories("worker")
.doc("If true, MemoryManager will check worker should resume by pinned memory used.")
.version("0.6.0")
.booleanConf
// Fix: use the typed default rather than parsing the string "true"; same
// behavior, consistent with every other boolean conf in this file.
.createWithDefault(true)
// Sampling interval for pinned-memory checking.
val WORKER_PINNED_MEMORY_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.pinnedMemory.check.interval")
.categories("worker")
// Fix: reference the config keys via `.key` instead of hard-coded literals so
// the doc cannot drift if a key is renamed (matches the file's convention).
// The rendered text is unchanged.
.doc("Interval of worker direct pinned memory checking, " +
s"only takes effect when ${NETWORK_MEMORY_ALLOCATOR_POOLED.key} and " +
s"${WORKER_PINNED_MEMORY_CHECK_ENABLED.key} are enabled.")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
// Post-resume hold time after a pinned-memory-based resume, and the interval
// for logging the direct-memory tracker report.
val WORKER_PINNED_MEMORY_RESUME_KEEP_TIME: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.pinnedMemory.resumeKeepTime")
.categories("worker")
.doc("Time of worker to stay in resume state after resumeByPinnedMemory")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
val WORKER_DIRECT_MEMORY_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.memory.report.interval")
.withAlternative("celeborn.worker.memory.reportInterval")
.categories("worker")
.doc("Interval of worker direct memory tracker reporting to log.")
.version("0.3.0")
// NOTE: this time conf uses SECONDS granularity, unlike most others here.
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")
// Wait intervals applied after memory-pressure trim actions.
val WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.memory.trimChannelWaitInterval")
.categories("worker")
.doc("Wait time after worker trigger channel to trim cache.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
val WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.memory.trimFlushWaitInterval")
.categories("worker")
.doc("Wait time after worker trigger StorageManger to flush data.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
// --- Worker disk/device monitor ---
// Device health monitoring: master switch, check timeout/interval, what to
// check, where to read block info, and error-notification thresholds.
val WORKER_DISK_MONITOR_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.monitor.disk.enabled")
.categories("worker")
.version("0.3.0")
.doc("When true, worker will monitor device and report to master.")
.booleanConf
.createWithDefault(true)
val WORKER_DEVICE_STATUS_CHECK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.disk.check.timeout")
.withAlternative("celeborn.worker.disk.check.timeout")
.categories("worker")
.doc("Timeout time for worker check device status.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
val WORKER_DISK_MONITOR_CHECKLIST: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.monitor.disk.checklist")
.categories("worker")
.version("0.2.0")
.doc("Monitor type for disk, available items are: " +
"iohang, readwrite and diskusage.")
.stringConf
// Normalize user input; the checklist items are matched case-insensitively.
.transform(_.toLowerCase)
.toSequence
// iohang is not checked by default.
.createWithDefaultString("readwrite,diskusage")
val WORKER_DISK_MONITOR_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.disk.check.interval")
.withAlternative("celeborn.worker.monitor.disk.checkInterval")
.categories("worker")
.version("0.3.0")
.doc("Intervals between device monitor to check disk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
val WORKER_DISK_MONITOR_SYS_BLOCK_DIR: ConfigEntry[String] =
buildConf("celeborn.worker.monitor.disk.sys.block.dir")
.categories("worker")
.version("0.2.0")
.doc("The directory where linux file block information is stored.")
.stringConf
.createWithDefault("/sys/block")
val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
.categories("worker")
.version("0.3.0")
.doc("Device monitor will only notify critical error once the accumulated valid non-critical error number " +
"exceeding this threshold.")
.intConf
.createWithDefault(64)
val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
.categories("worker")
.version("0.3.0")
.doc("The expire timeout of non-critical device error. Only notify critical error when the number of non-critical " +
"errors for a period of time exceeds threshold.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10m")
// File-writer creation retries and the internal gather-API toggle for local
// flushers.
val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
buildConf("celeborn.worker.writer.create.maxAttempts")
.categories("worker")
.version("0.2.0")
.doc("Retry count for a file writer to create if its creation was failed.")
.intConf
.createWithDefault(3)
val WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.flusher.local.gatherAPI.enabled")
.internal
.categories("worker")
.version("0.6.0")
.doc("Worker will use gather API if this config is true.")
.booleanConf
.createWithDefault(true)
// --- Direct-memory ratio thresholds ---
// Fractions of the worker's direct memory governing sorter limits, read
// buffers, memory file storage, and the pause/resume watermarks.
val WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD: ConfigEntry[Double] =
buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
.categories("worker")
.doc("Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition " +
"sorter memory, partition sorter will stop sorting." +
" If this value is set to 0, partition files sorter will skip memory check and ServingState check.")
.version("0.2.0")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
.createWithDefault(0.1)
val WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioForReadBuffer")
.categories("worker")
.doc("Max ratio of direct memory for read buffer")
.version("0.2.0")
.doubleConf
.createWithDefault(0.1)
val WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioForMemoryFileStorage")
.categories("worker")
.doc("Max ratio of direct memory to store shuffle data. " +
"This feature is experimental and disabled by default.")
.version("0.5.0")
.doubleConf
// 0 disables memory file storage.
.createWithDefault(0)
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioToPauseReceive")
.categories("worker")
.doc("If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.85)
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
.categories("worker")
.doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. " +
s"This value should be higher than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.95)
val WORKER_DIRECT_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioToResume")
.categories("worker")
.doc("If direct memory usage is less than this limit, worker will resume.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.7)
// Pinned-memory ratio below which a paused worker may resume.
val WORKER_PINNED_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
buildConf("celeborn.worker.pinnedMemoryRatioToResume")
.categories("worker")
// Fix: reference the config keys via `.key` instead of hard-coded literals so
// the doc cannot drift if a key is renamed (matches the file's convention),
// and end the sentence with a period.
.doc("If pinned memory usage is less than this limit, worker will resume, " +
s"only takes effect when ${NETWORK_MEMORY_ALLOCATOR_POOLED.key} and " +
s"${WORKER_PINNED_MEMORY_CHECK_ENABLED.key} are enabled.")
.version("0.6.0")
.doubleConf
.createWithDefault(0.3)
// --- In-memory shuffle file storage ---
// Max per-file size and eviction behavior under memory pressure.
val WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.memoryFileStorage.maxFileSize")
.categories("worker")
.doc("Max size for a memory storage file. It must be less than 2GB.")
.version("0.5.0")
.bytesConf(ByteUnit.BYTE)
// Must fit in an Int: file contents live in a single in-memory buffer.
.checkValue(v => v < Int.MaxValue, "A single memory storage file can not be larger than 2GB")
.createWithDefaultString("8MB")
val WORKER_MEMORY_FILE_STORAGE_EVICT_AGGRESSIVE_MODE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.memoryFileStorage.evict.aggressiveMode.enabled")
.categories("worker")
.doc(
"If this set to true, memory shuffle files will be evicted when worker is in PAUSED state." +
" If the worker's offheap memory is not ample, set this to true " +
"and decrease `celeborn.worker.directMemoryRatioForMemoryFileStorage` will be helpful.")
.version("0.5.1")
.booleanConf
.createWithDefault(false)
val WORKER_MEMORY_FILE_STORAGE_EVICT_RATIO: ConfigEntry[Double] =
buildConf("celeborn.worker.memoryFileStorage.evict.ratio")
.categories("worker")
.doc("If memory shuffle storage usage rate is above this config, the memory storage shuffle files will evict to free memory.")
.version("0.5.1")
.doubleConf
.createWithDefault(0.5)
// --- Congestion control ---
// Per-user and per-worker produce/consume rate tracking with low/high
// watermarks that start and stop congesting pushy users.
val WORKER_CONGESTION_CONTROL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.congestionControl.enabled")
.categories("worker")
.doc("Whether to enable congestion control or not.")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
val WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.sample.time.window")
.categories("worker")
.doc("The worker holds a time sliding list to calculate users' produce/consume rate")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark")
.withAlternative("celeborn.worker.congestionControl.low.watermark")
.categories("worker")
.doc("Will stop congest users if the total pending bytes of disk buffer is lower than " +
"this configuration")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
// Long.MaxValue effectively disables this watermark until configured.
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark")
.withAlternative("celeborn.worker.congestionControl.high.watermark")
.categories("worker")
.doc("If the total bytes in disk buffer exceeds this configure, will start to congest " +
"users whose produce rate is higher than the potential average consume rate. " +
"The congestion will stop if the produce rate is lower or equal to the " +
"average consume rate, or the total pending bytes lower than " +
s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark")
.categories("worker")
.doc("For those users that produce byte speeds less than this configuration, " +
"stop congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark")
.categories("worker")
.doc("For those users that produce byte speeds greater than this configuration, " +
"start congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark")
.categories("worker")
.doc("Stop congestion If worker total produce speed less than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark")
.categories("worker")
.doc("Start congestion If worker total produce speed greater than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.user.inactive.interval")
.categories("worker")
.doc("How long will consider this user is inactive if it doesn't send data")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10min")
val WORKER_CONGESTION_CONTROL_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.check.interval")
.categories("worker")
.doc(
s"Interval of worker checks congestion if ${WORKER_CONGESTION_CONTROL_ENABLED.key} is true.")
.version("0.3.2")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10ms")
// Decommission: poll interval for shuffle expiry and the hard exit deadline.
val WORKER_DECOMMISSION_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.decommission.checkInterval")
.categories("worker")
.doc(
"The wait interval of checking whether all the shuffle expired during worker decommission")
.version("0.4.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
val WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.decommission.forceExitTimeout")
.categories("worker")
.doc("The wait time of waiting for all the shuffle expire during worker decommission.")
.version("0.4.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("6h")
// Master switch for graceful shutdown: wait for released slots to be committed
// or destroyed before exiting.
// Fix: dropped the `s` interpolator from a string literal that contains no
// interpolation (it was a no-op and misleading).
val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
.doc("When true, during worker shutdown, the worker will wait for all released slots " +
"to be committed or destroyed.")
.version("0.2.0")
.booleanConf
.createWithDefault(false)
// Graceful-shutdown timing: overall timeout plus the slots-finished poll
// interval and deadline.
val WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.timeout")
.categories("worker")
.doc("The worker's graceful shutdown timeout time.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("600s")
val WORKER_CHECK_SLOTS_FINISHED_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.interval")
.categories("worker")
.doc("The wait interval of checking whether all released slots " +
"to be committed or destroyed during worker graceful shutdown")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
val WORKER_CHECK_SLOTS_FINISHED_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout")
.categories("worker")
.doc("The wait time of waiting for the released slots" +
" to be committed or destroyed during worker graceful shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("480s")
// Location of the local recovery DB used by graceful shutdown.
// Fix: dropped the `s` interpolator from a string literal that contains no
// interpolation (the "<tmp>" placeholder is expanded by the transform below,
// not by Scala interpolation).
val WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH: ConfigEntry[String] =
buildConf("celeborn.worker.graceful.shutdown.recoverPath")
.categories("worker")
.doc("The path to store DB.")
.version("0.2.0")
.stringConf
// "<tmp>" in user-supplied or default values expands to java.io.tmpdir.
.transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir")))
.createWithDefault("<tmp>/recover")
// Recovery-DB backend selection and the partition-sorter drain timeout during
// graceful shutdown.
val WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND: ConfigEntry[String] =
buildConf("celeborn.worker.graceful.shutdown.recoverDbBackend")
.categories("worker")
.doc("Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated).")
.version("0.4.0")
.stringConf
// Accept any case from the user, validate against the canonical upper-case set.
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("LEVELDB", "ROCKSDB"))
.createWithDefault("ROCKSDB")
val WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
.categories("worker")
.doc("The wait time of waiting for sorting partition files" +
" during worker graceful shutdown.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
// --- Read buffer / buffer-stream tuning ---
// Initial read-buffer counts, dispatcher threads, allocation wait, target
// memory ratio management, and the dispatcher watchdog.
val WORKER_PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
buildConf("celeborn.worker.partition.initial.readBuffersMin")
.categories("worker")
.version("0.3.0")
.doc("Min number of initial read buffers")
.intConf
.createWithDefault(1)
val WORKER_PARTITION_READ_BUFFERS_MAX: ConfigEntry[Int] =
buildConf("celeborn.worker.partition.initial.readBuffersMax")
.categories("worker")
.version("0.3.0")
.doc("Max number of initial read buffers")
.intConf
.createWithDefault(1024)
val WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT: ConfigEntry[Int] =
buildConf("celeborn.worker.bufferStream.threadsPerMountpoint")
.categories("worker")
.version("0.3.0")
.doc("Threads count for read buffer per mount point.")
.intConf
.createWithDefault(8)
val WORKER_READBUFFER_ALLOCATIONWAIT: ConfigEntry[Long] =
buildConf("celeborn.worker.readBuffer.allocationWait")
.categories("worker")
.version("0.3.0")
.doc("The time to wait when buffer dispatcher can not allocate a buffer.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
val WORKER_READBUFFER_TARGET_RATIO: ConfigEntry[Double] =
buildConf("celeborn.worker.readBuffer.target.ratio")
.categories("worker")
.version("0.3.0")
.doc("The target ratio for read ahead buffer's memory usage.")
.doubleConf
.createWithDefault(0.9)
val WORKER_READBUFFER_TARGET_UPDATE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.readBuffer.target.updateInterval")
.categories("worker")
.version("0.3.0")
.doc("The interval for memory manager to calculate new read buffer's target memory.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100ms")
val WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.readBuffer.target.changeThreshold")
.categories("worker")
.version("0.3.0")
.doc("The target ratio for pre read memory usage.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1mb")
val WORKER_READBUFFERS_TOTRIGGERREAD_MIN: ConfigEntry[Int] =
buildConf("celeborn.worker.readBuffer.toTriggerReadMin")
.categories("worker")
.version("0.3.0")
.doc("Min buffers count for map data partition to trigger read.")
.intConf
.createWithDefault(32)
val WORKER_READBUFFER_CHECK_THREAD_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval")
.categories("worker")
.version("0.5.2")
.internal
.doc("The interval for worker to check read buffer dispatcher thread. 0 means disable.")
.timeConf(TimeUnit.MILLISECONDS)
// Watchdog disabled by default.
.createWithDefault(0)
val WORKER_PUSH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.push.heartbeat.enabled")
.categories("worker")
.version("0.3.0")
.doc("enable the heartbeat from worker to client when pushing data")
.booleanConf
.createWithDefault(false)
// Caps CompositeByteBuf fragmentation in FileWriter's flushBuffer; trades
// memory fragmentation (large values) against copy-on-compaction cost (value 1).
val WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS: ConfigEntry[Int] =
buildConf("celeborn.worker.push.compositeBuffer.maxComponents")
.internal
.categories("worker")
.version("0.3.0")
.doc("Max components of Netty `CompositeByteBuf` in `FileWriter`'s `flushBuffer`. " +
"When this value is too big, i.e. 256, there will be many memory fragments in Netty's memory pool, " +
"and total direct memory can be significantly larger than the disk buffer. " +
"When set to 1, Netty's direct memory is close to disk buffer, but performance " +
"might decrease due to frequent memory copy during compaction.")
.intConf
.createWithDefault(128)
// Worker-to-client heartbeat while fetching data.
// Doc string capitalized and punctuated to match the file's doc convention.
val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.fetch.heartbeat.enabled")
.categories("worker")
.version("0.3.0")
.doc("Enable the heartbeat from worker to client when fetching data.")
.booleanConf
.createWithDefault(false)
// High-load marking threshold: above this connection count the worker reports
// itself high-load and is excluded from RequestSlots responses by the master.
val WORKER_ACTIVE_CONNECTION_MAX: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.activeConnection.max")
.categories("worker")
.doc("If the number of active connections on a worker exceeds this configuration value, " +
"the worker will be marked as high-load in the heartbeat report, " +
"and the master will not include that node in the response of RequestSlots.")
.version("0.3.1")
.longConf
.createOptional
// async_profiler integration for workers: toggle, profiler options, output dir.
val WORKER_JVM_PROFILER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmProfiler.enabled")
.categories("worker")
.version("0.5.0")
.doc("Turn on code profiling via async_profiler in workers.")
.booleanConf
.createWithDefault(false)
val WORKER_JVM_PROFILER_OPTIONS: ConfigEntry[String] =
buildConf("celeborn.worker.jvmProfiler.options")
.categories("worker")
.version("0.5.0")
.doc("Options to pass on to the async profiler.")
.stringConf
.createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")
val WORKER_JVM_PROFILER_LOCAL_DIR: ConfigEntry[String] =
buildConf("celeborn.worker.jvmProfiler.localDir")
.categories("worker")
.version("0.5.0")
.doc("Local file system path on worker where profiler output is saved. "
+ "Defaults to the working directory of the worker process.")
.stringConf
.createWithDefault(".")
// jvmQuake: GC-behavior watchdog for early detection of memory issues.
val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.enabled")
.categories("worker")
.version("0.4.0")
.doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " +
"which enables early detection of memory management issues and facilitates fast failure.")
.booleanConf
.createWithDefault(false)
// How often jvmQuake samples GC behavior.
val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.check.interval")
.categories("worker")
.version("0.4.0")
.doc("Interval of gc behavior checking for worker jvm quake.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
// Weight applied to non-GC runtime when accumulating the GC "deficit"
// (deficit = gc_time - runtime * runtime_weight).
// Default written as the Double literal 5.0 to match doubleConf (was Int 5,
// which relied on implicit Int-to-Double widening).
val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.worker.jvmQuake.runtimeWeight")
.categories("worker")
.version("0.4.0")
.doc(
"The factor by which to multiply running JVM time, when weighing it against GCing time. " +
"'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, and is compared against threshold " +
"to determine whether to take action.")
.doubleConf
.createWithDefault(5.0)
// GC-deficit threshold that triggers a heap dump (no dump if it exceeds the
// kill threshold).
val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.dump.threshold")
.categories("worker")
.version("0.4.0")
.doc("The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. " +
"Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// GC-deficit threshold that triggers a process kill.
val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvmQuake.kill.threshold")
.categories("worker")
.version("0.4.0")
.doc("The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
// Whether the dump-threshold action actually produces a heap dump.
val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.dump.enabled")
.categories("worker")
.version("0.4.0")
.doc("Whether to heap dump for the maximum GC 'deficit' during worker jvm quake.")
.booleanConf
.createWithDefault(true)
// Heap dump destination; <tmp> and <pid> placeholders are substituted by the
// transform below at config-read time.
// Removed the needless `s` interpolator on the default value (no interpolation).
val WORKER_JVM_QUAKE_DUMP_PATH: ConfigEntry[String] =
buildConf("celeborn.worker.jvmQuake.dump.path")
.categories("worker")
.version("0.4.0")
.doc("The path of heap dump for the maximum GC 'deficit' during worker jvm quake.")
.stringConf
.transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir"))
.replace("<pid>", Utils.getProcessId))
.createWithDefault("<tmp>/jvm-quake/dump/<pid>")
// Exit code used when jvmQuake kills the worker process.
val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] =
buildConf("celeborn.worker.jvmQuake.exitCode")
.categories("worker")
.version("0.4.0")
.doc("The exit code of system kill for the maximum GC 'deficit' during worker jvm quake.")
.intConf
.createWithDefault(502)
// Client application lifecycle confs: heartbeat to master and explicit unregister.
val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")
.categories("client")
.version("0.3.0")
.doc("Interval for client to send heartbeat message to master.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
val APPLICATION_UNREGISTER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.application.unregister.enabled")
.categories("client")
.version("0.3.2")
.doc("When true, Celeborn client will inform celeborn master the application is already shutdown during client " +
"exit, this allows the cluster to release resources immediately, resulting in resource savings.")
.booleanConf
.createWithDefault(true)
// Client-side worker exclusion confs.
val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
.categories("client")
.version("0.3.0")
.doc("When true, Celeborn will exclude partition's peer worker on failure " +
"when push data to replica failed.")
.booleanConf
.createWithDefault(true)
val CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.excludedWorker.expireTimeout")
.withAlternative("celeborn.worker.excluded.expireTimeout")
.categories("client")
.version("0.3.0")
.doc("Timeout time for LifecycleManager to clear reserved excluded worker. Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout` " +
"to cover worker heartbeat timeout check period")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("180s")
// Chooses the candidate set of workers whose liveness is checked against the
// master: locally allocated workers (true) or the local exclude list (false).
// Fixed doc-string typos: stray ')' after "lost data" and "unKnown".
val CLIENT_CHECKED_USE_ALLOCATED_WORKERS: ConfigEntry[Boolean] =
buildConf("celeborn.client.checked.useAllocatedWorkers")
.internal
.categories("client")
.version("0.3.0")
.doc("When true, Celeborn will use local allocated workers as candidate being checked workers(check the workers " +
"whether unknown in master), this may be more useful for map partition to regenerate the lost data, " +
"otherwise use local black list as candidate being checked workers.")
.booleanConf
.createWithDefault(false)
// Test-only fault-injection confs.
val TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.mockDestroySlotsFailure")
.internal
.categories("test", "client")
.doc("Fail destroy slots request for test")
.version("0.3.2")
.booleanConf
.createWithDefault(false)
val TEST_MOCK_COMMIT_FILES_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.test.mockCommitFilesFailure")
.internal
.categories("test", "client", "worker")
.doc("Fail commit files request for test")
.version("0.3.2")
.booleanConf
.createWithDefault(false)
// Test-only: pretend a shuffle was lost.
// Removed a duplicated `.internal` call (it was invoked twice in the chain).
val TEST_CLIENT_MOCK_SHUFFLE_LOST: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.mockShuffleLost")
.internal
.categories("test", "client")
.doc("Mock shuffle lost.")
.version("0.5.2")
.booleanConf
.createWithDefault(false)
// Test-only: shuffle id used by the mocked shuffle-lost path.
// Removed a duplicated `.internal` call (it was invoked twice in the chain).
val TEST_CLIENT_MOCK_SHUFFLE_LOST_SHUFFLE: ConfigEntry[Int] =
buildConf("celeborn.test.client.mockShuffleLostShuffle")
.internal
.categories("test", "client")
.doc("Mock shuffle lost for shuffle")
.version("0.5.2")
.intConf
.createWithDefault(0)
// Asynchronous replication of pushed shuffle data to a second worker.
val CLIENT_PUSH_REPLICATE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.push.replicate.enabled")
.withAlternative("celeborn.push.replicate.enabled")
.categories("client")
.doc("When true, Celeborn worker will replicate shuffle data to another Celeborn worker " +
"asynchronously to ensure the pushed shuffle data won't be lost after the node failure. " +
"It's recommended to set `false` when `HDFS` is enabled in `celeborn.storage.availableTypes`.")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
// Added the missing `.doc(...)` — this was the only public conf in this section
// without documentation; wording mirrors the sibling max.size entry.
val CLIENT_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.push.buffer.initial.size")
.withAlternative("celeborn.push.buffer.initial.size")
.categories("client")
.version("0.3.0")
.doc("Initial size of reducer partition buffer memory for shuffle hash writer.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8k")
// Upper bound of the per-partition push buffer; per-task heap usage scales as
// buffer size * reducer count (see doc example).
val CLIENT_PUSH_BUFFER_MAX_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.push.buffer.max.size")
.withAlternative("celeborn.push.buffer.max.size")
.categories("client")
.version("0.3.0")
.doc("Max size of reducer partition buffer memory for shuffle hash writer. The pushed " +
"data will be buffered in memory before sending to Celeborn worker. For performance " +
"consideration keep this buffer size higher than 32K. Example: If reducer amount is " +
"2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` " +
"heap memory.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64k")
// Push-side flow control: queue capacity, in-flight request limits, revive
// (retry/re-allocation) behavior, failure-based worker exclusion, and the
// push rate-limit strategy.
val CLIENT_PUSH_QUEUE_CAPACITY: ConfigEntry[Int] =
buildConf("celeborn.client.push.queue.capacity")
.withAlternative("celeborn.push.queue.capacity")
.categories("client")
.version("0.3.0")
.doc("Push buffer queue size for a task. The maximum memory is " +
"`celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity`, " +
"default: 64KiB * 512 = 32MiB")
.intConf
.createWithDefault(512)
val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_TOTAL: ConfigEntry[Int] =
buildConf("celeborn.client.push.maxReqsInFlight.total")
.withAlternative("celeborn.push.maxReqsInFlight")
.categories("client")
.version("0.3.0")
.doc("Amount of total Netty in-flight requests. The maximum memory is " +
"`celeborn.client.push.maxReqsInFlight.total` * `celeborn.client.push.buffer.max.size` " +
"* compression ratio(1 in worst case): 64KiB * 256 = 16MiB")
.intConf
.createWithDefault(256)
val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER: ConfigEntry[Int] =
buildConf("celeborn.client.push.maxReqsInFlight.perWorker")
.categories("client")
.version("0.3.0")
.doc(
"Amount of Netty in-flight requests per worker. Default max memory of in flight requests " +
" per worker is `celeborn.client.push.maxReqsInFlight.perWorker` * `celeborn.client.push.buffer.max.size` " +
"* compression ratio(1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will " +
"not exceed `celeborn.client.push.maxReqsInFlight.total`.")
.intConf
.createWithDefault(32)
val CLIENT_PUSH_MAX_REVIVE_TIMES: ConfigEntry[Int] =
buildConf("celeborn.client.push.revive.maxRetries")
.categories("client")
.version("0.3.0")
.doc("Max retry times for reviving when celeborn push data failed.")
.intConf
.createWithDefault(5)
val CLIENT_PUSH_REVIVE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.push.revive.interval")
.categories("client")
.version("0.3.0")
.doc("Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive " +
"request is `celeborn.client.push.revive.batchSize`.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100ms")
val CLIENT_PUSH_REVIVE_BATCHSIZE: ConfigEntry[Int] =
buildConf("celeborn.client.push.revive.batchSize")
.categories("client")
.version("0.3.0")
.doc("Max number of partitions in one Revive request.")
.intConf
.createWithDefault(2048)
val CLIENT_PUSH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.push.excludeWorkerOnFailure.enabled")
.categories("client")
.doc("Whether to enable shuffle client-side push exclude workers on failures.")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
val CLIENT_PUSH_LIMIT_STRATEGY: ConfigEntry[String] =
buildConf("celeborn.client.push.limit.strategy")
.categories("client")
.doc("The strategy used to control the push speed. " +
"Valid strategies are SIMPLE and SLOWSTART. The SLOWSTART strategy usually works with " +
"congestion control mechanism on the worker side.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("SIMPLE", "SLOWSTART"))
.createWithDefaultString("SIMPLE")
// Initial back-off used by the SLOWSTART push-limit strategy.
// Dropped the needless `s` interpolator (no interpolation) and added the
// trailing period for doc-style consistency.
val CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME: ConfigEntry[Long] =
buildConf("celeborn.client.push.slowStart.initialSleepTime")
.categories("client")
.version("0.3.0")
.doc("The initial sleep time if the current max in flight requests is 0.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("500ms")
// Upper bound of the SLOWSTART back-off.
val CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME: ConfigEntry[Long] =
buildConf("celeborn.client.push.slowStart.maxSleepTime")
.categories("client")
.version("0.3.0")
.doc(s"If ${CLIENT_PUSH_LIMIT_STRATEGY.key} is set to SLOWSTART, push side will " +
"take a sleep strategy for each batch of requests, this controls " +
"the max sleep time if the max in flight requests limit is 1 for a long time")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("2s")
// RPC timeout for a task's push-data message.
val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.push.timeout")
.withAlternative("celeborn.push.data.timeout")
.categories("client")
.version("0.3.0")
.doc(s"Timeout for a task to push data rpc message. This value should better be more than twice of `${PUSH_TIMEOUT_CHECK_INTERVAL.key}`")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ > 0, "Value must be positive!")
.createWithDefaultString("120s")
// Test-only: simulate push-data timeouts on primary/replica workers.
val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
.withAlternative("celeborn.test.pushMasterDataTimeout")
.internal
.categories("test", "worker")
.version("0.3.0")
.doc("Whether to test push primary data timeout")
.booleanConf
.createWithDefault(false)
val TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT: ConfigEntry[Boolean] =
buildConf("celeborn.test.worker.pushReplicaDataTimeout")
.internal
.categories("test", "worker")
.version("0.3.0")
.doc("Whether to test push replica data timeout")
.booleanConf
.createWithDefault(false)
// In-flight request draining, push-sorter behavior, retry threads, task-take
// waits, and SendBufferPool expiry — plus a test-only revive-retry switch.
val CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT: OptionalConfigEntry[Long] =
buildConf("celeborn.client.push.limit.inFlight.timeout")
.withAlternative("celeborn.push.limit.inFlight.timeout")
.categories("client")
.doc("Timeout for netty in-flight requests to be done. " +
s"Default value should be `${CLIENT_PUSH_DATA_TIMEOUT.key} * 2`.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
val CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.push.limit.inFlight.sleepInterval")
.withAlternative("celeborn.push.limit.inFlight.sleepInterval")
.categories("client")
.doc("Sleep interval when check netty in-flight requests to be done.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
val CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.push.sort.randomizePartitionId.enabled")
.withAlternative("celeborn.push.sort.randomizePartitionId.enabled")
.categories("client")
.doc(
"Whether to randomize partitionId in push sorter. If true, partitionId will be randomized " +
"when sort data to avoid skew when push to worker")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
val CLIENT_PUSH_RETRY_THREADS: ConfigEntry[Int] =
buildConf("celeborn.client.push.retry.threads")
.withAlternative("celeborn.push.retry.threads")
.categories("client")
.doc("Thread number to process shuffle re-send push data requests.")
.version("0.3.0")
.intConf
.createWithDefault(8)
val CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.push.takeTaskWaitInterval")
.categories("client")
.doc("Wait interval if no task available to push to worker.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
val CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS: ConfigEntry[Int] =
buildConf("celeborn.client.push.takeTaskMaxWaitAttempts")
.categories("client")
.doc("Max wait times if no task available to push to worker.")
.version("0.3.0")
.intConf
.createWithDefault(1)
val CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.push.sendBufferPool.expireTimeout")
.categories("client")
.doc("Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, " +
"the send buffers and push tasks will be cleaned up.")
.version("0.3.1")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
val CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.push.sendBufferPool.checkExpireInterval")
.categories("client")
.doc("Interval to check expire for send buffer pool. If the pool has been idle " +
s"for more than `${CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT.key}`, the pooled send buffers and push tasks will be cleaned up.")
.version("0.3.1")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
// Test-only: force push/revive failures.
val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.retryRevive")
.withAlternative("celeborn.test.retryRevive")
.internal
.categories("test", "client")
.doc("Fail push data and request for test")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
// Fetch-side confs: stream/chunk timeout and reader buffer size.
val CLIENT_FETCH_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.fetch.timeout")
.withAlternative("celeborn.fetch.timeout")
.categories("client")
.version("0.3.0")
.doc("Timeout for a task to open stream and fetch chunk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("600s")
val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.fetch.buffer.size")
.categories("client")
.version("0.4.0")
.doc("Size of reducer partition buffer memory for shuffle reader. The fetched data " +
"will be buffered in memory before consuming. For performance consideration keep " +
s"this buffer size not less than `${CLIENT_PUSH_BUFFER_MAX_SIZE.key}`.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64k")
// Checkpointing of partition-reader progress across reader re-creation.
// Fixed doc grammar: "Whether or not checkpoint reads" -> "Whether or not to
// checkpoint reads".
val PARTITION_READER_CHECKPOINT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.partition.reader.checkpoint.enabled")
.categories("client")
.version("0.6.0")
.doc("Whether or not to checkpoint reads when re-creating a partition reader. Setting to true minimizes" +
" the amount of unnecessary reads during partition read retries")
.booleanConf
.createWithDefault(false)
// Fetch concurrency/retry limits, stage rerun, and fetch-failure worker exclusion.
val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
buildConf("celeborn.client.fetch.maxReqsInFlight")
.withAlternative("celeborn.fetch.maxReqsInFlight")
.categories("client")
.version("0.3.0")
.doc("Amount of in-flight chunk fetch request.")
.intConf
.createWithDefault(3)
val CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA: ConfigEntry[Int] =
buildConf("celeborn.client.fetch.maxRetriesForEachReplica")
.withAlternative("celeborn.fetch.maxRetriesForEachReplica")
.withAlternative("celeborn.fetch.maxRetries")
.categories("client")
.version("0.3.0")
.doc("Max retry times of fetch chunk on each replica")
.intConf
.createWithDefault(3)
val CLIENT_STAGE_RERUN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.spark.stageRerun.enabled")
.withAlternative("celeborn.client.spark.fetch.throwsFetchFailure")
.categories("client")
.version("0.4.0")
.doc("Whether to enable stage rerun. If true, client throws FetchFailedException instead of CelebornIOException.")
.booleanConf
.createWithDefault(true)
val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
.categories("client")
.doc("Whether to enable shuffle client-side fetch exclude workers on failure.")
.version("0.3.0")
.booleanConf
.createWithDefault(false)
// Expiry for fetch-side excluded workers; falls back to the generic
// CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT.
// Fixed doc grammar: "a expire time" -> "an expire time", "a transient worker
// issues" -> "transient worker issues".
val CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.fetch.excludedWorker.expireTimeout")
.categories("client")
.doc(
"ShuffleClient is a static object, it will be used in the whole lifecycle of Executor, " +
"We give an expire time for excluded workers to avoid transient worker issues.")
.version("0.3.0")
.fallbackConf(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
// Test-only: simulate fetch chunk failure.
val TEST_CLIENT_FETCH_FAILURE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.fetchFailure")
.withAlternative("celeborn.test.fetchFailure")
.internal
.categories("test", "client")
.version("0.3.0")
.doc("Whether to test fetch chunk failure")
.booleanConf
.createWithDefault(false)
// Range-read filtering to speed up reads of skewed partitions.
// Fixed doc grammar: "have skewed partition, this value can set to true".
val SHUFFLE_RANGE_READ_FILTER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.shuffle.rangeReadFilter.enabled")
.withAlternative("celeborn.shuffle.rangeReadFilter.enabled")
.categories("client")
.version("0.2.0")
.doc("If a spark application has skewed partitions, this value can be set to true to improve performance.")
.booleanConf
.createWithDefault(false)
// Shuffle layout and codec confs: partition type, split threshold/mode, and
// compression/decompression codec selection.
val SHUFFLE_PARTITION_TYPE: ConfigEntry[String] =
buildConf("celeborn.client.shuffle.partition.type")
.withAlternative("celeborn.shuffle.partition.type")
.categories("client")
.doc("Type of shuffle's partition.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(
PartitionType.REDUCE.name,
PartitionType.MAP.name,
PartitionType.MAPGROUP.name))
.createWithDefault(PartitionType.REDUCE.name)
val SHUFFLE_PARTITION_SPLIT_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.client.shuffle.partitionSplit.threshold")
.withAlternative("celeborn.shuffle.partitionSplit.threshold")
.categories("client")
.doc("Shuffle file size threshold, if file size exceeds this, trigger split.")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1G")
val SHUFFLE_PARTITION_SPLIT_MODE: ConfigEntry[String] =
buildConf("celeborn.client.shuffle.partitionSplit.mode")
.withAlternative("celeborn.shuffle.partitionSplit.mode")
.categories("client")
.doc("soft: the shuffle file size might be larger than split threshold. " +
"hard: the shuffle file size will be limited to split threshold.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(PartitionSplitMode.SOFT.name, PartitionSplitMode.HARD.name))
.createWithDefault(PartitionSplitMode.SOFT.name)
val SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
buildConf("celeborn.client.shuffle.compression.codec")
.withAlternative("celeborn.shuffle.compression.codec")
.withAlternative("remote-shuffle.job.compression.codec")
.categories("client")
.doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. " +
"`none` means that shuffle compression is disabled. " +
"Since Flink version 1.16, zstd is supported for Flink shuffle client.")
.version("0.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set(
CompressionCodec.LZ4.name,
CompressionCodec.ZSTD.name,
CompressionCodec.NONE.name))
.createWithDefault(CompressionCodec.LZ4.name)
val SHUFFLE_DECOMPRESSION_LZ4_XXHASH_INSTANCE: OptionalConfigEntry[String] =
buildConf("celeborn.client.shuffle.decompression.lz4.xxhash.instance")
.categories("client")
.doc("Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE.")
.version("0.3.2")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("JNI", "JAVASAFE", "JAVAUNSAFE"))
.createOptional
// Zstd compression level, validated to the supported [-5, 22] range.
// Dropped the needless `s` interpolator on the checkValue message (no
// interpolation present).
val SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.compression.zstd.level")
.withAlternative("celeborn.shuffle.compression.zstd.level")
.categories("client")
.doc("Compression level for Zstd compression codec, its value should be an integer " +
"between -5 and 22. Increasing the compression level will result in better compression " +
"at the expense of more CPU and memory.")
.version("0.3.0")
.intConf
.checkValue(
value => value >= -5 && value <= 22,
"Compression level for Zstd compression codec should be an integer between -5 and 22.")
.createWithDefault(1)
// How often the client scans for expired shuffles.
val SHUFFLE_EXPIRED_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.shuffle.expired.checkInterval")
.withAlternative("celeborn.shuffle.expired.checkInterval")
.categories("client")
.version("0.3.0")
.doc("Interval for client to check expired shuffles.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
val CLIENT_SHUFFLE_MANAGER_PORT: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.manager.port")
.withAlternative("celeborn.shuffle.manager.port")
.categories("client")
.version("0.3.0")
.doc("Port used by the LifecycleManager on the Driver.")
.intConf
.checkValue(
(port: Int) => {
if (port != 0) {
logWarning(
"The user specifies the port used by the LifecycleManager on the Driver, and its" +
s" values is $port, which may cause port conflicts and startup failure.")
}
true
},