private def buildConf()

in common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [1209:3124]


  private def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)

  val NETWORK_BIND_PREFER_IP: ConfigEntry[Boolean] =
    buildConf("celeborn.network.bind.preferIpAddress")
      .categories("network")
      .version("0.3.0")
      .doc("When `ture`, prefer to use IP address, otherwise FQDN. This configuration only " +
        "takes effects when the bind hostname is not set explicitly, in such case, Celeborn " +
        "will find the first non-loopback address to bind.")
      .booleanConf
      .createWithDefault(true)

  val NETWORK_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.network.timeout")
      .categories("network")
      .version("0.2.0")
      .doc("Default timeout for network operations.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("240s")

  val NETWORK_CONNECT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.network.connect.timeout")
      .categories("network")
      .doc("Default socket connect timeout.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10s")

  val NETWORK_MEMORY_ALLOCATOR_SHARE: ConfigEntry[Boolean] =
    buildConf("celeborn.network.memory.allocator.share")
      .categories("network")
      .internal
      .version("0.3.0")
      .doc("Whether to share memory allocator.")
      .booleanConf
      .createWithDefault(true)

  val NETWORK_MEMORY_ALLOCATOR_ARENAS: OptionalConfigEntry[Int] =
    buildConf("celeborn.network.memory.allocator.numArenas")
      .categories("network")
      .version("0.3.0")
      .doc("Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2.")
      .intConf
      .createOptional

  val NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC: ConfigEntry[Boolean] =
    buildConf("celeborn.network.memory.allocator.verbose.metric")
      .categories("network")
      .version("0.3.0")
      .doc("Weather to enable verbose metric for pooled allocator.")
      .booleanConf
      .createWithDefault(false)

  val PORT_MAX_RETRY: ConfigEntry[Int] =
    buildConf("celeborn.port.maxRetries")
      .categories("network")
      .doc("When port is occupied, we will retry for max retry times.")
      .version("0.2.0")
      .intConf
      .createWithDefault(1)

  val RPC_IO_THREAD: OptionalConfigEntry[Int] =
    buildConf("celeborn.rpc.io.threads")
      .categories("network")
      .doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
        "The default threads number is the number of runtime available processors.")
      .version("0.2.0")
      .intConf
      .createOptional

  val RPC_CONNECT_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.rpc.connect.threads")
      .categories("network")
      .version("0.2.0")
      .intConf
      .createWithDefault(64)

  val RPC_LOOKUP_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.rpc.lookupTimeout")
      .categories("network")
      .version("0.2.0")
      .doc("Timeout for RPC lookup operations.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")

  val RPC_ASK_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.rpc.askTimeout")
      .categories("network")
      .version("0.2.0")
      .doc("Timeout for RPC ask operations. " +
        "It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val RPC_DISPATCHER_THREADS: OptionalConfigEntry[Int] =
    buildConf("celeborn.rpc.dispatcher.threads")
      .withAlternative("celeborn.rpc.dispatcher.numThreads")
      .categories("network")
      .doc("Threads number of message dispatcher event loop")
      .version("0.3.0")
      .intConf
      .createOptional

  val NETWORK_IO_MODE: ConfigEntry[String] =
    buildConf("celeborn.<module>.io.mode")
      .categories("network")
      .doc("Netty EventLoopGroup backend, available options: NIO, EPOLL.")
      .stringConf
      .transform(_.toUpperCase)
      .checkValues(Set("NIO", "EPOLL"))
      .createWithDefault("NIO")

  val NETWORK_IO_PREFER_DIRECT_BUFS: ConfigEntry[Boolean] =
    buildConf("celeborn.<module>.io.preferDirectBufs")
      .categories("network")
      .doc("If true, we will prefer allocating off-heap byte buffers within Netty.")
      .booleanConf
      .createWithDefault(true)

  val NETWORK_IO_CONNECT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.<module>.io.connectTimeout")
      .categories("network")
      .doc("Socket connect timeout.")
      .fallbackConf(NETWORK_CONNECT_TIMEOUT)

  val NETWORK_IO_CONNECTION_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.<module>.io.connectionTimeout")
      .categories("network")
      .doc("Connection active timeout.")
      .fallbackConf(NETWORK_TIMEOUT)

  val NETWORK_IO_NUM_CONNECTIONS_PER_PEER: ConfigEntry[Int] =
    buildConf("celeborn.<module>.io.numConnectionsPerPeer")
      .categories("network")
      .doc("Number of concurrent connections between two nodes.")
      .intConf
      .createWithDefault(2)

  val NETWORK_IO_BACKLOG: ConfigEntry[Int] =
    buildConf("celeborn.<module>.io.backLog")
      .categories("network")
      .doc(
        "Requested maximum length of the queue of incoming connections. Default 0 for no backlog.")
      .intConf
      .createWithDefault(0)

  val NETWORK_IO_SERVER_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.<module>.io.serverThreads")
      .categories("network")
      .doc("Number of threads used in the server thread pool. Default to 0, which is 2x#cores.")
      .intConf
      .createWithDefault(0)

  val NETWORK_IO_CLIENT_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.<module>.io.clientThreads")
      .categories("network")
      .doc("Number of threads used in the client thread pool. Default to 0, which is 2x#cores.")
      .intConf
      .createWithDefault(0)

  val NETWORK_IO_RECEIVE_BUFFER: ConfigEntry[Long] =
    buildConf("celeborn.<module>.io.receiveBuffer")
      .categories("network")
      .doc("Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer " +
        "should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps " +
        "buffer size should be ~ 1.25MB.")
      .version("0.2.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(0)

  val NETWORK_IO_SEND_BUFFER: ConfigEntry[Long] =
    buildConf("celeborn.<module>.io.sendBuffer")
      .categories("network")
      .doc("Send buffer size (SO_SNDBUF).")
      .version("0.2.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(0)

  val NETWORK_IO_MAX_RETRIES: ConfigEntry[Int] =
    buildConf("celeborn.<module>.io.maxRetries")
      .categories("network")
      .doc(
        "Max number of times we will try IO exceptions (such as connection timeouts) per request. " +
          "If set to 0, we will not do any retries.")
      .intConf
      .createWithDefault(3)

  val NETWORK_IO_RETRY_WAIT: ConfigEntry[Long] =
    buildConf("celeborn.<module>.io.retryWait")
      .categories("network")
      .doc("Time that we will wait in order to perform a retry after an IOException. " +
        "Only relevant if maxIORetries > 0.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5s")

  val NETWORK_IO_LAZY_FD: ConfigEntry[Boolean] =
    buildConf("celeborn.<module>.io.lazyFD")
      .categories("network")
      .doc("Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only " +
        "when data is going to be transferred. This can reduce the number of open files.")
      .booleanConf
      .createWithDefault(true)

  val NETWORK_VERBOSE_METRICS: ConfigEntry[Boolean] =
    buildConf("celeborn.<module>.io.enableVerboseMetrics")
      .categories("network")
      .doc("Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty " +
        "PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked.")
      .booleanConf
      .createWithDefault(false)

  val NETWORK_IO_STORAGE_MEMORY_MAP_THRESHOLD: ConfigEntry[Long] =
    buildConf("celeborn.<module>.storage.memoryMapThreshold")
      .withAlternative("celeborn.storage.memoryMapThreshold")
      .categories("network")
      .internal
      .doc("Minimum size of a block that we should start using memory map rather than reading in through " +
        "normal IO operations. This prevents Celeborn from memory mapping very small blocks. In general, " +
        "memory mapping has high overhead for blocks close to or below the page size of the OS.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("2m")

  val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] =
    buildConf("celeborn.shuffle.io.maxChunksBeingTransferred")
      .categories("network")
      .doc("The max number of chunks allowed to be transferred at the same time on shuffle service. Note " +
        "that new incoming connections will be closed when the max number is hit. The client will retry " +
        "according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and " +
        "`celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure.")
      .version("0.2.0")
      .longConf
      .createOptional

  val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.<module>.push.timeoutCheck.interval")
      .categories("network")
      .doc("Interval for checking push data timeout. " +
        s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
        s"it works for shuffle client push data and should be configured on client side. " +
        s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
        s"it works for worker replicate data to peer worker and should be configured on worker side.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5s")

  val PUSH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.<module>.push.timeoutCheck.threads")
      .categories("network")
      .doc("Threads num for checking push data timeout. " +
        s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
        s"it works for shuffle client push data and should be configured on client side. " +
        s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
        s"it works for worker replicate data to peer worker and should be configured on worker side.")
      .version("0.3.0")
      .intConf
      .createWithDefault(4)

  val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
      .categories("network")
      .doc("Interval for checking fetch data timeout. " +
        s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
        s"since it works for shuffle client fetch data and should be configured on client side.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5s")

  val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.<module>.fetch.timeoutCheck.threads")
      .categories("network")
      .doc("Threads num for checking fetch data timeout. " +
        s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
        s"since it works for shuffle client fetch data and should be configured on client side.")
      .version("0.3.0")
      .intConf
      .createWithDefault(4)

  val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.<module>.heartbeat.interval")
      .withAlternative("celeborn.client.heartbeat.interval")
      .categories("network")
      .version("0.3.0")
      .doc("The heartbeat interval between worker and client. " +
        s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
        s"it works for shuffle client push and fetch data and should be configured on client side. " +
        s"If setting <module> to `${TransportModuleConstants.REPLICATE_MODULE}`, " +
        s"it works for worker replicate data to peer worker and should be configured on worker side.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val MASTER_ENDPOINTS: ConfigEntry[Seq[String]] =
    buildConf("celeborn.master.endpoints")
      .categories("client", "worker")
      .doc("Endpoints of master nodes for celeborn client to connect, allowed pattern " +
        "is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. " +
        "If the port is omitted, 9097 will be used.")
      .version("0.2.0")
      .stringConf
      .toSequence
      .checkValue(
        endpoints => endpoints.map(_ => Try(Utils.parseHostPort(_))).forall(_.isSuccess),
        "Allowed pattern is: `<host1>:<port1>[,<host2>:<port2>]*`")
      .createWithDefaultString(s"<localhost>:9097")

  val MASTER_CLIENT_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.masterClient.rpc.askTimeout")
      .withAlternative("celeborn.rpc.haClient.askTimeout")
      .internal
      .categories("client", "worker")
      .version("0.3.0")
      .doc("Timeout for HA client RPC ask operations.")
      .fallbackConf(RPC_ASK_TIMEOUT)

  val MASTER_CLIENT_MAX_RETRIES: ConfigEntry[Int] =
    buildConf("celeborn.masterClient.maxRetries")
      .withAlternative("celeborn.client.maxRetries")
      .internal
      .categories("client", "worker")
      .doc("Max retry times for client to connect master endpoint")
      .version("0.3.0")
      .intConf
      .createWithDefault(15)

  val APPLICATION_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.heartbeat.application.timeout")
      .withAlternative("celeborn.application.heartbeat.timeout")
      .categories("master")
      .version("0.3.0")
      .doc("Application heartbeat timeout.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("300s")

  val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.hdfs.expireDirs.timeout")
      .categories("master")
      .version("0.3.0")
      .doc("The timeout for a expire dirs to be deleted on HDFS.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1h")

  val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.heartbeat.worker.timeout")
      .withAlternative("celeborn.worker.heartbeat.timeout")
      .categories("master")
      .version("0.3.0")
      .doc("Worker heartbeat timeout.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")

  val MASTER_HOST: ConfigEntry[String] =
    buildConf("celeborn.master.host")
      .categories("master")
      .version("0.2.0")
      .doc("Hostname for master to bind.")
      .stringConf
      .createWithDefaultString("<localhost>")

  val MASTER_PORT: ConfigEntry[Int] =
    buildConf("celeborn.master.port")
      .categories("master")
      .version("0.2.0")
      .doc("Port for master to bind.")
      .intConf
      .checkValue(p => p >= 1024 && p < 65535, "Invalid port")
      .createWithDefault(9097)

  val HA_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.master.ha.enabled")
      .withAlternative("celeborn.ha.enabled")
      .categories("ha")
      .version("0.3.0")
      .doc("When true, master nodes run as Raft cluster mode.")
      .booleanConf
      .createWithDefault(false)

  val HA_MASTER_NODE_ID: OptionalConfigEntry[String] =
    buildConf("celeborn.master.ha.node.id")
      .withAlternative("celeborn.ha.master.node.id")
      .doc("Node id for master raft cluster in HA mode, if not define, " +
        "will be inferred by hostname.")
      .version("0.3.0")
      .stringConf
      .createOptional

  val HA_MASTER_NODE_HOST: ConfigEntry[String] =
    buildConf("celeborn.master.ha.node.<id>.host")
      .withAlternative("celeborn.ha.master.node.<id>.host")
      .categories("ha")
      .doc("Host to bind of master node <id> in HA mode.")
      .version("0.3.0")
      .stringConf
      .createWithDefaultString("<required>")

  val HA_MASTER_NODE_PORT: ConfigEntry[Int] =
    buildConf("celeborn.master.ha.node.<id>.port")
      .withAlternative("celeborn.ha.master.node.<id>.port")
      .categories("ha")
      .doc("Port to bind of master node <id> in HA mode.")
      .version("0.3.0")
      .intConf
      .checkValue(p => p >= 1024 && p < 65535, "Invalid port")
      .createWithDefault(9097)

  val HA_MASTER_NODE_RATIS_HOST: OptionalConfigEntry[String] =
    buildConf("celeborn.master.ha.node.<id>.ratis.host")
      .withAlternative("celeborn.ha.master.node.<id>.ratis.host")
      .internal
      .categories("ha")
      .doc("Ratis host to bind of master node <id> in HA mode. If not provided, " +
        s"fallback to ${HA_MASTER_NODE_HOST.key}.")
      .version("0.3.0")
      .stringConf
      .createOptional

  val HA_MASTER_NODE_RATIS_PORT: ConfigEntry[Int] =
    buildConf("celeborn.master.ha.node.<id>.ratis.port")
      .withAlternative("celeborn.ha.master.node.<id>.ratis.port")
      .categories("ha")
      .doc("Ratis port to bind of master node <id> in HA mode.")
      .version("0.3.0")
      .intConf
      .checkValue(p => p >= 1024 && p < 65535, "Invalid port")
      .createWithDefault(9872)

  val HA_MASTER_RATIS_RPC_TYPE: ConfigEntry[String] =
    buildConf("celeborn.master.ha.ratis.raft.rpc.type")
      .withAlternative("celeborn.ha.master.ratis.raft.rpc.type")
      .categories("ha")
      .doc("RPC type for Ratis, available options: netty, grpc.")
      .version("0.3.0")
      .stringConf
      .transform(_.toLowerCase)
      .checkValues(Set("netty", "grpc"))
      .createWithDefault("netty")

  val HA_MASTER_RATIS_STORAGE_DIR: ConfigEntry[String] =
    buildConf("celeborn.master.ha.ratis.raft.server.storage.dir")
      .withAlternative("celeborn.ha.master.ratis.raft.server.storage.dir")
      .categories("ha")
      .version("0.3.0")
      .stringConf
      .createWithDefault("/tmp/ratis")

  val HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.segment.size.max")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.segment.size.max")
      .internal
      .categories("ha")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("4MB")

  val HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.preallocated.size")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.preallocated.size")
      .internal
      .categories("ha")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("4MB")

  val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS: ConfigEntry[Int] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.element-limit")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.element-limit")
      .internal
      .categories("ha")
      .version("0.3.0")
      .intConf
      .createWithDefault(1024)

  val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.byte-limit")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.byte-limit")
      .internal
      .categories("ha")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("32MB")

  val HA_MASTER_RATIS_LOG_INSTALL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
      .internal
      .categories("ha")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(true)

  val HA_MASTER_RATIS_LOG_PURGE_GAP: ConfigEntry[Int] =
    buildConf("celeborn.master.ha.ratis.raft.server.log.purge.gap")
      .withAlternative("celeborn.ha.master.ratis.raft.server.log.purge.gap")
      .internal
      .categories("ha")
      .version("0.3.0")
      .intConf
      .createWithDefault(1000000)

  val HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.rpc.request.timeout")
      .withAlternative("celeborn.ha.master.ratis.raft.server.rpc.request.timeout")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("3s")

  val HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.retrycache.expirytime")
      .withAlternative("celeborn.ha.master.ratis.raft.server.retrycache.expirytime")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("600s")

  val HA_MASTER_RATIS_RPC_TIMEOUT_MIN: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.min")
      .withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.min")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("3s")

  val HA_MASTER_RATIS_RPC_TIMEOUT_MAX: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.max")
      .withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.max")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("5s")

  val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
      .withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("3s")

  val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.first.election.timeout.max")
      .withAlternative("celeborn.ha.master.ratis.first.election.timeout.max")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("5s")

  val HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.notification.no-leader.timeout")
      .withAlternative("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("30s")

  val HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.rpc.slowness.timeout")
      .withAlternative("celeborn.ha.master.ratis.raft.server.rpc.slowness.timeout")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("120s")

  val HA_MASTER_RATIS_ROLE_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.role.check.interval")
      .withAlternative("celeborn.ha.master.ratis.raft.server.role.check.interval")
      .internal
      .categories("ha")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1s")

  val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.enabled")
      .withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled")
      .internal
      .categories("ha")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(true)

  val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD: ConfigEntry[Long] =
    buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.threshold")
      .withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold")
      .internal
      .categories("ha")
      .version("0.3.0")
      .longConf
      .createWithDefault(200000L)

  val HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM: ConfigEntry[Int] =
    buildConf("celeborn.master.ha.ratis.raft.server.snapshot.retention.file.num")
      .withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.retention.file.num")
      .internal
      .categories("ha")
      .version("0.3.0")
      .intConf
      .createWithDefault(3)

  val MASTER_SLOT_ASSIGN_POLICY: ConfigEntry[String] =
    buildConf("celeborn.master.slot.assign.policy")
      .withAlternative("celeborn.slots.assign.policy")
      .categories("master")
      .version("0.3.0")
      .doc("Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. " +
        "Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes`")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(Set(
        SlotsAssignPolicy.ROUNDROBIN.name,
        SlotsAssignPolicy.LOADAWARE.name))
      .createWithDefault(SlotsAssignPolicy.ROUNDROBIN.name)

  val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM: ConfigEntry[Int] =
    buildConf("celeborn.master.slot.assign.loadAware.numDiskGroups")
      .withAlternative("celeborn.slots.assign.loadAware.numDiskGroups")
      .categories("master")
      .doc("This configuration is a guidance for load-aware slot allocation algorithm. " +
        "This value is control how many disk groups will be created.")
      .version("0.3.0")
      .intConf
      .createWithDefault(5)

  val MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT: ConfigEntry[Double] =
    buildConf("celeborn.master.slot.assign.loadAware.diskGroupGradient")
      .withAlternative("celeborn.slots.assign.loadAware.diskGroupGradient")
      .categories("master")
      .doc("This value means how many more workload will be placed into a faster disk group " +
        "than a slower group.")
      .version("0.3.0")
      .doubleConf
      .createWithDefault(0.1)

  val MASTER_SLOT_ASSIGN_LOADAWARE_FLUSHTIME_WEIGHT: ConfigEntry[Double] =
    buildConf("celeborn.master.slot.assign.loadAware.flushTimeWeight")
      .withAlternative("celeborn.slots.assign.loadAware.flushTimeWeight")
      .categories("master")
      .doc(
        "Weight of average flush time when calculating ordering in load-aware assignment strategy")
      .version("0.3.0")
      .doubleConf
      .createWithDefault(0)

  val MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT: ConfigEntry[Double] =
    buildConf("celeborn.master.slot.assign.loadAware.fetchTimeWeight")
      .withAlternative("celeborn.slots.assign.loadAware.fetchTimeWeight")
      .categories("master")
      .doc(
        "Weight of average fetch time when calculating ordering in load-aware assignment strategy")
      .version("0.3.0")
      .doubleConf
      .createWithDefault(1)

  val MASTER_SLOT_ASSIGN_EXTRA_SLOTS: ConfigEntry[Int] =
    buildConf("celeborn.master.slot.assign.extraSlots")
      .withAlternative("celeborn.slots.assign.extraSlots")
      .categories("master")
      .version("0.3.0")
      .doc("Extra slots number when master assign slots.")
      .intConf
      .createWithDefault(2)

  val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
    buildConf("celeborn.master.slot.assign.maxWorkers")
      .categories("master")
      .version("0.3.1")
      .doc("Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one " +
        s"from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`.")
      .intConf
      .createWithDefault(10000)

  val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.master.estimatedPartitionSize.initialSize")
      .withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
      .categories("master")
      .doc("Initial partition size for estimation, it will change according to runtime stats.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("64mb")

  val ESTIMATED_PARTITION_SIZE_MIN_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.master.estimatedPartitionSize.minSize")
      .withAlternative("celeborn.shuffle.minPartitionSizeToEstimate")
      .categories("worker")
      .doc(
        "Ignore partition size smaller than this configuration of partition size for estimation.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("8mb")

  val ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY: ConfigEntry[Long] =
    buildConf("celeborn.master.estimatedPartitionSize.update.initialDelay")
      .withAlternative("celeborn.shuffle.estimatedPartitionSize.update.initialDelay")
      .categories("master")
      .doc("Initial delay time before start updating partition size for estimation.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5min")

  val ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.master.estimatedPartitionSize.update.interval")
      .withAlternative("celeborn.shuffle.estimatedPartitionSize.update.interval")
      .categories("master")
      .doc("Interval of updating partition size for estimation.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10min")

  val MASTER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.master.userResourceConsumption.update.interval")
      .categories("master")
      .doc("Time length for a window about compute user resource consumption.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")

  val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.shuffle.chunk.size")
      .categories("client", "worker")
      .version("0.2.0")
      .doc("Max chunk size of reducer's merged shuffle data. For example, if a reducer's " +
        "shuffle data is 128M and the data will need 16 fetch chunk requests to fetch.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("8m")

  val WORKER_PARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.shuffle.partitionSplit.enabled")
      .withAlternative("celeborn.worker.partition.split.enabled")
      .categories("worker")
      .version("0.3.0")
      .doc("enable the partition split on worker side")
      .booleanConf
      .createWithDefault(true)

  val WORKER_PARTITION_SPLIT_MIN_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.worker.shuffle.partitionSplit.min")
      .withAlternative("celeborn.shuffle.partitionSplit.min")
      .categories("worker")
      .doc("Min size for a partition to split")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("1m")

  val WORKER_PARTITION_SPLIT_MAX_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.worker.shuffle.partitionSplit.max")
      .categories("worker")
      .doc("Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("2g")

  val WORKER_STORAGE_DIRS: OptionalConfigEntry[Seq[String]] =
    buildConf("celeborn.worker.storage.dirs")
      .categories("worker")
      .version("0.2.0")
      .doc("Directory list to store shuffle data. It's recommended to configure one directory " +
        "on each disk. Storage size limit can be set for each directory. For the sake of " +
        "performance, there should be no more than 2 flush threads " +
        "on the same disk partition if you are using HDD, and should be 8 or more flush threads " +
        "on the same disk partition if you are using SSD. For example: " +
        "`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktype=][:flushthread=]`")
      .stringConf
      .toSequence
      .createOptional

  val WORKER_WORKING_DIR: ConfigEntry[String] =
    buildConf("celeborn.worker.storage.workingDir")
      .withAlternative("celeborn.worker.workingDir")
      .categories("worker")
      .doc("Worker's working dir path name.")
      .version("0.3.0")
      .stringConf
      .createWithDefault("celeborn-worker/shuffle_data")

  val WORKER_STORAGE_BASE_DIR_PREFIX: ConfigEntry[String] =
    buildConf("celeborn.worker.storage.baseDir.prefix")
      .internal
      .categories("worker")
      .version("0.2.0")
      .doc("Base directory for Celeborn worker to write if " +
        s"`${WORKER_STORAGE_DIRS.key}` is not set.")
      .stringConf
      .createWithDefault("/mnt/disk")

  val WORKER_STORAGE_BASE_DIR_COUNT: ConfigEntry[Int] =
    buildConf("celeborn.worker.storage.baseDir.number")
      .internal
      .categories("worker")
      .version("0.2.0")
      .doc(s"How many directories will be used if `${WORKER_STORAGE_DIRS.key}` is not set. " +
        s"The directory name is a combination of `${WORKER_STORAGE_BASE_DIR_PREFIX.key}` " +
        "and from one(inclusive) to `celeborn.worker.storage.baseDir.number`(inclusive) " +
        "step by one.")
      .intConf
      .createWithDefault(16)

  val HDFS_DIR: OptionalConfigEntry[String] =
    buildConf("celeborn.storage.hdfs.dir")
      .categories("worker", "master", "client")
      .version("0.2.0")
      .doc("HDFS base directory for Celeborn to store shuffle data.")
      .stringConf
      .createOptional

  val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.worker.storage.disk.reserve.size")
      .withAlternative("celeborn.worker.disk.reserve.size")
      .categories("worker")
      .doc("Celeborn worker reserved space for each disk.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("5G")

  val WORKER_CHECK_FILE_CLEAN_MAX_RETRIES: ConfigEntry[Int] =
    buildConf("celeborn.worker.storage.checkDirsEmpty.maxRetries")
      .withAlternative("celeborn.worker.disk.checkFileClean.maxRetries")
      .categories("worker")
      .doc("The number of retries for a worker to check if the working directory is cleaned up before registering with the master.")
      .version("0.3.0")
      .intConf
      .createWithDefault(3)

  val WORKER_CHECK_FILE_CLEAN_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.storage.checkDirsEmpty.timeout")
      .withAlternative("celeborn.worker.disk.checkFileClean.timeout")
      .categories("worker")
      .doc("The wait time per retry for a worker to check if the working directory is cleaned up before registering with the master.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1000ms")

  val WORKER_RPC_PORT: ConfigEntry[Int] =
    buildConf("celeborn.worker.rpc.port")
      .categories("worker")
      .doc("Server port for Worker to receive RPC request.")
      .version("0.2.0")
      .intConf
      .createWithDefault(0)

  val WORKER_PUSH_PORT: ConfigEntry[Int] =
    buildConf("celeborn.worker.push.port")
      .categories("worker")
      .doc("Server port for Worker to receive push data request from ShuffleClient.")
      .version("0.2.0")
      .intConf
      .createWithDefault(0)

  val WORKER_FETCH_PORT: ConfigEntry[Int] =
    buildConf("celeborn.worker.fetch.port")
      .categories("worker")
      .doc("Server port for Worker to receive fetch data request from ShuffleClient.")
      .version("0.2.0")
      .intConf
      .createWithDefault(0)

  val WORKER_REPLICATE_PORT: ConfigEntry[Int] =
    buildConf("celeborn.worker.replicate.port")
      .categories("worker")
      .doc("Server port for Worker to receive replicate data request from other Workers.")
      .version("0.2.0")
      .intConf
      .createWithDefault(0)

  val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
    buildConf("celeborn.worker.push.io.threads")
      .categories("worker")
      .doc("Netty IO thread number of worker to handle client push data. " +
        s"The default threads number is the number of flush thread.")
      .version("0.2.0")
      .intConf
      .createOptional

  val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
    buildConf("celeborn.worker.fetch.io.threads")
      .categories("worker")
      .doc("Netty IO thread number of worker to handle client fetch data. " +
        s"The default threads number is the number of flush thread.")
      .version("0.2.0")
      .intConf
      .createOptional

  val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
    buildConf("celeborn.worker.replicate.io.threads")
      .categories("worker")
      .doc("Netty IO thread number of worker to replicate shuffle data. " +
        s"The default threads number is the number of flush thread.")
      .version("0.2.0")
      .intConf
      .createOptional

  val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.register.timeout")
      .categories("worker")
      .doc("Worker register timeout.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("180s")

  val WORKER_CLOSE_IDLE_CONNECTIONS: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.closeIdleConnections")
      .categories("worker")
      .doc("Whether worker will close idle connections.")
      .version("0.2.0")
      .booleanConf
      .createWithDefault(false)

  val WORKER_REPLICATE_FAST_FAIL_DURATION: ConfigEntry[Long] =
    buildConf("celeborn.worker.replicate.fastFail.duration")
      .categories("worker")
      .doc("If a replicate request not replied during the duration, worker will mark the replicate data request as failed." +
        "It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.replicate.randomConnection.enabled")
      .categories("worker")
      .doc("Whether worker will create random connection to peer when replicate data. When false, worker tend to " +
        "reuse the same cached TransportClient to a specific replicate worker; when true, worker tend to use " +
        "different cached TransportClient. Netty will use the same thread to serve the same connection, so " +
        "with more connections replicate server can leverage more netty threads")
      .version("0.2.1")
      .booleanConf
      .createWithDefault(true)

  val WORKER_REPLICATE_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.replicate.threads")
      .categories("worker")
      .version("0.2.0")
      .doc("Thread number of worker to replicate shuffle data.")
      .intConf
      .createWithDefault(64)

  val WORKER_COMMIT_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.commitFiles.threads")
      .withAlternative("celeborn.worker.commit.threads")
      .categories("worker")
      .version("0.3.0")
      .doc("Thread number of worker to commit shuffle data files asynchronously. " +
        "It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
      .intConf
      .createWithDefault(32)

  val WORKER_SHUFFLE_COMMIT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.commitFiles.timeout")
      .withAlternative("celeborn.worker.shuffle.commit.timeout")
      .categories("worker")
      .doc("Timeout for a Celeborn worker to commit files of a shuffle. " +
        "It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")

  val PARTITION_SORTER_SORT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.sortPartition.timeout")
      .withAlternative("celeborn.worker.partitionSorter.sort.timeout")
      .categories("worker")
      .doc("Timeout for a shuffle file to sort.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("220s")

  val PARTITION_SORTER_THREADS: OptionalConfigEntry[Int] =
    buildConf("celeborn.worker.sortPartition.threads")
      .withAlternative("celeborn.worker.partitionSorter.threads")
      .categories("worker")
      .doc("PartitionSorter's thread counts. " +
        "It's recommended to set at least `64` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
      .version("0.3.0")
      .intConf
      .createOptional

  val WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY: ConfigEntry[Long] =
    buildConf("celeborn.worker.sortPartition.reservedMemoryPerPartition")
      .withAlternative("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
      .categories("worker")
      .doc("Reserved memory when sorting a shuffle file off-heap.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .checkValue(v => v < Int.MaxValue, "Reserved memory per partition must be less than 2GB.")
      .createWithDefaultString("1mb")

  val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.worker.flusher.buffer.size")
      .categories("worker")
      .version("0.2.0")
      .doc("Size of buffer used by a single flusher.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("256k")

  val WORKER_HDFS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.worker.flusher.hdfs.buffer.size")
      .categories("worker")
      .version("0.3.0")
      .doc("Size of buffer used by a HDFS flusher.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("4m")

  val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.writer.close.timeout")
      .categories("worker")
      .doc("Timeout for a file writer to close")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")

  val WORKER_FLUSHER_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.flusher.threads")
      .categories("worker")
      .doc("Flusher's thread count per disk for unkown-type disks.")
      .version("0.2.0")
      .intConf
      .createWithDefault(16)

  val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.flusher.hdd.threads")
      .categories("worker")
      .doc("Flusher's thread count per disk used for write data to HDD disks.")
      .version("0.2.0")
      .intConf
      .createWithDefault(1)

  val WORKER_FLUSHER_SSD_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.flusher.ssd.threads")
      .categories("worker")
      .doc("Flusher's thread count per disk used for write data to SSD disks.")
      .version("0.2.0")
      .intConf
      .createWithDefault(16)

  val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.worker.flusher.hdfs.threads")
      .categories("worker")
      .doc("Flusher's thread count used for write data to HDFS.")
      .version("0.2.0")
      .intConf
      .createWithDefault(8)

  val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.flusher.shutdownTimeout")
      .categories("worker")
      .doc("Timeout for a flusher to shutdown.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("3s")

  val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval")
      .categories("worker")
      .doc("Interval for a Celeborn worker to flush committed file infos into Level DB.")
      .version("0.3.1")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("5s")

  val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync")
      .categories("worker")
      .doc(
        "Whether to call sync method to save committed file infos into Level DB to handle OS crash.")
      .version("0.3.1")
      .booleanConf
      .createWithDefault(false)

  val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
    buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
      .withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
      .categories("worker")
      .doc("The size of sliding windows used to calculate statistics about flushed time and count.")
      .version("0.3.0")
      .intConf
      .createWithDefault(20)

  val WORKER_DISKTIME_SLIDINGWINDOW_MINFLUSHCOUNT: ConfigEntry[Int] =
    buildConf("celeborn.worker.diskTime.slidingWindow.minFlushCount")
      .withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.minCount")
      .categories("worker")
      .doc("The minimum flush count to enter a sliding window" +
        " to calculate statistics about flushed time and count.")
      .version("0.3.0")
      .internal
      .intConf
      .createWithDefault(500)

  val WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT: ConfigEntry[Int] =
    buildConf("celeborn.worker.diskTime.slidingWindow.minFetchCount")
      .categories("worker")
      .doc("The minimum fetch count to enter a sliding window" +
        " to calculate statistics about flushed time and count.")
      .version("0.2.1")
      .internal
      .intConf
      .createWithDefault(100)

  val WORKER_DIRECT_MEMORY_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.memory.check.interval")
      .withAlternative("celeborn.worker.memory.checkInterval")
      .categories("worker")
      .doc("Interval of worker direct memory checking.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10ms")

  val WORKER_DIRECT_MEMORY_REPORT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.memory.report.interval")
      .withAlternative("celeborn.worker.memory.reportInterval")
      .categories("worker")
      .doc("Interval of worker direct memory tracker reporting to log.")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("10s")

  val WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.memory.trimChannelWaitInterval")
      .categories("worker")
      .doc("Wait time after worker trigger channel to trim cache.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1s")

  val WORKER_DIRECT_MEMORY_TRIM_FLUSH_WAIT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.memory.trimFlushWaitInterval")
      .categories("worker")
      .doc("Wait time after worker trigger StorageManger to flush data.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1s")

  val WORKER_DISK_MONITOR_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.monitor.disk.enabled")
      .categories("worker")
      .version("0.3.0")
      .doc("When true, worker will monitor device and report to master.")
      .booleanConf
      .createWithDefault(true)

  val WORKER_DEVICE_STATUS_CHECK_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.disk.check.timeout")
      .withAlternative("celeborn.worker.disk.check.timeout")
      .categories("worker")
      .doc("Timeout time for worker check device status.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")

  val WORKER_DISK_MONITOR_CHECKLIST: ConfigEntry[Seq[String]] =
    buildConf("celeborn.worker.monitor.disk.checklist")
      .categories("worker")
      .version("0.2.0")
      .doc("Monitor type for disk, available items are: " +
        "iohang, readwrite and diskusage.")
      .stringConf
      .transform(_.toLowerCase)
      .toSequence
      .createWithDefaultString("readwrite,diskusage")

  val WORKER_DISK_MONITOR_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.disk.check.interval")
      .withAlternative("celeborn.worker.monitor.disk.checkInterval")
      .categories("worker")
      .version("0.3.0")
      .doc("Intervals between device monitor to check disk.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val WORKER_DISK_MONITOR_SYS_BLOCK_DIR: ConfigEntry[String] =
    buildConf("celeborn.worker.monitor.disk.sys.block.dir")
      .categories("worker")
      .version("0.2.0")
      .doc("The directory where linux file block information is stored.")
      .stringConf
      .createWithDefault("/sys/block")

  val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
    buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
      .categories("worker")
      .version("0.3.0")
      .doc("Device monitor will only notify critical error once the accumulated valid non-critical error number " +
        "exceeding this threshold.")
      .intConf
      .createWithDefault(64)

  val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
      .categories("worker")
      .version("0.3.0")
      .doc("The expire timeout of non-critical device error. Only notify critical error when the number of non-critical " +
        "errors for a period of time exceeds threshold.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10m")

  val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
    buildConf("celeborn.worker.writer.create.maxAttempts")
      .categories("worker")
      .version("0.2.0")
      .doc("Retry count for a file writer to create if its creation was failed.")
      .intConf
      .createWithDefault(3)

  val PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD: ConfigEntry[Double] =
    buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
      .categories("worker")
      .doc("Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition " +
        "sorter memory, partition sorter will stop sorting.")
      .version("0.2.0")
      .doubleConf
      .checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
      .createWithDefault(0.1)

  val WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER: ConfigEntry[Double] =
    buildConf("celeborn.worker.directMemoryRatioForReadBuffer")
      .categories("worker")
      .doc("Max ratio of direct memory for read buffer")
      .version("0.2.0")
      .doubleConf
      .createWithDefault(0.1)

  val WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE: ConfigEntry[Double] =
    buildConf("celeborn.worker.directMemoryRatioForMemoryShuffleStorage")
      .categories("worker")
      .doc("Max ratio of direct memory to store shuffle data")
      .version("0.2.0")
      .doubleConf
      .createWithDefault(0.0)

  val WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE: ConfigEntry[Double] =
    buildConf("celeborn.worker.directMemoryRatioToPauseReceive")
      .categories("worker")
      .doc("If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients.")
      .version("0.2.0")
      .doubleConf
      .createWithDefault(0.85)

  val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
    buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
      .categories("worker")
      .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers.")
      .version("0.2.0")
      .doubleConf
      .createWithDefault(0.95)

  val WORKER_DIRECT_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
    buildConf("celeborn.worker.directMemoryRatioToResume")
      .categories("worker")
      .doc("If direct memory usage is less than this limit, worker will resume.")
      .version("0.2.0")
      .doubleConf
      .createWithDefault(0.5)

  val WORKER_CONGESTION_CONTROL_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.congestionControl.enabled")
      .categories("worker")
      .doc("Whether to enable congestion control or not.")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val WORKER_CONGESTION_CONTROL_SAMPLE_TIME_WINDOW: ConfigEntry[Long] =
    buildConf("celeborn.worker.congestionControl.sample.time.window")
      .categories("worker")
      .doc("The worker holds a time sliding list to calculate users' produce/consume rate")
      .version("0.3.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("10s")

  val WORKER_CONGESTION_CONTROL_LOW_WATERMARK: OptionalConfigEntry[Long] =
    buildConf("celeborn.worker.congestionControl.low.watermark")
      .categories("worker")
      .doc("Will stop congest users if the total pending bytes of disk buffer is lower than " +
        "this configuration")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createOptional

  val WORKER_CONGESTION_CONTROL_HIGH_WATERMARK: OptionalConfigEntry[Long] =
    buildConf("celeborn.worker.congestionControl.high.watermark")
      .categories("worker")
      .doc("If the total bytes in disk buffer exceeds this configure, will start to congest" +
        "users whose produce rate is higher than the potential average consume rate. " +
        "The congestion will stop if the produce rate is lower or equal to the " +
        "average consume rate, or the total pending bytes lower than " +
        s"${WORKER_CONGESTION_CONTROL_LOW_WATERMARK.key}")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createOptional

  val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.congestionControl.user.inactive.interval")
      .categories("worker")
      .doc("How long will consider this user is inactive if it doesn't send data")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10min")

  val WORKER_DECOMMISSION_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.decommission.checkInterval")
      .categories("worker")
      .doc(
        "The wait interval of checking whether all the shuffle expired during worker decomission")
      .version("0.4.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")

  val WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.decommission.forceExitTimeout")
      .categories("worker")
      .doc("The wait time of waiting for all the shuffle expire during worker decommission.")
      .version("0.4.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("6h")

  val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.graceful.shutdown.enabled")
      .categories("worker")
      .doc("When true, during worker shutdown, the worker will wait for all released slots " +
        s"to be committed or destroyed.")
      .version("0.2.0")
      .booleanConf
      .createWithDefault(false)

  val WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.graceful.shutdown.timeout")
      .categories("worker")
      .doc("The worker's graceful shutdown timeout time.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("600s")

  val WORKER_CHECK_SLOTS_FINISHED_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.interval")
      .categories("worker")
      .doc("The wait interval of checking whether all released slots " +
        "to be committed or destroyed during worker graceful shutdown")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1s")

  val WORKER_CHECK_SLOTS_FINISHED_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout")
      .categories("worker")
      .doc("The wait time of waiting for the released slots" +
        " to be committed or destroyed during worker graceful shutdown.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("480s")

  val WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH: ConfigEntry[String] =
    buildConf("celeborn.worker.graceful.shutdown.recoverPath")
      .categories("worker")
      .doc("The path to store levelDB.")
      .version("0.2.0")
      .stringConf
      .transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir")))
      .createWithDefault(s"<tmp>/recover")

  val WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
      .categories("worker")
      .doc("The wait time of waiting for sorting partition files" +
        " during worker graceful shutdown.")
      .version("0.2.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")

  val WORKER_PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
    buildConf("celeborn.worker.partition.initial.readBuffersMin")
      .categories("worker")
      .version("0.3.0")
      .doc("Min number of initial read buffers")
      .intConf
      .createWithDefault(1)

  val WORKER_PARTITION_READ_BUFFERS_MAX: ConfigEntry[Int] =
    buildConf("celeborn.worker.partition.initial.readBuffersMax")
      .categories("worker")
      .version("0.3.0")
      .doc("Max number of initial read buffers")
      .intConf
      .createWithDefault(1024)

  val WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT: ConfigEntry[Int] =
    buildConf("celeborn.worker.bufferStream.threadsPerMountpoint")
      .categories("worker")
      .version("0.3.0")
      .doc("Threads count for read buffer per mount point.")
      .intConf
      .createWithDefault(8)

  val WORKER_READBUFFER_ALLOCATIONWAIT: ConfigEntry[Long] =
    buildConf("celeborn.worker.readBuffer.allocationWait")
      .categories("worker")
      .version("0.3.0")
      .doc("The time to wait when buffer dispatcher can not allocate a buffer.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("50ms")

  val WORKER_READBUFFER_TARGET_RATIO: ConfigEntry[Double] =
    buildConf("celeborn.worker.readBuffer.target.ratio")
      .categories("worker")
      .version("0.3.0")
      .doc("The target ratio for read ahead buffer's memory usage.")
      .doubleConf
      .createWithDefault(0.9)

  val WORKER_READBUFFER_TARGET_UPDATE_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.worker.readBuffer.target.updateInterval")
      .categories("worker")
      .version("0.3.0")
      .doc("The interval for memory manager to calculate new read buffer's target memory.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("100ms")

  val WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD: ConfigEntry[Long] =
    buildConf("celeborn.worker.readBuffer.target.changeThreshold")
      .categories("worker")
      .version("0.3.0")
      .doc("The target ratio for pre read memory usage.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("1mb")

  val WORKER_READBUFFERS_TOTRIGGERREAD_MIN: ConfigEntry[Int] =
    buildConf("celeborn.worker.readBuffer.toTriggerReadMin")
      .categories("worker")
      .version("0.3.0")
      .doc("Min buffers count for map data partition to trigger read.")
      .intConf
      .createWithDefault(32)

  val WORKER_PUSH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.push.heartbeat.enabled")
      .categories("worker")
      .version("0.3.0")
      .doc("enable the heartbeat from worker to client when pushing data")
      .booleanConf
      .createWithDefault(false)

  val WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS: ConfigEntry[Int] =
    buildConf("celeborn.worker.push.compositeBuffer.maxComponents")
      .internal
      .categories("worker")
      .version("0.3.0")
      .doc("Max components of Netty `CompositeByteBuf` in `FileWriter`'s `flushBuffer`. " +
        "When this value is too big, i.e. 256, there will be many memory fragments in Netty's memory pool, " +
        "and total direct memory can be significantly larger than the disk buffer. " +
        "When set to 1, Netty's direct memory is close to disk buffer, but performance " +
        "might decrease due to frequent memory copy during compaction.")
      .intConf
      .createWithDefault(128)

  val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.worker.fetch.heartbeat.enabled")
      .categories("worker")
      .version("0.3.0")
      .doc("enable the heartbeat from worker to client when fetching data")
      .booleanConf
      .createWithDefault(false)

  val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.application.heartbeatInterval")
      .withAlternative("celeborn.application.heartbeatInterval")
      .categories("client")
      .version("0.3.0")
      .doc("Interval for client to send heartbeat message to master.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("10s")

  val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
      .categories("client")
      .version("0.3.0")
      .doc("When true, Celeborn will exclude partition's peer worker on failure " +
        "when push data to replica failed.")
      .booleanConf
      .createWithDefault(true)

  val CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.client.excludedWorker.expireTimeout")
      .withAlternative("celeborn.worker.excluded.expireTimeout")
      .categories("client")
      .version("0.3.0")
      .doc("Timeout time for LifecycleManager to clear reserved excluded worker. Default to be 1.5 * `celeborn.master.heartbeat.worker.timeout`" +
        "to cover worker heartbeat timeout check period")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("180s")

  val CLIENT_CHECKED_USE_ALLOCATED_WORKERS: ConfigEntry[Boolean] =
    buildConf("celeborn.client.checked.useAllocatedWorkers")
      .internal
      .categories("client")
      .version("0.3.0")
      .doc("When true, Celeborn will use local allocated workers as candidate being checked workers(check the workers" +
        "whether unKnown in master), this may be more useful for map partition to regenerate the lost data), " +
        "otherwise use local black list as candidate being checked workers.")
      .booleanConf
      .createWithDefault(false)

  val TEST_CLIENT_RETRY_COMMIT_FILE: ConfigEntry[Boolean] =
    buildConf("celeborn.test.client.retryCommitFiles")
      .withAlternative("celeborn.test.retryCommitFiles")
      .internal
      .categories("test", "client")
      .doc("Fail commitFile request for test")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_PUSH_REPLICATE_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.push.replicate.enabled")
      .withAlternative("celeborn.push.replicate.enabled")
      .categories("client")
      .doc("When true, Celeborn worker will replicate shuffle data to another Celeborn worker " +
        "asynchronously to ensure the pushed shuffle data won't be lost after the node failure. " +
        "It's recommended to set `false` when `HDFS` is enabled in `celeborn.storage.activeTypes`.")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_PUSH_BUFFER_INITIAL_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.client.push.buffer.initial.size")
      .withAlternative("celeborn.push.buffer.initial.size")
      .categories("client")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("8k")

  val CLIENT_PUSH_BUFFER_MAX_SIZE: ConfigEntry[Long] =
    buildConf("celeborn.client.push.buffer.max.size")
      .withAlternative("celeborn.push.buffer.max.size")
      .categories("client")
      .version("0.3.0")
      .doc("Max size of reducer partition buffer memory for shuffle hash writer. The pushed " +
        "data will be buffered in memory before sending to Celeborn worker. For performance " +
        "consideration keep this buffer size higher than 32K. Example: If reducer amount is " +
        "2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` " +
        "heap memory.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("64k")

  val CLIENT_PUSH_QUEUE_CAPACITY: ConfigEntry[Int] =
    buildConf("celeborn.client.push.queue.capacity")
      .withAlternative("celeborn.push.queue.capacity")
      .categories("client")
      .version("0.3.0")
      .doc("Push buffer queue size for a task. The maximum memory is " +
        "`celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity`, " +
        "default: 64KiB * 512 = 32MiB")
      .intConf
      .createWithDefault(512)

  val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_TOTAL: ConfigEntry[Int] =
    buildConf("celeborn.client.push.maxReqsInFlight.total")
      .withAlternative("celeborn.push.maxReqsInFlight")
      .categories("client")
      .version("0.3.0")
      .doc("Amount of total Netty in-flight requests. The maximum memory is " +
        "`celeborn.client.push.maxReqsInFlight.total` * `celeborn.client.push.buffer.max.size` " +
        "* compression ratio(1 in worst case): 64KiB * 256 = 16MiB")
      .intConf
      .createWithDefault(256)

  val CLIENT_PUSH_MAX_REQS_IN_FLIGHT_PERWORKER: ConfigEntry[Int] =
    buildConf("celeborn.client.push.maxReqsInFlight.perWorker")
      .categories("client")
      .version("0.3.0")
      .doc(
        "Amount of Netty in-flight requests per worker. Default max memory of in flight requests " +
          " per worker is `celeborn.client.push.maxReqsInFlight.perWorker` * `celeborn.client.push.buffer.max.size` " +
          "* compression ratio(1 in worst case): 64KiB * 32 = 2MiB. The maximum memory will " +
          "not exceed `celeborn.client.push.maxReqsInFlight.total`.")
      .intConf
      .createWithDefault(32)

  val CLIENT_PUSH_MAX_REVIVE_TIMES: ConfigEntry[Int] =
    buildConf("celeborn.client.push.revive.maxRetries")
      .categories("client")
      .version("0.3.0")
      .doc("Max retry times for reviving when celeborn push data failed.")
      .intConf
      .createWithDefault(5)

  val CLIENT_PUSH_REVIVE_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.push.revive.interval")
      .categories("client")
      .version("0.3.0")
      .doc("Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive " +
        "request is `celeborn.client.push.revive.batchSize`.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("100ms")

  val CLIENT_PUSH_REVIVE_BATCHSIZE: ConfigEntry[Int] =
    buildConf("celeborn.client.push.revive.batchSize")
      .categories("client")
      .version("0.3.0")
      .doc("Max number of partitions in one Revive request.")
      .intConf
      .createWithDefault(2048)

  val CLIENT_PUSH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.push.excludeWorkerOnFailure.enabled")
      .categories("client")
      .doc("Whether to enable shuffle client-side push exclude workers on failures.")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_PUSH_LIMIT_STRATEGY: ConfigEntry[String] =
    buildConf("celeborn.client.push.limit.strategy")
      .categories("client")
      .doc("The strategy used to control the push speed. " +
        "Valid strategies are SIMPLE and SLOWSTART. The SLOWSTART strategy usually works with " +
        "congestion control mechanism on the worker side.")
      .version("0.3.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(Set("SIMPLE", "SLOWSTART"))
      .createWithDefaultString("SIMPLE")

  val CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME: ConfigEntry[Long] =
    buildConf("celeborn.client.push.slowStart.initialSleepTime")
      .categories("client")
      .version("0.3.0")
      .doc(s"The initial sleep time if the current max in flight requests is 0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("500ms")

  val CLIENT_PUSH_SLOW_START_MAX_SLEEP_TIME: ConfigEntry[Long] =
    buildConf("celeborn.client.push.slowStart.maxSleepTime")
      .categories("client")
      .version("0.3.0")
      .doc(s"If ${CLIENT_PUSH_LIMIT_STRATEGY.key} is set to SLOWSTART, push side will " +
        "take a sleep strategy for each batch of requests, this controls " +
        "the max sleep time if the max in flight requests limit is 1 for a long time")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("2s")

  val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.client.push.timeout")
      .withAlternative("celeborn.push.data.timeout")
      .categories("client")
      .version("0.3.0")
      .doc(s"Timeout for a task to push data rpc message. This value should better be more than twice of `${PUSH_TIMEOUT_CHECK_INTERVAL.key}`")
      .timeConf(TimeUnit.MILLISECONDS)
      .checkValue(_ > 0, "Value must be positive!")
      .createWithDefaultString("120s")

  val TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT: ConfigEntry[Boolean] =
    buildConf("celeborn.test.worker.pushPrimaryDataTimeout")
      .withAlternative("celeborn.test.pushMasterDataTimeout")
      .internal
      .categories("test", "worker")
      .version("0.3.0")
      .doc("Whether to test push primary data timeout")
      .booleanConf
      .createWithDefault(false)

  val TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT: ConfigEntry[Boolean] =
    buildConf("celeborn.test.worker.pushReplicaDataTimeout")
      .internal
      .categories("test", "worker")
      .version("0.3.0")
      .doc("Whether to test push replica data timeout")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_PUSH_LIMIT_IN_FLIGHT_TIMEOUT: OptionalConfigEntry[Long] =
    buildConf("celeborn.client.push.limit.inFlight.timeout")
      .withAlternative("celeborn.push.limit.inFlight.timeout")
      .categories("client")
      .doc("Timeout for netty in-flight requests to be done." +
        s"Default value should be `${CLIENT_PUSH_DATA_TIMEOUT.key} * 2`.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createOptional

  val CLIENT_PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.push.limit.inFlight.sleepInterval")
      .withAlternative("celeborn.push.limit.inFlight.sleepInterval")
      .categories("client")
      .doc("Sleep interval when check netty in-flight requests to be done.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("50ms")

  val CLIENT_PUSH_SORT_RANDOMIZE_PARTITION_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.push.sort.randomizePartitionId.enabled")
      .withAlternative("celeborn.push.sort.randomizePartitionId.enabled")
      .categories("client")
      .doc(
        "Whether to randomize partitionId in push sorter. If true, partitionId will be randomized " +
          "when sort data to avoid skew when push to worker")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_PUSH_RETRY_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.client.push.retry.threads")
      .withAlternative("celeborn.push.retry.threads")
      .categories("client")
      .doc("Thread number to process shuffle re-send push data requests.")
      .version("0.3.0")
      .intConf
      .createWithDefault(8)

  val CLIENT_PUSH_SPLIT_PARTITION_THREADS: ConfigEntry[Int] =
    buildConf("celeborn.client.push.splitPartition.threads")
      .withAlternative("celeborn.push.splitPartition.threads")
      .categories("client")
      .doc("Thread number to process shuffle split request in shuffle client.")
      .version("0.3.0")
      .intConf
      .createWithDefault(8)

  val CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.push.takeTaskWaitInterval")
      .categories("client")
      .doc("Wait interval if no task available to push to worker.")
      .version("0.3.0")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("50ms")

  val CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS: ConfigEntry[Int] =
    buildConf("celeborn.client.push.takeTaskMaxWaitAttempts")
      .categories("client")
      .doc("Max wait times if no task available to push to worker.")
      .version("0.3.0")
      .intConf
      .createWithDefault(1)

  val CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.client.push.sendBufferPool.expireTimeout")
      .categories("client")
      .doc("Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, " +
        "the send buffers and push tasks will be cleaned up.")
      .version("0.3.1")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.push.sendBufferPool.checkExpireInterval")
      .categories("client")
      .doc("Interval to check expire for send buffer pool. If the pool has been idle " +
        s"for more than `${CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT.key}`, the pooled send buffers and push tasks will be cleaned up.")
      .version("0.3.1")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("30s")

  val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
    buildConf("celeborn.test.client.retryRevive")
      .withAlternative("celeborn.test.retryRevive")
      .internal
      .categories("test", "client")
      .doc("Fail push data and request for test")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_FETCH_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.client.fetch.timeout")
      .withAlternative("celeborn.fetch.timeout")
      .categories("client")
      .version("0.3.0")
      .doc("Timeout for a task to open stream and fetch chunk.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("600s")

  val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
    buildConf("celeborn.client.fetch.maxReqsInFlight")
      .withAlternative("celeborn.fetch.maxReqsInFlight")
      .categories("client")
      .version("0.3.0")
      .doc("Amount of in-flight chunk fetch request.")
      .intConf
      .createWithDefault(3)

  val CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA: ConfigEntry[Int] =
    buildConf("celeborn.client.fetch.maxRetriesForEachReplica")
      .withAlternative("celeborn.fetch.maxRetriesForEachReplica")
      .withAlternative("celeborn.fetch.maxRetries")
      .categories("client")
      .version("0.3.0")
      .doc("Max retry times of fetch chunk on each replica")
      .intConf
      .createWithDefault(3)

  val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
      .categories("client")
      .doc("Whether to enable shuffle client-side fetch exclude workers on failure.")
      .version("0.3.0")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_FETCH_EXCLUDED_WORKER_EXPIRE_TIMEOUT: ConfigEntry[Long] =
    buildConf("celeborn.client.fetch.excludedWorker.expireTimeout")
      .categories("client")
      .doc("ShuffleClient is a static object, it will be used in the whole lifecycle of Executor," +
        "We give a expire time for excluded workers to avoid a transient worker issues.")
      .version("0.3.0")
      .fallbackConf(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)

  val TEST_CLIENT_FETCH_FAILURE: ConfigEntry[Boolean] =
    buildConf("celeborn.test.client.fetchFailure")
      .withAlternative("celeborn.test.fetchFailure")
      .internal
      .categories("test", "client")
      .version("0.3.0")
      .doc("Whether to test fetch chunk failure")
      .booleanConf
      .createWithDefault(false)

  val SHUFFLE_RANGE_READ_FILTER_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.shuffle.rangeReadFilter.enabled")
      .withAlternative("celeborn.shuffle.rangeReadFilter.enabled")
      .categories("client")
      .version("0.2.0")
      .doc("If a spark application have skewed partition, this value can set to true to improve performance.")
      .booleanConf
      .createWithDefault(false)

  val SHUFFLE_PARTITION_TYPE: ConfigEntry[String] =
    buildConf("celeborn.client.shuffle.partition.type")
      .withAlternative("celeborn.shuffle.partition.type")
      .categories("client")
      .doc("Type of shuffle's partition.")
      .version("0.3.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(Set(
        PartitionType.REDUCE.name,
        PartitionType.MAP.name,
        PartitionType.MAPGROUP.name))
      .createWithDefault(PartitionType.REDUCE.name)

  val SHUFFLE_PARTITION_SPLIT_THRESHOLD: ConfigEntry[Long] =
    buildConf("celeborn.client.shuffle.partitionSplit.threshold")
      .withAlternative("celeborn.shuffle.partitionSplit.threshold")
      .categories("client")
      .doc("Shuffle file size threshold, if file size exceeds this, trigger split.")
      .version("0.3.0")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefaultString("1G")

  val SHUFFLE_PARTITION_SPLIT_MODE: ConfigEntry[String] =
    buildConf("celeborn.client.shuffle.partitionSplit.mode")
      .withAlternative("celeborn.shuffle.partitionSplit.mode")
      .categories("client")
      .doc("soft: the shuffle file size might be larger than split threshold. " +
        "hard: the shuffle file size will be limited to split threshold.")
      .version("0.3.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(Set(PartitionSplitMode.SOFT.name, PartitionSplitMode.HARD.name))
      .createWithDefault(PartitionSplitMode.SOFT.name)

  val SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
    buildConf("celeborn.client.shuffle.compression.codec")
      .withAlternative("celeborn.shuffle.compression.codec")
      .withAlternative("remote-shuffle.job.compression.codec")
      .categories("client")
      .doc("The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`.")
      .version("0.3.0")
      .stringConf
      .transform(_.toUpperCase(Locale.ROOT))
      .checkValues(Set(
        CompressionCodec.LZ4.name,
        CompressionCodec.ZSTD.name,
        CompressionCodec.NONE.name))
      .createWithDefault(CompressionCodec.LZ4.name)

  val SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
    buildConf("celeborn.client.shuffle.compression.zstd.level")
      .withAlternative("celeborn.shuffle.compression.zstd.level")
      .categories("client")
      .doc("Compression level for Zstd compression codec, its value should be an integer " +
        "between -5 and 22. Increasing the compression level will result in better compression " +
        "at the expense of more CPU and memory.")
      .version("0.3.0")
      .intConf
      .checkValue(
        value => value >= -5 && value <= 22,
        s"Compression level for Zstd compression codec should be an integer between -5 and 22.")
      .createWithDefault(1)

  val SHUFFLE_EXPIRED_CHECK_INTERVAL: ConfigEntry[Long] =
    buildConf("celeborn.client.shuffle.expired.checkInterval")
      .withAlternative("celeborn.shuffle.expired.checkInterval")
      .categories("client")
      .version("0.3.0")
      .doc("Interval for client to check expired shuffles.")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("60s")

  val CLIENT_SHUFFLE_MANAGER_PORT: ConfigEntry[Int] =
    buildConf("celeborn.client.shuffle.manager.port")
      .withAlternative("celeborn.shuffle.manager.port")
      .categories("client")
      .version("0.3.0")
      .doc("Port used by the LifecycleManager on the Driver.")
      .intConf
      .checkValue(
        (port: Int) => {
          if (port != 0) {
            logWarning(
              "The user specifies the port used by the LifecycleManager on the Driver, and its" +
                s" values is $port, which may cause port conflicts and startup failure.")
          }
          true
        },