private RaftProperties newRaftProperties()

in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java [284:408]


  private RaftProperties newRaftProperties(CelebornConf conf, RpcType rpc) {
    final RaftProperties properties = new RaftProperties();
    // Set RPC type
    RaftConfigKeys.Rpc.setType(properties, rpc);

    // Set the ratis port number
    if (rpc == SupportedRpcType.GRPC) {
      GrpcConfigKeys.Server.setPort(properties, ratisAddr.getPort());
    } else if (rpc == SupportedRpcType.NETTY) {
      NettyConfigKeys.Server.setPort(properties, ratisAddr.getPort());
    }

    // Set Ratis storage directory
    String storageDir = conf.haMasterRatisStorageDir();
    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(new File(storageDir)));

    // Set RAFT segment size
    long raftSegmentSize = conf.haMasterRatisLogSegmentSizeMax();
    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf(raftSegmentSize));
    RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);

    // Set RAFT segment pre-allocated size
    long raftSegmentPreallocatedSize = conf.haMasterRatisLogPreallocatedSize();
    long raftSegmentWriteBufferSize = conf.haMasterRatisLogWriteBufferSize();
    int logAppenderQueueNumElements = conf.haMasterRatisLogAppenderQueueNumElements();
    long logAppenderQueueByteLimit = conf.haMasterRatisLogAppenderQueueBytesLimit();
    // RATIS-589. Eliminate buffer copying in SegmentedRaftLogOutputStream.
    // 4 bytes (serialized size) + logAppenderQueueByteLimit + 4 bytes (checksum)
    if (raftSegmentWriteBufferSize < logAppenderQueueByteLimit + 8) {
      throw new IllegalArgumentException(
          CelebornConf.HA_MASTER_RATIS_LOG_WRITE_BUFFER_SIZE().key()
              + " (= "
              + raftSegmentWriteBufferSize
              + ") is less than "
              + CelebornConf.HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT().key()
              + " + 8 (= "
              + (logAppenderQueueByteLimit + 8)
              + ")");
    }
    boolean shouldInstallSnapshot = conf.haMasterRatisLogInstallSnapshotEnabled();
    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(
        properties, logAppenderQueueNumElements);
    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(
        properties, SizeInBytes.valueOf(logAppenderQueueByteLimit));
    RaftServerConfigKeys.Log.setPreallocatedSize(
        properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize));
    RaftServerConfigKeys.Log.setWriteBufferSize(
        properties, SizeInBytes.valueOf(raftSegmentWriteBufferSize));
    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, shouldInstallSnapshot);
    int logPurgeGap = conf.haMasterRatisLogPurgeGap();
    RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);

    // For grpc set the maximum message size
    // RATIS-2135. The leader keeps sending inconsistent entries repeatedly to followers.
    // raft.grpc.message.size.max must be 1m larger than raft.server.log.appender.buffer.byte-limit
    GrpcConfigKeys.setMessageSizeMax(
        properties, SizeInBytes.valueOf(logAppenderQueueByteLimit + SizeInBytes.ONE_MB.getSize()));

    // Set the server request timeout
    TimeDuration serverRequestTimeout =
        TimeDuration.valueOf(conf.haMasterRatisRpcRequestTimeout(), TimeUnit.SECONDS);
    RaftServerConfigKeys.Rpc.setRequestTimeout(properties, serverRequestTimeout);

    // Set timeout for server retry cache entry
    TimeDuration retryCacheExpiryTime =
        TimeDuration.valueOf(conf.haMasterRatisRetryCacheExpiryTime(), TimeUnit.SECONDS);
    RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryTime);

    // Set the server min and max timeout
    TimeDuration rpcTimeoutMin =
        TimeDuration.valueOf(conf.haMasterRatisRpcTimeoutMin(), TimeUnit.SECONDS);
    TimeDuration rpcTimeoutMax =
        TimeDuration.valueOf(conf.haMasterRatisRpcTimeoutMax(), TimeUnit.SECONDS);
    RaftServerConfigKeys.Rpc.setTimeoutMin(properties, rpcTimeoutMin);
    RaftServerConfigKeys.Rpc.setTimeoutMax(properties, rpcTimeoutMax);

    TimeDuration firstElectionTimeoutMin =
        TimeDuration.valueOf(conf.haMasterRatisFirstElectionTimeoutMin(), TimeUnit.SECONDS);
    TimeDuration firstElectionTimeoutMax =
        TimeDuration.valueOf(conf.haMasterRatisFirstElectionTimeoutMax(), TimeUnit.SECONDS);
    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties, firstElectionTimeoutMin);
    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties, firstElectionTimeoutMax);

    boolean leaderElectionMemberMajorityAdd = conf.hasMasterRatisLeaderElectionMemberMajorityAdd();
    RaftServerConfigKeys.LeaderElection.setMemberMajorityAdd(
        properties, leaderElectionMemberMajorityAdd);

    // Set the rpc client timeout
    TimeDuration clientRpcTimeout =
        TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(), TimeUnit.SECONDS);
    TimeDuration clientRpcWatchTimeout =
        TimeDuration.valueOf(conf.haMasterRatisClientRpcWatchTimeout(), TimeUnit.SECONDS);
    RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRpcTimeout);
    RaftClientConfigKeys.Rpc.setWatchRequestTimeout(properties, clientRpcWatchTimeout);

    // Set the number of maximum cached segments
    RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);

    TimeDuration noLeaderTimeout =
        TimeDuration.valueOf(conf.haMasterRatisNotificationNoLeaderTimeout(), TimeUnit.SECONDS);
    RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, noLeaderTimeout);
    TimeDuration slownessTimeout =
        TimeDuration.valueOf(conf.haMasterRatisRpcSlownessTimeout(), TimeUnit.SECONDS);
    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, slownessTimeout);

    // Set role checker time
    this.roleCheckIntervalMs = conf.haMasterRatisRoleCheckInterval();

    // snapshot retention
    int numSnapshotRetentionFileNum = conf.haMasterRatisSnapshotRetentionFileNum();
    RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, numSnapshotRetentionFileNum);

    // snapshot interval
    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
        properties, conf.haMasterRatisSnapshotAutoTriggerEnabled());

    long snapshotAutoTriggerThreshold = conf.haMasterRatisSnapshotAutoTriggerThreshold();
    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, snapshotAutoTriggerThreshold);

    for (Map.Entry<String, String> ratisEntry : conf.haRatisCustomConfigs().entrySet()) {
      properties.set(ratisEntry.getKey().replace("celeborn.ratis.", ""), ratisEntry.getValue());
    }

    return properties;
  }