in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java [284:408]
/**
 * Builds the {@link RaftProperties} used to start this master's Ratis server from {@code conf}.
 *
 * <p>Entries returned by {@code conf.haRatisCustomConfigs()} are applied last, with a leading
 * {@code celeborn.ratis.} prefix removed from each key, so they override the values set here. As a
 * side effect this also initializes {@code roleCheckIntervalMs}.
 *
 * @param conf Celeborn configuration supplying the Ratis settings
 * @param rpc the Ratis RPC transport; the server port is only set for GRPC and NETTY
 * @return the populated Raft properties
 * @throws IllegalArgumentException if the configured log write buffer size is smaller than the
 *     log appender queue byte limit plus 8 bytes
 */
private RaftProperties newRaftProperties(CelebornConf conf, RpcType rpc) {
  final RaftProperties properties = new RaftProperties();
  configureRatisRpcAndStorage(properties, conf, rpc);
  configureRatisLogAndAppender(properties, conf);
  configureRatisTimeouts(properties, conf);
  // Interval for this server's role checker; not a Ratis property.
  this.roleCheckIntervalMs = conf.haMasterRatisRoleCheckInterval();
  configureRatisSnapshot(properties, conf);
  // Must run last so user-supplied overrides win over everything set above.
  applyRatisCustomConfigs(properties, conf);
  return properties;
}

/** Sets the RPC transport, the server port for that transport and the Raft storage directory. */
private void configureRatisRpcAndStorage(
    RaftProperties properties, CelebornConf conf, RpcType rpc) {
  RaftConfigKeys.Rpc.setType(properties, rpc);
  if (rpc == SupportedRpcType.GRPC) {
    GrpcConfigKeys.Server.setPort(properties, ratisAddr.getPort());
  } else if (rpc == SupportedRpcType.NETTY) {
    NettyConfigKeys.Server.setPort(properties, ratisAddr.getPort());
  }
  String storageDir = conf.haMasterRatisStorageDir();
  RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(new File(storageDir)));
}

/**
 * Validates and sets the Raft log segment, write buffer and appender queue sizes, log purging,
 * snapshot installation, the segment cache size and the gRPC message size cap.
 */
private static void configureRatisLogAndAppender(RaftProperties properties, CelebornConf conf) {
  long raftSegmentSize = conf.haMasterRatisLogSegmentSizeMax();
  long raftSegmentPreallocatedSize = conf.haMasterRatisLogPreallocatedSize();
  long raftSegmentWriteBufferSize = conf.haMasterRatisLogWriteBufferSize();
  int logAppenderQueueNumElements = conf.haMasterRatisLogAppenderQueueNumElements();
  long logAppenderQueueByteLimit = conf.haMasterRatisLogAppenderQueueBytesLimit();
  // RATIS-589. Eliminate buffer copying in SegmentedRaftLogOutputStream.
  // 4 bytes (serialized size) + logAppenderQueueByteLimit + 4 bytes (checksum)
  if (raftSegmentWriteBufferSize < logAppenderQueueByteLimit + 8) {
    throw new IllegalArgumentException(
        CelebornConf.HA_MASTER_RATIS_LOG_WRITE_BUFFER_SIZE().key()
            + " (= "
            + raftSegmentWriteBufferSize
            + ") is less than "
            + CelebornConf.HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT().key()
            + " + 8 (= "
            + (logAppenderQueueByteLimit + 8)
            + ")");
  }
  RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf(raftSegmentSize));
  RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);
  RaftServerConfigKeys.Log.Appender.setBufferElementLimit(
      properties, logAppenderQueueNumElements);
  RaftServerConfigKeys.Log.Appender.setBufferByteLimit(
      properties, SizeInBytes.valueOf(logAppenderQueueByteLimit));
  RaftServerConfigKeys.Log.setPreallocatedSize(
      properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize));
  RaftServerConfigKeys.Log.setWriteBufferSize(
      properties, SizeInBytes.valueOf(raftSegmentWriteBufferSize));
  RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(
      properties, conf.haMasterRatisLogInstallSnapshotEnabled());
  RaftServerConfigKeys.Log.setPurgeGap(properties, conf.haMasterRatisLogPurgeGap());
  // Number of log segments kept in the segment cache (fixed, not configurable here).
  RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
  // RATIS-2135. The leader keeps sending inconsistent entries repeatedly to followers.
  // raft.grpc.message.size.max must be 1m larger than raft.server.log.appender.buffer.byte-limit
  GrpcConfigKeys.setMessageSizeMax(
      properties, SizeInBytes.valueOf(logAppenderQueueByteLimit + SizeInBytes.ONE_MB.getSize()));
}

