in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerConfig.java [257:462]
private void loadBrokerSectConf(final Ini iniConf) {
// #lizard forgives
final Section brokerSect = iniConf.get(SECT_TOKEN_BROKER);
if (brokerSect == null) {
throw new IllegalArgumentException("Require broker section in configure file not Blank!");
}
this.brokerId = this.getInt(brokerSect, "brokerId");
this.port = this.getInt(brokerSect, "port", 8123);
if (TStringUtils.isBlank(brokerSect.get("primaryPath"))) {
throw new IllegalArgumentException("Require primaryPath not Blank!");
}
this.primaryPath = brokerSect.get("primaryPath").trim();
if (this.primaryPath.endsWith(File.separator)) {
this.primaryPath = this.primaryPath.substring(0,
this.primaryPath.length() - File.separator.length());
if (this.primaryPath.isEmpty()) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("Parameter primaryPath(")
.append(brokerSect.get("primaryPath").trim())
.append(" only include separator! ").toString());
}
}
if (TStringUtils.isBlank(brokerSect.get("hostName"))) {
throw new IllegalArgumentException(new StringBuilder(256).append("hostName is null or Blank in ")
.append(SECT_TOKEN_BROKER).append(" section!").toString());
}
// enableWriteOffset2Zk
if (TStringUtils.isNotBlank(brokerSect.get("enableWriteOffset2Zk"))) {
this.enableWriteOffset2Zk = getBoolean(brokerSect, "enableWriteOffset2Zk");
}
if (TStringUtils.isBlank(brokerSect.get("offsetStgFilePath"))) {
this.offsetStgFilePath = this.primaryPath;
} else {
this.offsetStgFilePath = brokerSect.get("offsetStgFilePath").trim();
if (this.offsetStgFilePath.endsWith(File.separator)) {
this.offsetStgFilePath = this.offsetStgFilePath.substring(0,
this.offsetStgFilePath.length() - File.separator.length());
if (this.offsetStgFilePath.isEmpty()) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("Parameter offsetStgFilePath(")
.append(brokerSect.get("offsetStgFilePath").trim())
.append(" only include separator! ").toString());
}
}
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgCacheFlushMs"))) {
this.offsetStgCacheFlushMs =
Math.min(getLong(brokerSect, "offsetStgCacheFlushMs"), 1000L);
this.offsetStgFileSyncMs = this.offsetStgCacheFlushMs + 1000L;
}
if (TStringUtils.isNotBlank(brokerSect.get("grpOffsetStgExpMs"))) {
this.grpOffsetStgExpMs = Math.min(getLong(brokerSect, "grpOffsetStgExpMs"),
TServerConstants.CFG_MIN_GROUP_OFFSETS_STG_EXPIRED_DUR_MS);
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgFileSyncMs"))) {
this.offsetStgFileSyncMs =
Math.min(getLong(brokerSect, "offsetStgFileSyncMs"),
this.offsetStgCacheFlushMs + 1000L);
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgSyncDurWarnMs"))) {
this.offsetStgSyncDurWarnMs =
Math.min(getLong(brokerSect, "offsetStgSyncDurWarnMs"),
this.offsetStgFileSyncMs + 1000L);
}
if (TStringUtils.isNotBlank(brokerSect.get("defEthName"))) {
this.defEthName = brokerSect.get("defEthName").trim();
}
if (TStringUtils.isNotBlank(brokerSect.get("hostName"))) {
this.hostName = brokerSect.get("hostName").trim();
} else {
try {
this.hostName = AddressUtils.getIPV4LocalAddress(this.defEthName);
} catch (Throwable e) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("Get default broker hostName failure : ")
.append(e.getMessage()).toString());
}
}
if (TStringUtils.isBlank(brokerSect.get("masterAddressList"))) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("masterAddressList is null or Blank in ")
.append(SECT_TOKEN_BROKER).append(" section!").toString());
}
this.masterAddressList = brokerSect.get("masterAddressList");
if (TStringUtils.isNotBlank(brokerSect.get("webPort"))) {
this.webPort = this.getInt(brokerSect, "webPort");
}
this.maxSegmentSize = this.getInt(brokerSect, "maxSegmentSize");
this.transferSize = this.getInt(brokerSect, "transferSize");
if (TStringUtils.isNotBlank(brokerSect.get("indexTransCount"))) {
this.indexTransCount = this.getInt(brokerSect, "indexTransCount");
}
if (TStringUtils.isNotBlank(brokerSect.get("logClearupDurationMs"))) {
this.logClearupDurationMs = getLong(brokerSect, "logClearupDurationMs");
if (this.logClearupDurationMs < 60 * 1000) {
this.logClearupDurationMs = 60 * 1000;
}
}
if (TStringUtils.isNotBlank(brokerSect.get("logFlushDiskDurMs"))) {
this.logFlushDiskDurMs = getLong(brokerSect, "logFlushDiskDurMs");
if (this.logFlushDiskDurMs < 10000) {
this.logFlushDiskDurMs = 10000;
}
}
if (TStringUtils.isNotBlank(brokerSect.get("logFlushMemDurMs"))) {
this.logFlushMemDurMs = getLong(brokerSect, "logFlushMemDurMs");
if (this.logFlushMemDurMs < 10000) {
this.logFlushMemDurMs = 10000;
}
}
if (TStringUtils.isNotBlank(brokerSect.get("authValidTimeStampPeriodMs"))) {
long tmpPeriodMs = this.getLong(brokerSect, "authValidTimeStampPeriodMs");
this.authValidTimeStampPeriodMs =
tmpPeriodMs < 5000 ? 5000 : tmpPeriodMs > 120000 ? 120000 : tmpPeriodMs;
}
if (TStringUtils.isNotBlank(brokerSect.get("visitTokenCheckInValidTimeMs"))) {
long tmpPeriodMs = this.getLong(brokerSect, "visitTokenCheckInValidTimeMs");
this.visitTokenCheckInValidTimeMs =
tmpPeriodMs < 60000 ? 60000 : tmpPeriodMs > 300000 ? 300000 : tmpPeriodMs;
}
if (TStringUtils.isNotBlank(brokerSect.get("socketSendBuffer"))) {
this.socketSendBuffer = getLong(brokerSect, "socketSendBuffer");
}
if (TStringUtils.isNotBlank(brokerSect.get("socketRecvBuffer"))) {
this.socketRecvBuffer = getLong(brokerSect, "socketRecvBuffer");
}
if (TStringUtils.isNotBlank(brokerSect.get("maxIndexSegmentSize"))) {
this.maxIndexSegmentSize = getInt(brokerSect, "maxIndexSegmentSize");
}
if (!TStringUtils.isBlank(brokerSect.get("updateConsumerOffsets"))) {
this.updateConsumerOffsets = getBoolean(brokerSect, "updateConsumerOffsets");
}
if (TStringUtils.isNotBlank(brokerSect.get("rpcReadTimeoutMs"))) {
this.rpcReadTimeoutMs = getLong(brokerSect, "rpcReadTimeoutMs");
}
if (TStringUtils.isNotBlank(brokerSect.get("nettyWriteBufferHighWaterMark"))) {
this.nettyWriteBufferHighWaterMark = getLong(brokerSect, "nettyWriteBufferHighWaterMark");
}
if (TStringUtils.isNotBlank(brokerSect.get("nettyWriteBufferLowWaterMark"))) {
this.nettyWriteBufferLowWaterMark = getLong(brokerSect, "nettyWriteBufferLowWaterMark");
}
if (TStringUtils.isNotBlank(brokerSect.get("heartbeatPeriodMs"))) {
this.heartbeatPeriodMs = getLong(brokerSect, "heartbeatPeriodMs");
}
if (TStringUtils.isNotBlank(brokerSect.get("tcpWriteServiceThread"))) {
this.tcpWriteServiceThread = getInt(brokerSect, "tcpWriteServiceThread");
}
if (TStringUtils.isNotBlank(brokerSect.get("tcpReadServiceThread"))) {
this.tcpReadServiceThread = getInt(brokerSect, "tcpReadServiceThread");
}
if (TStringUtils.isNotBlank(brokerSect.get("tlsWriteServiceThread"))) {
this.tlsWriteServiceThread = getInt(brokerSect, "tlsWriteServiceThread");
}
if (TStringUtils.isNotBlank(brokerSect.get("tlsReadServiceThread"))) {
this.tlsReadServiceThread = getInt(brokerSect, "tlsReadServiceThread");
}
if (TStringUtils.isNotBlank(brokerSect.get("consumerRegTimeoutMs"))) {
this.consumerRegTimeoutMs = getInt(brokerSect, "consumerRegTimeoutMs");
if (this.consumerRegTimeoutMs < 20000) {
this.consumerRegTimeoutMs = 20000;
}
}
if (TStringUtils.isNotBlank(brokerSect.get("defaultDeduceReadSize"))) {
this.defaultDeduceReadSize = getLong(brokerSect, "defaultDeduceReadSize");
this.defaultDoubleDeduceReadSize = this.defaultDeduceReadSize * 2;
}
if (TStringUtils.isNotBlank(brokerSect.get("rowLockWaitDurMs"))) {
this.rowLockWaitDurMs = getInt(brokerSect, "rowLockWaitDurMs");
}
if (TStringUtils.isNotBlank(brokerSect.get("allowedReadIOExcptCnt"))) {
this.allowedReadIOExcptCnt = getInt(brokerSect, "allowedReadIOExcptCnt");
}
if (TStringUtils.isNotBlank(brokerSect.get("allowedWriteIOExcptCnt"))) {
this.allowedWriteIOExcptCnt = getInt(brokerSect, "allowedWriteIOExcptCnt");
}
if (TStringUtils.isNotBlank(brokerSect.get("ioExcptStatsDurationMs"))) {
this.ioExcptStatsDurationMs = getLong(brokerSect, "ioExcptStatsDurationMs");
}
if (TStringUtils.isNotBlank(brokerSect.get("visitMasterAuth"))) {
this.visitMasterAuth = this.getBoolean(brokerSect, "visitMasterAuth");
}
if (this.visitMasterAuth) {
if (TStringUtils.isBlank(brokerSect.get("visitName"))) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("visitName is null or Blank in ")
.append(SECT_TOKEN_BROKER).append(" section!").toString());
}
if (TStringUtils.isBlank(brokerSect.get("visitPassword"))) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("visitPassword is null or Blank in ").append(SECT_TOKEN_BROKER)
.append(" section!").toString());
}
this.visitName = brokerSect.get("visitName").trim();
this.visitPassword = brokerSect.get("visitPassword").trim();
}
if (TStringUtils.isNotBlank(brokerSect.get("groupOffsetScanDurMs"))) {
this.groupOffsetScanDurMs =
MixedUtils.mid(getLong(brokerSect, "groupOffsetScanDurMs"),
TServerConstants.CFG_MIN_GROUP_OFFSET_SCAN_DUR,
TServerConstants.CFG_MAX_GROUP_OFFSET_SCAN_DUR);
}
if (TStringUtils.isNotBlank(brokerSect.get("enableMemStore"))) {
this.enableMemStore = this.getBoolean(brokerSect, "enableMemStore");
}
}