in zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java [274:499]
/**
 * Populates this configuration from the given properties and validates it.
 *
 * <p>Every key and value is trimmed before being interpreted. Keys that this
 * method recognises are stored in the corresponding field (or passed to the
 * corresponding setter). Keys starting with {@code metricsProvider.} (other
 * than {@code metricsProvider.className}) are copied, minus that prefix, into
 * the metrics provider configuration. Any other key is exported as the system
 * property {@code "zookeeper." + key}.
 *
 * <p>After all entries have been read, cross-field validation is performed:
 * quorum SASL flag consistency, {@code autopurge.snapRetainCount} lower bound,
 * presence of {@code dataDir} and {@code tickTime}, client / secure client
 * address resolution, session timeout defaults and loadability of the metrics
 * provider class. When no {@code dynamicConfigFile} was given, the quorum
 * membership is read from the same properties object.
 *
 * @param zkProp properties to parse
 * @throws IOException if a configured client address cannot be resolved;
 *         may also be raised by the helper methods invoked at the end of
 *         parsing (their bodies are not shown here)
 * @throws ConfigException if {@code electionAlg} is not 3, {@code peerType}
 *         is neither "observer" nor "participant", {@code autopurge.purgeInterval}
 *         is empty, or a {@code server.}/{@code group}/{@code weight} key is
 *         present together with {@code dynamicConfigFile}
 * @throws IllegalArgumentException if a numeric value cannot be parsed or a
 *         post-parse validation check fails
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
    // Values that need post-processing are collected in locals first and only
    // committed to the fields once the whole properties object has been read.
    Integer clientPort = null;
    Integer secureClientPort = null;
    int observerMasterPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    final String metricsProviderPrefix = "metricsProvider.";
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();

    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = parseBoolean(key, value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = parseBoolean(key, value);
        } else if (key.equals("clientPortAddress")) {
            // value is already trimmed above
            clientPortAddress = value;
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")) {
            // value is already trimmed above
            secureClientPortAddress = value;
        } else if (key.equals("observerMasterPort")) {
            observerMasterPort = Integer.parseInt(value);
        } else if (key.equals("clientPortListenBacklog")) {
            clientPortListenBacklog = Integer.parseInt(value);
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("connectToLearnerMasterLimit")) {
            connectToLearnerMasterLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
            if (electionAlg != 3) {
                throw new ConfigException("Invalid electionAlg value. Only 3 is supported.");
            }
        } else if (key.equals("quorumListenOnAllIPs")) {
            quorumListenOnAllIPs = parseBoolean(key, value);
        } else if (key.equals("peerType")) {
            // Lower-case with a fixed locale: with the JVM default locale the
            // result depends on the host (e.g. Turkish maps 'I' to a dotless
            // 'i'), which would reject an otherwise valid "PARTICIPANT".
            String peerTypeName = value.toLowerCase(java.util.Locale.ROOT);
            if (peerTypeName.equals("observer")) {
                peerType = LearnerType.OBSERVER;
            } else if (peerTypeName.equals("participant")) {
                peerType = LearnerType.PARTICIPANT;
            } else {
                throw new ConfigException("Unrecognised peertype: " + value);
            }
        } else if (key.equals("syncEnabled")) {
            syncEnabled = parseBoolean(key, value);
        } else if (key.equals("dynamicConfigFile")) {
            dynamicConfigFileStr = value;
        } else if (key.equals("autopurge.snapRetainCount")) {
            snapRetainCount = Integer.parseInt(value);
        } else if (key.equals("autopurge.purgeInterval")) {
            // Guard the last-character lookup below: an empty value would
            // otherwise surface as a StringIndexOutOfBoundsException.
            if (value.isEmpty()) {
                throw new ConfigException("autopurge.purgeInterval must not be empty");
            }
            if (Character.isDigit(value.charAt(value.length() - 1))) {
                // no unit suffix: default is hours for backward compatibility
                purgeIntervalInMs = Time.parseTimeInterval(value) * Time.HOUR;
            } else {
                purgeIntervalInMs = Time.parseTimeInterval(value);
            }
        } else if (key.equals("standaloneEnabled")) {
            setStandaloneEnabled(parseBoolean(key, value));
        } else if (key.equals("reconfigEnabled")) {
            setReconfigEnabled(parseBoolean(key, value));
        } else if (key.equals("sslQuorum")) {
            sslQuorum = parseBoolean(key, value);
        } else if (key.equals("portUnification")) {
            shouldUsePortUnification = parseBoolean(key, value);
        } else if (key.equals("sslQuorumReloadCertFiles")) {
            sslQuorumReloadCertFiles = parseBoolean(key, value);
        } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight"))
                   && zkProp.containsKey("dynamicConfigFile")) {
            // Membership keys are rejected here only when a dynamic config
            // file is also configured; otherwise they fall through the chain.
            throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
        } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
            quorumEnableSasl = parseBoolean(key, value);
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
            quorumServerRequireSasl = parseBoolean(key, value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
            quorumLearnerRequireSasl = parseBoolean(key, value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
            quorumLearnerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
            quorumServerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
            quorumServicePrincipal = value;
        } else if (key.equals("quorum.cnxn.threads.size")) {
            quorumCnxnThreadsSize = Integer.parseInt(value);
        } else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) {
            jvmPauseInfoThresholdMs = Long.parseLong(value);
        } else if (key.equals(JvmPauseMonitor.WARN_THRESHOLD_KEY)) {
            jvmPauseWarnThresholdMs = Long.parseLong(value);
        } else if (key.equals(JvmPauseMonitor.SLEEP_TIME_MS_KEY)) {
            jvmPauseSleepTimeMs = Long.parseLong(value);
        } else if (key.equals(JvmPauseMonitor.JVM_PAUSE_MONITOR_FEATURE_SWITCH_KEY)) {
            jvmPauseMonitorToRun = parseBoolean(key, value);
        } else if (key.equals("metricsProvider.className")) {
            // Must be tested before the generic "metricsProvider." prefix below.
            metricsProviderClassName = value;
        } else if (key.startsWith(metricsProviderPrefix)) {
            // Store the remainder of the key (prefix stripped) for the provider.
            String keyForMetricsProvider = key.substring(metricsProviderPrefix.length());
            metricsProviderConfiguration.put(keyForMetricsProvider, value);
        } else if (key.equals("multiAddress.enabled")) {
            multiAddressEnabled = parseBoolean(key, value);
        } else if (key.equals("multiAddress.reachabilityCheckTimeoutMs")) {
            multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value);
        } else if (key.equals("multiAddress.reachabilityCheckEnabled")) {
            multiAddressReachabilityCheckEnabled = parseBoolean(key, value);
        } else if (key.equals("oraclePath")) {
            oraclePath = value;
        } else {
            // Unrecognised keys are exported as "zookeeper."-prefixed system properties.
            System.setProperty("zookeeper." + key, value);
        }
    }

    // --- Quorum SASL flag consistency ---
    if (!quorumEnableSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                                           + " is disabled, so cannot enable "
                                           + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }
    if (!quorumEnableSasl && quorumLearnerRequireSasl) {
        throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                                           + " is disabled, so cannot enable "
                                           + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
    }
    // If quorumpeer learner is not auth enabled then self won't be able to
    // join quorum. So this condition is ensuring that the quorumpeer learner
    // is also auth enabled while enabling quorum server require sasl.
    if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
                                           + " is disabled, so cannot enable "
                                           + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }

    // --- Autopurge ---
    // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
    // PurgeTxnLog.purge(File, File, int) will not allow to purge less
    // than 3.
    if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
        LOG.warn("Invalid autopurge.snapRetainCount: {}. Defaulting to {}",
                 snapRetainCount, MIN_SNAP_RETAIN_COUNT);
        snapRetainCount = MIN_SNAP_RETAIN_COUNT;
    }

    // --- Data directories ---
    if (dataDir == null) {
        throw new IllegalArgumentException("dataDir is not set");
    }
    if (dataLogDir == null) {
        // The transaction log directory falls back to the data directory.
        dataLogDir = dataDir;
    }

    // --- Client address: an address without a port is an error ---
    if (clientPort == null) {
        LOG.info("clientPort is not set");
        if (clientPortAddress != null) {
            throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
        }
    } else if (clientPortAddress != null) {
        this.clientPortAddress = new InetSocketAddress(InetAddress.getByName(clientPortAddress), clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    } else {
        // Port only: bind to the wildcard address.
        this.clientPortAddress = new InetSocketAddress(clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    }

    // --- Secure client address: same rules as the plain client address ---
    if (secureClientPort == null) {
        LOG.info("secureClientPort is not set");
        if (secureClientPortAddress != null) {
            throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set");
        }
    } else if (secureClientPortAddress != null) {
        this.secureClientPortAddress = new InetSocketAddress(InetAddress.getByName(secureClientPortAddress), secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    } else {
        // Port only: bind to the wildcard address.
        this.secureClientPortAddress = new InetSocketAddress(secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    }
    if (this.secureClientPortAddress != null) {
        configureSSLAuth();
    }

    // --- Observer master port: only values > 0 are applied ---
    if (observerMasterPort <= 0) {
        LOG.info("observerMasterPort is not set");
    } else {
        this.observerMasterPort = observerMasterPort;
        LOG.info("observerMasterPort is {}", observerMasterPort);
    }

    // --- Tick time and session timeouts ---
    if (tickTime == 0) {
        throw new IllegalArgumentException("tickTime is not set");
    }
    // -1 means "not configured": derive the bounds from tickTime.
    minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    if (minSessionTimeout > maxSessionTimeout) {
        throw new IllegalArgumentException("minSessionTimeout must not be larger than maxSessionTimeout");
    }

    // --- Metrics provider: check the class can be located, without initializing it ---
    LOG.info("metricsProvider.className is {}", metricsProviderClassName);
    try {
        Class.forName(metricsProviderClassName, false, Thread.currentThread().getContextClassLoader());
    } catch (ClassNotFoundException error) {
        throw new IllegalArgumentException("metrics provider class was not found", error);
    }

    // backward compatibility - dynamic configuration in the same file as
    // static configuration params see writeDynamicConfig()
    if (dynamicConfigFileStr == null) {
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}