/**
 * Sets the server and client RPC timeouts, the retry cache expiry, the election timeouts and the
 * notification timeouts (all configured in seconds).
 */
private static void configureRatisTimeouts(RaftProperties properties, CelebornConf conf) {
  TimeDuration serverRequestTimeout =
      TimeDuration.valueOf(conf.haMasterRatisRpcRequestTimeout(), TimeUnit.SECONDS);
  RaftServerConfigKeys.Rpc.setRequestTimeout(properties, serverRequestTimeout);
  TimeDuration retryCacheExpiryTime =
      TimeDuration.valueOf(conf.haMasterRatisRetryCacheExpiryTime(), TimeUnit.SECONDS);
  RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryTime);
  TimeDuration rpcTimeoutMin =
      TimeDuration.valueOf(conf.haMasterRatisRpcTimeoutMin(), TimeUnit.SECONDS);
  TimeDuration rpcTimeoutMax =
      TimeDuration.valueOf(conf.haMasterRatisRpcTimeoutMax(), TimeUnit.SECONDS);
  RaftServerConfigKeys.Rpc.setTimeoutMin(properties, rpcTimeoutMin);
  RaftServerConfigKeys.Rpc.setTimeoutMax(properties, rpcTimeoutMax);
  TimeDuration firstElectionTimeoutMin =
      TimeDuration.valueOf(conf.haMasterRatisFirstElectionTimeoutMin(), TimeUnit.SECONDS);
  TimeDuration firstElectionTimeoutMax =
      TimeDuration.valueOf(conf.haMasterRatisFirstElectionTimeoutMax(), TimeUnit.SECONDS);
  RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties, firstElectionTimeoutMin);
  RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties, firstElectionTimeoutMax);
  RaftServerConfigKeys.LeaderElection.setMemberMajorityAdd(
      properties, conf.hasMasterRatisLeaderElectionMemberMajorityAdd());
  TimeDuration clientRpcTimeout =
      TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(), TimeUnit.SECONDS);
  TimeDuration clientRpcWatchTimeout =
      TimeDuration.valueOf(conf.haMasterRatisClientRpcWatchTimeout(), TimeUnit.SECONDS);
  RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRpcTimeout);
  RaftClientConfigKeys.Rpc.setWatchRequestTimeout(properties, clientRpcWatchTimeout);
  TimeDuration noLeaderTimeout =
      TimeDuration.valueOf(conf.haMasterRatisNotificationNoLeaderTimeout(), TimeUnit.SECONDS);
  RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, noLeaderTimeout);
  TimeDuration slownessTimeout =
      TimeDuration.valueOf(conf.haMasterRatisRpcSlownessTimeout(), TimeUnit.SECONDS);
  RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, slownessTimeout);
}

/** Sets snapshot file retention and the automatic snapshot trigger policy. */
private static void configureRatisSnapshot(RaftProperties properties, CelebornConf conf) {
  RaftServerConfigKeys.Snapshot.setRetentionFileNum(
      properties, conf.haMasterRatisSnapshotRetentionFileNum());
  RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
      properties, conf.haMasterRatisSnapshotAutoTriggerEnabled());
  RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
      properties, conf.haMasterRatisSnapshotAutoTriggerThreshold());
}

/**
 * Copies the entries of {@code conf.haRatisCustomConfigs()} into {@code properties}, removing a
 * leading {@code celeborn.ratis.} prefix from each key.
 */
private static void applyRatisCustomConfigs(RaftProperties properties, CelebornConf conf) {
  final String prefix = "celeborn.ratis.";
  for (Map.Entry<String, String> ratisEntry : conf.haRatisCustomConfigs().entrySet()) {
    String key = ratisEntry.getKey();
    // Strip only the leading prefix; String.replace would also delete any later occurrence of
    // the prefix text inside the key, producing a different Ratis key.
    String ratisKey = key.startsWith(prefix) ? key.substring(prefix.length()) : key;
    properties.set(ratisKey, ratisEntry.getValue());
  }
}