aios/apps/facility/swift/config/BrokerConfig.cpp (287 lines of code) (raw):

/*
 * Copyright 2014-present Alibaba Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "swift/config/BrokerConfig.h"

#include <assert.h>
#include <sstream>
#include <stddef.h>
#include <stdint.h>
#include <string>

#include "linux/sysinfo.h"
#include "swift/config/ConfigDefine.h"
#include "swift/config/PartitionConfig.h"
#include "sys/sysinfo.h"

namespace swift {
namespace config {
AUTIL_LOG_SETUP(swift, BrokerConfig);

// Builds a configuration populated with the compiled-in DEFAULT_* values.
// The read/write thread limits and the total buffer size are derived values,
// so they are computed in the constructor body instead of the initializer list.
BrokerConfig::BrokerConfig()
    : amonitorPort(DEFAULT_AMONITOR_PORT)
    , queueSize(DEFAULT_QUEUE_SIZE)
    , cacheMetaCount(DEFAULT_CACHE_META_COUNT)
    , concurrentReadFileLimit(DEFAULT_CONCURRENT_READ_FILE_LIMIT)
    , maxPermissionWaitTime(DEFAULT_MAX_PERMISSION_WAIT_TIME)
    , maxMessageCountInOneFile(DEFAULT_MAX_MESSAGE_COUNT_IN_ONE_FILE)
    , reservedFileCount(DEFAULT_RESERVED_FILE_COUNT)
    , fileBufferUseRatio(DEFAULT_FILE_BUFFER_USE_RATIO)
    , fileMetaBufferUseRatio(DEFAULT_FILE_META_BUFFER_USE_RATIO)
    , commitThreadLoopIntervalMs(DEFAULT_COMMIT_THREAD_LOOP_INTERVAL_MS)
    , dfsCommitInterval(DEFAULT_DFS_COMMIT_INTERVAL)
    , dfsCommitIntervalForMemoryPreferTopic(DEFAULT_DFS_COMMIT_INTERVAL_FOR_MEMORY_PREFER_TOPIC)
    , dfsCommitIntervalWhenDelay(DEFAULT_DFS_COMMIT_INTERVAL_WHEN_DELAY)
    , dfsCommitThreadNum(DEFAULT_DFS_COMMIT_THREAD_NUM)
    , dfsCommitQueueSize(DEFAULT_DFS_COMMIT_QUEUE_SIZE)
    , dfsFileSplitTime(DEFAULT_DFS_FILE_SPLIT_TIME)
    , closeNoWriteFileTime(DEFAULT_CLOSE_NO_WRITE_FILE_TIME)
    , minChangeFileForDfsErrorTime(DEFAULT_FILE_CHANGE_FOR_DFS_ERROR)
    , maxCommitTimeAsError(DEFAULT_COMMIT_INTERVAL_AS_ERROR)
    , leaderLeaseTime(DEFAULT_LEADER_LEASE_TIME)
    , leaderLoopInterval(DEFAULT_LEADER_LOOP_INTERVAL)
    , loadThreadNum(DEFAULT_LOAD_THREAD_NUM)
    , loadQueueSize(DEFAULT_LOAD_QUEUE_SIZE)
    , unloadThreadNum(DEFAULT_UNLOAD_THREAD_NUM)
    , unloadQueueSize(DEFAULT_UNLOAD_QUEUE_SIZE)
    , recyclePercent(DEFAULT_RECYCLE_PERCENT)
    , supportMergeMsg(DEFAULT_SUPPORT_MERGE_MSG)
    , supportFb(DEFAULT_SUPPORT_FB)
    , checkFieldFilterMsg(DEFAULT_CHECK_FIELD_FILTER_MSG)
    , httpThreadNum(1)
    , httpQueueSize(100)
    , readQueueSize(DEFAULT_READ_QUEUE_SIZE)
    , statusReportIntervalSec(10)
    , clearPollingThreadNum(DEFAULT_CLEAR_POLLING_THREAD_NUM)
    , clearPollingQueueSize(DEFAULT_CLEAR_POLLING_QUEUE_SIZE)
    , enableFastRecover(false)
    , randomDelObsoleteFileInterval(true)
    , useRecommendPort(true)
    , localMode(false)
    , readNotCommittedMsg(true)
    , maxTopicAclSyncIntervalUs(10000000)
    , _threadNum(DEFAULT_THREAD_NUM)
    , _reportMetricThreadNum(0)
    , _partitionMinBufferSize(DEFAULT_PARTITION_MIN_BUFFER_SIZE)
    , _partitionMaxBufferSize(DEFAULT_PARTITION_MAX_BUFFER_SIZE)
    , _bufferBlockSize(DEFAULT_BUFFER_BLOCK_SIZE)
    , _bufferMinReserveSize(DEFAULT_BUFFER_MIN_RESERVE_SIZE)
    , _dfsCommitBlockSize(DEFAULT_DFS_COMMIT_BLOCK_SIZE)
    , _dfsFileSize(DEFAULT_DFS_FILE_SIZE)
    , _dfsFileMinSize(DEFAULT_DFS_FILE_MIN_SIZE)
    , _obsoleteFileTimeInterval(DEFAULT_OBSOLETE_FILE_TIME_INTERVAL)
    , _delObsoleteFileInterval(DEFAULT_DEL_OBSOLETE_FILE_INTERVAL)
    , _candidateObsoleteFileInterval(DEFAULT_CANDIDATE_OBSOLETE_FILE_INTERVAL)
    , _requestTimeout(DEFAULT_REQUEST_TIMEOUT)
    , _oneFileFdNum(DEFAULT_ONE_FILE_FD_NUM)
    , _cacheFileReserveTime(DEFAULT_CACHE_FILE_RESERVE_TIME)
    , _cacheBlockReserveTime(DEFAULT_CACHE_BLOCK_RESERVE_TIME)
    , _obsoleteReaderInterval(DEFAULT_OBSOLETE_READER_INTERVAL)
    , _obsoleteReaderMetricInterval(DEFAULT_OBSOLETE_METRIC_INTERVAL)
    , _configUnlimited(false)
    , _logSampleCount(DEFAULT_LOG_SAMPLE_COUNT)
    , _closeForceLog(true)
    , _maxGetMessageSizeKb(0)
    , _holdNoDataRequestTimeMs(0)
    , _noDataRequestNotfiyIntervalMs(DEFAULT_NO_DATA_REQUEST_NOTFIY_INTERVAL_MS)
    , _maxReadSizeSec(DEFAULT_MAX_READ_SIZE_MB_SEC)
    , _timestampOffsetInUs(0) {
    // Same derivation as setThreadNum(), applied to the default thread count.
    maxReadThreadNum = static_cast<int>(DEFAULT_THREAD_NUM * DEFAULT_READ_THREAD_RATIO) + 1;
    maxWriteThreadNum = static_cast<int>(DEFAULT_THREAD_NUM * DEFAULT_WRITE_THREAD_RATIO) + 1;
    // By default use half of the memory size reported by getPhyMemSize().
    _totalBufferSize = static_cast<int64_t>(getPhyMemSize() * 0.5);
}

BrokerConfig::~BrokerConfig() {}

// Sanity-checks the settings that depend on each other.
// Returns false (after logging an ERROR) when
//   - maxReadThreadNum is not strictly greater than concurrentReadFileLimit, or
//   - statusReportIntervalSec is 0 or greater than 150.
// Returns true otherwise.
bool BrokerConfig::validate() {
    if (maxReadThreadNum <= concurrentReadFileLimit) {
        AUTIL_LOG(ERROR,
                  "%s[%d] should larger than %s[%d]",
                  MAX_READ_THREAD_NUM,
                  maxReadThreadNum,
                  CONCURRENT_READ_FILE_LIMIT,
                  concurrentReadFileLimit);
        return false;
    }

    const bool reportIntervalOutOfRange = (statusReportIntervalSec == 0) || (statusReportIntervalSec > 150);
    if (reportIntervalOutOfRange) {
        AUTIL_LOG(ERROR,
                  "%s[%d] should less than 150 and not 0",
                  STATUS_REPORT_INTERVAL_SEC,
                  statusReportIntervalSec);
        return false;
    }

    return true;
}

// Returns "<userName>_<serviceName>".
std::string BrokerConfig::getApplicationId() { return userName + "_" + serviceName; }

// Copies the broker-level defaults into partitionConfig.
// Several broker fields are scaled on the way: the dfs commit/split/close/
// change-file values are multiplied by 1000000 and maxCommitTimeAsError by
// 1000 before being handed to the partition config (presumably sec -> us and
// ms -> us; confirm against PartitionConfig).
void BrokerConfig::getDefaultPartitionConfig(PartitionConfig &partitionConfig) const {
    // Storage location and limits.
    partitionConfig.setConfigUnlimited(_configUnlimited);
    partitionConfig.setDataRoot(dfsRoot);

    // Memory buffer sizing.
    partitionConfig.setPartitionMinBufferSize(_partitionMinBufferSize);
    partitionConfig.setPartitionMaxBufferSize(_partitionMaxBufferSize);
    partitionConfig.setBlockSize(_bufferBlockSize);

    // Commit policy.
    partitionConfig.setMaxCommitSize(_dfsCommitBlockSize);
    partitionConfig.setMaxCommitInterval(static_cast<int64_t>(dfsCommitInterval * 1000000));
    partitionConfig.setMaxCommitIntervalForMemoryPreferTopic(
        static_cast<int64_t>(dfsCommitIntervalForMemoryPreferTopic * 1000000));
    partitionConfig.setMaxCommitIntervalWhenDelay(static_cast<int64_t>(dfsCommitIntervalWhenDelay * 1000000));

    // File rolling policy.
    partitionConfig.setMaxFileSize(_dfsFileSize);
    partitionConfig.setMinFileSize(_dfsFileMinSize);
    partitionConfig.setMaxFileSplitInterval(static_cast<int64_t>(dfsFileSplitTime * 1000000ll));
    partitionConfig.setCloseNoWriteFileInterval(static_cast<int64_t>(closeNoWriteFileTime * 1000000ll));
    partitionConfig.setMinChangeFileForDfsErrorTime(static_cast<int64_t>(minChangeFileForDfsErrorTime * 1000000ll));
    partitionConfig.setMaxCommitTimeAsError(static_cast<int64_t>(maxCommitTimeAsError * 1000ll));

    // Obsolete file handling.
    partitionConfig.setObsoleteFileTimeInterval(_obsoleteFileTimeInterval);
    partitionConfig.setReservedFileCount(reservedFileCount);
    partitionConfig.setDelObsoleteFileInterval(_delObsoleteFileInterval);
    partitionConfig.setCandidateObsoleteFileInterval(_candidateObsoleteFileInterval);

    // Message / reader behaviour.
    partitionConfig.setMaxMessageCountInOneFile(maxMessageCountInOneFile);
    partitionConfig.setCacheMetaCount(cacheMetaCount);
    partitionConfig.setRecyclePercent(recyclePercent);
    partitionConfig.setCheckFieldFilterMsg(checkFieldFilterMsg);
    partitionConfig.setObsoleteReaderInterval(_obsoleteReaderInterval);
    partitionConfig.setObsoleteReaderMetricInterval(_obsoleteReaderMetricInterval);
    partitionConfig.setMaxReadSizeSec(_maxReadSizeSec);
    partitionConfig.setReadNotCommittedMsg(readNotCommittedMsg);
    partitionConfig.setTimestampOffsetInUs(_timestampOffsetInUs);
}

// Stores the worker thread count and re-derives the read/write thread limits
// from it using the default read/write ratios (truncated, plus one).
void BrokerConfig::setThreadNum(int64_t value) {
    _threadNum = value;
    maxReadThreadNum = static_cast<int>(value * DEFAULT_READ_THREAD_RATIO) + 1;
    maxWriteThreadNum = static_cast<int>(value * DEFAULT_WRITE_THREAD_RATIO) + 1;
}

int BrokerConfig::getThreadNum() const { return _threadNum; }

void BrokerConfig::setReportMetricThreadNum(int64_t value) { _reportMetricThreadNum = value; }

int BrokerConfig::getReportMetricThreadNum() const { return _reportMetricThreadNum; }

// The size setters below take a value that is multiplied by 1024 * 1024
// before being stored (i.e. the argument is in MB when the stored value is
// read as bytes); the matching getters return the stored value unchanged.

void BrokerConfig::setPartitionMinBufferSize(double value) {
    _partitionMinBufferSize = static_cast<int64_t>(value * 1024 * 1024);
}

int64_t BrokerConfig::getPartitionMinBufferSize() const { return _partitionMinBufferSize; }

void BrokerConfig::setPartitionMaxBufferSize(double value) {
    _partitionMaxBufferSize = static_cast<int64_t>(value * 1024 * 1024);
}

int64_t BrokerConfig::getPartitionMaxBufferSize() const { return _partitionMaxBufferSize; }

// Sets the total buffer size.
//   value <  1.0 : interpreted as a fraction of getPhyMemSize()
//   otherwise    : multiplied by 1024 * 1024, like the other size setters
void BrokerConfig::setTotalBufferSize(double value) {
    _totalBufferSize = (value < 1.0) ? static_cast<int64_t>(getPhyMemSize() * value)
                                     : static_cast<int64_t>(value * 1024 * 1024);
}

int64_t BrokerConfig::getTotalBufferSize() const { return _totalBufferSize; }

// Share of the total buffer given to file buffers: total * fileBufferUseRatio.
int64_t BrokerConfig::getTotalFileBufferSize() const {
    return static_cast<int64_t>(_totalBufferSize * fileBufferUseRatio);
}

// File-buffer share minus the portion set aside for file meta.
int64_t BrokerConfig::getBrokerFileBufferSize() const {
    return static_cast<int64_t>((_totalBufferSize * fileBufferUseRatio) * (1 - fileMetaBufferUseRatio));
}

// Portion of the file-buffer share set aside for file meta.
int64_t BrokerConfig::getBrokerFileMetaBufferSize() const {
    return static_cast<int64_t>((_totalBufferSize * fileBufferUseRatio) * fileMetaBufferUseRatio);
}

// Whatever is left of the total buffer after the file-buffer share.
int64_t BrokerConfig::getBrokerMessageBufferSize() const {
    return static_cast<int64_t>(_totalBufferSize * (1 - fileBufferUseRatio));
}

// Sets the buffer block size, capped at MAX_BUFFER_BLOCK_SIZE.
void BrokerConfig::setBufferBlockSize(double value) {
    _bufferBlockSize = static_cast<int64_t>(value * 1024 * 1024);
    // The cap exists because the in-memory message offset is limited to 22 bits.
    if (_bufferBlockSize > MAX_BUFFER_BLOCK_SIZE) {
        _bufferBlockSize = MAX_BUFFER_BLOCK_SIZE;
    }
}

int64_t BrokerConfig::getBufferBlockSize() const { return _bufferBlockSize; }

// Returns the configured minimum reserve size; when none is configured
// (stored value <= 0) falls back to DEFAULT_BUFFER_MIN_RESERVE_RATIO of the
// message buffer size.
int64_t BrokerConfig::getBufferMinReserveSize() {
    if (_bufferMinReserveSize <= 0) {
        return static_cast<int64_t>(getBrokerMessageBufferSize() * DEFAULT_BUFFER_MIN_RESERVE_RATIO);
    }
    return _bufferMinReserveSize;
}

void BrokerConfig::setBufferMinReserveSize(double size) {
    _bufferMinReserveSize = static_cast<int64_t>(size * 1024 * 1024);
}

void BrokerConfig::setDfsCommitBlockSize(double value) {
    _dfsCommitBlockSize = static_cast<int64_t>(value * 1024 * 1024);
}

int64_t BrokerConfig::getDfsCommitBlockSize() const { return _dfsCommitBlockSize; }

void BrokerConfig::setDfsFileSize(double value) { _dfsFileSize = static_cast<int64_t>(value * 1024 * 1024); }

int64_t BrokerConfig::getDfsFileSize() const { return _dfsFileSize; }

void BrokerConfig::setDfsFileMinSize(double value) { _dfsFileMinSize = static_cast<int64_t>(value * 1024 * 1024); }

int64_t BrokerConfig::getDfsFileMinSize() const { return _dfsFileMinSize; }

// Turns the "config unlimited" flag on when value is exactly "true".
// NOTE(review): any other value leaves the flag untouched, so this setter can
// never switch the flag back off - confirm that this is intended.
void BrokerConfig::setConfigUnlimited(std::string value) {
    if (value == "true") {
        _configUnlimited = true;
    }
}
bool BrokerConfig::getConfigUnlimited() const { return _configUnlimited; } void BrokerConfig::setObsoleteFileTimeIntervalSec(int64_t timeIntervalSec) { _obsoleteFileTimeInterval = timeIntervalSec * 1000 * 1000; } void BrokerConfig::setObsoleteFileTimeIntervalHour(int64_t timeIntervalHour) { _obsoleteFileTimeInterval = timeIntervalHour * 3600 * 1000 * 1000; } int64_t BrokerConfig::getObsoleteFileTimeInterval() const { return _obsoleteFileTimeInterval; } void BrokerConfig::setDelObsoleteFileIntervalSec(int64_t timeIntervalSec) { _delObsoleteFileInterval = timeIntervalSec * 1000 * 1000; } void BrokerConfig::addDelObsoleteFileIntervalSec(int64_t timeIntervalSec) { _delObsoleteFileInterval += timeIntervalSec * 1000 * 1000; } int64_t BrokerConfig::getDelObsoleteFileInterval() const { return _delObsoleteFileInterval; } void BrokerConfig::setCandidateObsoleteFileIntervalSec(int64_t timeIntervalSec) { _candidateObsoleteFileInterval = timeIntervalSec * 1000 * 1000; } int64_t BrokerConfig::getCandidateObsoleteFileInterval() const { return _candidateObsoleteFileInterval; } void BrokerConfig::setRequestTimeoutSec(int64_t timeoutSec) { _requestTimeout = (int64_t)timeoutSec * 1000 * 1000; } int64_t BrokerConfig::getRequestTimeout() const { return _requestTimeout; } void BrokerConfig::setOneFileFdNum(uint32_t fdNum) { _oneFileFdNum = fdNum; } uint32_t BrokerConfig::getOneFileFdNum() const { return _oneFileFdNum; } void BrokerConfig::setCacheFileReserveTime(int64_t timeSec) { _cacheFileReserveTime = timeSec; } int64_t BrokerConfig::getCacheFileReserveTime() const { return _cacheFileReserveTime; } void BrokerConfig::setCacheBlockReserveTime(int64_t timeSec) { _cacheBlockReserveTime = timeSec; } int64_t BrokerConfig::getCacheBlockReserveTime() const { return _cacheBlockReserveTime; } void BrokerConfig::setObsoleteReaderIntervalSec(int64_t timeIntervalSec) { _obsoleteReaderInterval = timeIntervalSec * 1000 * 1000; } int64_t BrokerConfig::getObsoleteReaderInterval() const { return 
_obsoleteReaderInterval; } void BrokerConfig::setObsoleteReaderMetricIntervalSec(int64_t timeIntervalSec) { _obsoleteReaderMetricInterval = timeIntervalSec * 1000 * 1000; } int64_t BrokerConfig::getObsoleteReaderMetricInterval() const { return _obsoleteReaderMetricInterval; } void BrokerConfig::setCommitThreadLoopIntervalMs(int32_t intervalMs) { commitThreadLoopIntervalMs = intervalMs; } int32_t BrokerConfig::getCommitThreadLoopIntervalMs() const { return commitThreadLoopIntervalMs; } void BrokerConfig::setLogSampleCount(uint32_t count) { _logSampleCount = count; } uint32_t BrokerConfig::getLogSampleCount() const { return _logSampleCount; } void BrokerConfig::setCloseForceLog(bool flag) { _closeForceLog = flag; } bool BrokerConfig::getCloseForceLog() const { return _closeForceLog; } void BrokerConfig::setMaxGetMessageSizeKb(size_t sizeKb) { _maxGetMessageSizeKb = sizeKb; } size_t BrokerConfig::getMaxGetMessageSizeKb() const { return _maxGetMessageSizeKb; } void BrokerConfig::setHoldNoDataRequestTimeMs(size_t timeMs) { _holdNoDataRequestTimeMs = timeMs; } size_t BrokerConfig::getHoldNoDataRequestTimeMs() const { return _holdNoDataRequestTimeMs; } void BrokerConfig::setNoDataRequestNotfiyIntervalMs(size_t timeMs) { _noDataRequestNotfiyIntervalMs = timeMs; } size_t BrokerConfig::getNoDataRequestNotfiyIntervalMs() const { return _noDataRequestNotfiyIntervalMs; } void BrokerConfig::setMaxReadSizeSec(int32_t sizeMB) { _maxReadSizeSec = sizeMB; } int32_t BrokerConfig::getMaxReadSizeSec() const { return _maxReadSizeSec; } void BrokerConfig::setTimestampOffsetInUs(int64_t offset) { _timestampOffsetInUs = offset; } int64_t BrokerConfig::getTimestampOffsetInUs() const { return _timestampOffsetInUs; } size_t BrokerConfig::getPhyMemSize() { struct sysinfo s_info; int error = 0; (void)(error); error = sysinfo(&s_info); assert(error == 0); return s_info.totalram; } std::string BrokerConfig::getConfigStr() { std::ostringstream oss; oss << "zkRoot:" << zkRoot << " userName:" << 
userName << " serviceName:" << serviceName << " dfsRoot:" << dfsRoot << " queueSize:" << queueSize << " maxReadThreadNum:" << maxReadThreadNum << " maxWriteThreadNum:" << maxWriteThreadNum << " cacheMetaCount:" << cacheMetaCount << " concurrentReadFileLimit:" << concurrentReadFileLimit << " maxPermissionWaitTime:" << maxPermissionWaitTime << " maxMessageCountInOneFile:" << maxMessageCountInOneFile << " reservedFileCount:" << reservedFileCount << " fileBufferUseRatio:" << fileBufferUseRatio << " fileMetaBufferUseRatio:" << fileMetaBufferUseRatio << " commitThreadLoopIntervalMs:" << commitThreadLoopIntervalMs << " dfsCommitInterval:" << dfsCommitInterval << " dfsCommitIntervalForMemoryPreferTopic:" << dfsCommitIntervalForMemoryPreferTopic << " dfsCommitIntervalWhenDelay:" << dfsCommitIntervalWhenDelay << " dfsCommitThreadNum:" << dfsCommitThreadNum << " dfsCommitQueueSize:" << dfsCommitQueueSize << " dfsFileSplitTime:" << dfsFileSplitTime << " closeNoWriteFileTime:" << closeNoWriteFileTime << " minChangeFileForDfsErrorTime:" << minChangeFileForDfsErrorTime << " maxCommitTimeAsError:" << maxCommitTimeAsError << " leaderLeaseTime:" << leaderLeaseTime << " leaderLoopInterval:" << leaderLoopInterval << " loadThreadNum:" << loadThreadNum << " loadQueueSize:" << loadQueueSize << " unloadThreadNum:" << unloadThreadNum << " unloadQueueSize:" << unloadQueueSize << " recyclePercent:" << recyclePercent << " checkFieldFilterMsg:" << checkFieldFilterMsg << " httpThreadNum:" << httpThreadNum << " httpQueueSize:" << httpQueueSize << " readQueueSize:" << readQueueSize << " statusReportIntervalSec:" << statusReportIntervalSec << " threadNum:" << _threadNum << " enableFastRecover:" << enableFastRecover << " partitionMinBufferSize:" << _partitionMinBufferSize << " partitionMaxBufferSize:" << _partitionMaxBufferSize << " totalBufferSize:" << _totalBufferSize << " bufferBlockSize:" << _bufferBlockSize << " bufferMinReserveSize:" << _bufferMinReserveSize << " dfsCommitBlockSize:" << 
_dfsCommitBlockSize << " dfsFileSize:" << _dfsFileSize << " dfsFileMinSize:" << _dfsFileMinSize << " obsoleteFileTimeInterval:" << _obsoleteFileTimeInterval << " delObsoleteFileInterval:" << _delObsoleteFileInterval << " randomDelObsoleteFileInterval:" << randomDelObsoleteFileInterval << " candidateObsoleteFileInterval:" << _candidateObsoleteFileInterval << " requestTimeout:" << _requestTimeout << " oneFileFdNum:" << _oneFileFdNum << " cacheFileReserveTime:" << _cacheFileReserveTime << " cacheBlockReserveTime:" << _cacheBlockReserveTime << " obsoleteReaderInterval:" << _obsoleteReaderInterval << " obsoleteReaderMetricInterval:" << _obsoleteReaderMetricInterval << " maxReadSizeSec:" << _maxReadSizeSec << " configUnlimited:" << _configUnlimited << " localMode:" << localMode << " mirrorZkRoot:" << mirrorZkRoot << " readNotCommittedMsg:" << readNotCommittedMsg << " timestampOffsetInUs:" << _timestampOffsetInUs << " maxTopicAclSyncIntervalUs:" << maxTopicAclSyncIntervalUs; return oss.str(); } } // namespace config } // namespace swift