in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java [343:536]
/**
 * Loads the master settings from the {@code SECT_TOKEN_MASTER} section of the
 * given ini configuration into this object's fields.
 *
 * <p>Apart from the mandatory items listed below, every item is optional:
 * a field is only overwritten when its key is present with a non-blank value.
 *
 * <p>Mandatory items:
 * <ul>
 *   <li>the section itself, which must hold at least one key;</li>
 *   <li>{@code webResourcePath};</li>
 *   <li>{@code visitName} and {@code visitPassword}, when
 *       {@code needBrokerVisitAuth} is enabled.</li>
 * </ul>
 *
 * @param iniConf the parsed ini configuration to read the section from
 * @throws IllegalArgumentException if the section is missing or empty, a
 *         mandatory item is blank, the default host name cannot be resolved
 *         (the underlying failure is attached as the cause), or a value
 *         violates one of the checks performed below
 */
private void loadSystemConf(final Ini iniConf) {
    final Profile.Section masterConf = iniConf.get(SECT_TOKEN_MASTER);
    if (masterConf == null) {
        throw new IllegalArgumentException(new StringBuilder(256)
                .append(SECT_TOKEN_MASTER).append(" configure section is required!").toString());
    }
    Set<String> configKeySet = masterConf.keySet();
    if (configKeySet.isEmpty()) { /* Should have at least one config item */
        throw new IllegalArgumentException(new StringBuilder(256)
                .append("Empty configure item in ").append(SECT_TOKEN_MASTER)
                .append(" section!").toString());
    }

    // ---- network identity ----
    // port; the third argument is presumably the fallback used when the key is absent
    this.port = this.getInt(masterConf, "port",
            TBaseConstants.META_DEFAULT_MASTER_PORT);
    // hostname: explicit value wins, otherwise use the local IPv4 address lookup
    if (TStringUtils.isNotBlank(masterConf.get("hostName"))) {
        this.hostName = masterConf.get("hostName").trim();
    } else {
        try {
            this.hostName = AddressUtils.getIPV4LocalAddress();
        } catch (Throwable e) {
            // Keep the original failure as the cause so its stack trace is not lost.
            throw new IllegalArgumentException(new StringBuilder(256)
                    .append("Get default master hostName failure : ")
                    .append(e.getMessage()).toString(), e);
        }
    }
    // web port
    if (TStringUtils.isNotBlank(masterConf.get("webPort"))) {
        this.webPort = this.getInt(masterConf, "webPort");
    }
    // web resource path (mandatory)
    if (TStringUtils.isBlank(masterConf.get("webResourcePath"))) {
        throw new IllegalArgumentException(new StringBuilder(256)
                .append("webResourcePath is null or Blank in ").append(SECT_TOKEN_MASTER)
                .append(" section!").toString());
    }
    this.webResourcePath = masterConf.get("webResourcePath").trim();

    // ---- balance and heartbeat timing ----
    if (TStringUtils.isNotBlank(masterConf.get("consumerBalancePeriodMs"))) {
        this.consumerBalancePeriodMs =
                this.getInt(masterConf, "consumerBalancePeriodMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("firstBalanceDelayAfterStartMs"))) {
        this.firstBalanceDelayAfterStartMs =
                this.getInt(masterConf, "firstBalanceDelayAfterStartMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("consumerHeartbeatTimeoutMs"))) {
        this.consumerHeartbeatTimeoutMs =
                this.getInt(masterConf, "consumerHeartbeatTimeoutMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("producerHeartbeatTimeoutMs"))) {
        this.producerHeartbeatTimeoutMs =
                this.getInt(masterConf, "producerHeartbeatTimeoutMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("brokerHeartbeatTimeoutMs"))) {
        this.brokerHeartbeatTimeoutMs =
                this.getInt(masterConf, "brokerHeartbeatTimeoutMs");
    }

    // ---- socket / rpc / netty settings ----
    if (TStringUtils.isNotBlank(masterConf.get("socketSendBuffer"))) {
        this.socketSendBuffer = this.getLong(masterConf, "socketSendBuffer");
    }
    if (TStringUtils.isNotBlank(masterConf.get("socketRecvBuffer"))) {
        this.socketRecvBuffer = this.getLong(masterConf, "socketRecvBuffer");
    }
    if (TStringUtils.isNotBlank(masterConf.get("rpcReadTimeoutMs"))) {
        this.rpcReadTimeoutMs =
                this.getLong(masterConf, "rpcReadTimeoutMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("nettyWriteBufferHighWaterMark"))) {
        this.nettyWriteBufferHighWaterMark =
                this.getLong(masterConf, "nettyWriteBufferHighWaterMark");
    }
    if (TStringUtils.isNotBlank(masterConf.get("nettyWriteBufferLowWaterMark"))) {
        this.nettyWriteBufferLowWaterMark =
                this.getLong(masterConf, "nettyWriteBufferLowWaterMark");
    }

    // ---- status-change wait periods ----
    if (TStringUtils.isNotBlank(masterConf.get("onlineOnlyReadToRWPeriodMs"))) {
        this.onlineOnlyReadToRWPeriodMs =
                this.getLong(masterConf, "onlineOnlyReadToRWPeriodMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("stepChgWaitPeriodMs"))) {
        this.stepChgWaitPeriodMs =
                this.getLong(masterConf, "stepChgWaitPeriodMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("offlineOnlyReadToRWPeriodMs"))) {
        this.offlineOnlyReadToRWPeriodMs =
                this.getLong(masterConf, "offlineOnlyReadToRWPeriodMs");
    }

    // ---- configuration-modify token; rejected when longer than the allowed maximum ----
    if (TStringUtils.isNotBlank(masterConf.get("confModAuthToken"))) {
        String tmpAuthToken = masterConf.get("confModAuthToken").trim();
        if (tmpAuthToken.length() > TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH) {
            throw new IllegalArgumentException(
                    "Invalid value: the length of confModAuthToken's value > "
                            + TServerConstants.CFG_MODAUTHTOKEN_MAX_LENGTH);
        }
        this.confModAuthToken = tmpAuthToken;
    }

    // ---- consume-group limits ----
    if (TStringUtils.isNotBlank(masterConf.get("maxGroupBrokerConsumeRate"))) {
        this.maxGroupBrokerConsumeRate =
                this.getInt(masterConf, "maxGroupBrokerConsumeRate");
        if (this.maxGroupBrokerConsumeRate <= 0) {
            throw new IllegalArgumentException(
                    "Invalid value: maxGroupBrokerConsumeRate's value must > 0 !");
        }
    }
    if (TStringUtils.isNotBlank(masterConf.get("maxGroupRebalanceWaitPeriod"))) {
        this.maxGroupRebalanceWaitPeriod =
                this.getInt(masterConf, "maxGroupRebalanceWaitPeriod");
    }
    if (TStringUtils.isNotBlank(masterConf.get("startOffsetResetCheck"))) {
        this.startOffsetResetCheck =
                this.getBoolean(masterConf, "startOffsetResetCheck");
    }
    if (TStringUtils.isNotBlank(masterConf.get("rowLockWaitDurMs"))) {
        this.rowLockWaitDurMs =
                this.getInt(masterConf, "rowLockWaitDurMs");
    }
    if (TStringUtils.isNotBlank(masterConf.get("maxAutoForbiddenCnt"))) {
        this.maxAutoForbiddenCnt =
                this.getInt(masterConf, "maxAutoForbiddenCnt");
    }

    // ---- token / timestamp validity periods (silently clamped, not rejected) ----
    if (TStringUtils.isNotBlank(masterConf.get("visitTokenValidPeriodMs"))) {
        long tmpPeriodMs = this.getLong(masterConf, "visitTokenValidPeriodMs");
        if (tmpPeriodMs < 3 * 60 * 1000) { /* Min value is 3 min */
            tmpPeriodMs = 3 * 60 * 1000;
        }
        this.visitTokenValidPeriodMs = tmpPeriodMs;
    }
    if (TStringUtils.isNotBlank(masterConf.get("authValidTimeStampPeriodMs"))) {
        long tmpPeriodMs = this.getLong(masterConf, "authValidTimeStampPeriodMs");
        // must between 5,000 ms and 120,000 ms
        this.authValidTimeStampPeriodMs =
                tmpPeriodMs < 5000 ? 5000 : (tmpPeriodMs > 120000 ? 120000 : tmpPeriodMs);
    }

    // ---- authentication / authorization switches ----
    if (TStringUtils.isNotBlank(masterConf.get("startVisitTokenCheck"))) {
        this.startVisitTokenCheck = this.getBoolean(masterConf, "startVisitTokenCheck");
    }
    if (TStringUtils.isNotBlank(masterConf.get("startProduceAuthenticate"))) {
        this.startProduceAuthenticate = this.getBoolean(masterConf, "startProduceAuthenticate");
    }
    if (TStringUtils.isNotBlank(masterConf.get("startProduceAuthorize"))) {
        this.startProduceAuthorize = this.getBoolean(masterConf, "startProduceAuthorize");
    }
    if (TStringUtils.isNotBlank(masterConf.get("useWebProxy"))) {
        this.useWebProxy = this.getBoolean(masterConf, "useWebProxy");
    }
    // Authorization without authentication is rejected as an inconsistent setup.
    if (!this.startProduceAuthenticate && this.startProduceAuthorize) {
        throw new IllegalArgumentException(
                "startProduceAuthenticate must set true if startProduceAuthorize is true!");
    }
    if (TStringUtils.isNotBlank(masterConf.get("startConsumeAuthenticate"))) {
        this.startConsumeAuthenticate = this.getBoolean(masterConf, "startConsumeAuthenticate");
    }
    if (TStringUtils.isNotBlank(masterConf.get("startConsumeAuthorize"))) {
        this.startConsumeAuthorize = this.getBoolean(masterConf, "startConsumeAuthorize");
    }
    if (!this.startConsumeAuthenticate && this.startConsumeAuthorize) {
        throw new IllegalArgumentException(
                "startConsumeAuthenticate must set true if startConsumeAuthorize is true!");
    }

    // ---- broker visit credentials; both become mandatory once the switch is on ----
    if (TStringUtils.isNotBlank(masterConf.get("needBrokerVisitAuth"))) {
        this.needBrokerVisitAuth = this.getBoolean(masterConf, "needBrokerVisitAuth");
    }
    if (this.needBrokerVisitAuth) {
        if (TStringUtils.isBlank(masterConf.get("visitName"))) {
            throw new IllegalArgumentException(new StringBuilder(256)
                    .append("visitName is null or Blank in ").append(SECT_TOKEN_MASTER)
                    .append(" section!").toString());
        }
        if (TStringUtils.isBlank(masterConf.get("visitPassword"))) {
            throw new IllegalArgumentException(new StringBuilder(256)
                    .append("visitPassword is null or Blank in ").append(SECT_TOKEN_MASTER)
                    .append(" section!").toString());
        }
        this.visitName = masterConf.get("visitName").trim();
        this.visitPassword = masterConf.get("visitPassword").trim();
    }

    // ---- rebalance parallelism and metadata refresh ----
    if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) {
        int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
        // NOTE(review): MixedUtils.mid presumably bounds the value to [1, 20] - confirm
        this.rebalanceParallel = MixedUtils.mid(tmpParallel, 1, 20);
    }
    if (TStringUtils.isNotBlank(masterConf.get("maxMetaForceUpdatePeriodMs"))) {
        long tmpPeriodMs = this.getLong(masterConf, "maxMetaForceUpdatePeriodMs");
        // Values below the configured minimum are raised to that minimum.
        if (tmpPeriodMs < TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD) {
            tmpPeriodMs = TBaseConstants.CFG_MIN_META_FORCE_UPDATE_PERIOD;
        }
        this.maxMetaForceUpdatePeriodMs = tmpPeriodMs;
    }
}