in aios/apps/facility/swift/broker/storage/MessageBrain.cpp [128:307]
// Initializes the message brain of one partition from `config`.
//
// Steps, in order:
//   1. copy the tunables out of `config` into members and log them;
//   2. create the partition block pool (layered on `centerPool` when given);
//   3. build the message storage:
//        - empty data root: memory-only MessageGroup (only legal for
//          TOPIC_MODE_MEMORY_ONLY);
//        - otherwise: FileManager, FsMessageReader, MessageCommitter and a
//          MessageGroup seeded with the last messages read back from files;
//   4. for TOPIC_MODE_SECURITY, start the background commit thread.
//
// @param config            source of all partition settings read below.
// @param centerPool        optional parent pool; when null a standalone pool
//                          is created from the max buffer size and block size.
// @param fileCachePool     forwarded to FsMessageReader.
// @param permissionCenter  stored and forwarded to FsMessageReader.
// @param metricsReporter   stored; may be null (the latency report is skipped).
// @param taskStatus        stored; when non-null it supplies the partition id
//                          used for logging, metrics tags and inline version.
// @param oneFileFdNum      forwarded to FsMessageReader.
// @param fileReserveTime   forwarded to FsMessageReader.
// @param enableFastRecover forwarded to FileManager::init and MessageCommitter.
// @return ERROR_NONE on success; ERROR_BROKER_DATA_DIR_EMPTY when the data
//         root is empty for a non memory-only topic; ERROR_BROKER_CREATE_THREAD
//         when the security-mode thread cannot be created; otherwise the error
//         code returned by the failing sub-component.
//
// Objects created before a failure stay assigned to their members; this
// function does not release them on the error paths.
// NOTE(review): presumably the destructor owns that cleanup -- confirm.
ErrorCode MessageBrain::init(const config::PartitionConfig &config,
                             BlockPool *centerPool,
                             BlockPool *fileCachePool,
                             PermissionCenter *permissionCenter,
                             BrokerMetricsReporter *metricsReporter,
                             const ThreadSafeTaskStatusPtr &taskStatus,
                             int32_t oneFileFdNum,
                             int64_t fileReserveTime,
                             bool enableFastRecover) {
    // Collaborators handed in by the caller.
    _taskStatus = taskStatus;
    _metricsReporter = metricsReporter;
    _permissionCenter = permissionCenter;

    // Settings cached from the partition config.
    _minBufferSize = config.getPartitionMinBufferSize();
    _maxBufferSize = config.getPartitionMaxBufferSize();
    _blockSize = config.getBlockSize();
    _needFieldFilter = config.needFieldFilter();
    _topicMode = config.getTopicMode();
    _compressMsg = config.compressMsg();
    _compressThres = config.getCompressThres();
    _maxCommitIntervalForMemoryPrefer = config.getMaxCommitIntervalForMemoryPreferTopic();
    _maxCommitIntervalWhenDelay = config.getMaxCommitIntervalWhenDelay();
    _recyclePercent = config.getRecyclePercent();
    _obsoleteReaderInterval = config.getObsoleteReaderInterval();
    _obsoleteReaderMetricInterval = config.getObsoleteReaderMetricInterval();
    _maxReadSizeSec = config.getMaxReadSizeSec();
    _enableFastRecover = enableFastRecover;

    // Partition identity, inline version and metric tags are only available
    // when a task status was supplied.
    util::InlineVersion inlineVersion;
    if (_taskStatus) {
        _partitionId = _taskStatus->getPartitionId();
        inlineVersion.fromProto(_partitionId.inlineversion());
        _metricsTags.reset(new kmonitor::MetricsTags());
        _metricsTags->AddTag("topic", _partitionId.topicname());
        _metricsTags->AddTag("partition", intToString(_partitionId.id()));
        _metricsTags->AddTag("access_id", DEFAULT_METRIC_ACCESSID);
    }
    _checkFieldFilterMsg = config.checkFieldFilterMsg();

    const int64_t obsoleteFileTimeInterval = config.getObsoleteFileTimeInterval();
    const int32_t reservedFileCount = config.getReservedFileCount();
    const int64_t delObsoleteFileInterval = config.getDelObsoleteFileInterval();
    const int64_t candidateObsoleteFileInterval = config.getCandidateObsoleteFileInterval();
    const int64_t maxMessageCountInOneFile = config.getMaxMessageCountInOneFile();
    const uint32_t cacheMetaCount = config.getCacheMetaCount();
    std::string dataDir = config.getDataRoot();

    // cacheMetaCount is unsigned, so it is printed with %u.
    AUTIL_LOG(INFO,
              "partition[%s], minBufferSize[%ld], maxBufferSize[%ld], block size[%ld], "
              "max message length[%ld], needFieldFilter[%d], topicMode[%d], "
              "obsoleteFileTimeInterval[%ld], reservedFileCount[%d], "
              "delObsoleteFileTimeInterval[%ld], candidateObsoleteFileInterval[%ld] "
              "maxMessageCountInOneFile[%ld], cacheMetaCount[%u], compressMsg[%d], "
              "compressThres[%d], dir root[%s], checkFieldFilterMsg[%d], maxReadSizeSec[%ld], "
              "enableFastRecover[%d], timestampOffset[%ld], inlineVersion %s",
              _partitionId.ShortDebugString().c_str(),
              _minBufferSize,
              _maxBufferSize,
              _blockSize,
              _maxMessageSize,
              _needFieldFilter,
              _topicMode,
              obsoleteFileTimeInterval,
              reservedFileCount,
              delObsoleteFileInterval,
              candidateObsoleteFileInterval,
              maxMessageCountInOneFile,
              cacheMetaCount,
              static_cast<int>(_compressMsg),
              _compressThres,
              dataDir.c_str(),
              _checkFieldFilterMsg,
              _maxReadSizeSec,
              _enableFastRecover,
              config.getTimestampOffsetInUs(),
              inlineVersion.toDebugString().c_str());

    // Partition-level block pool: layered on the shared pool when one is
    // provided, standalone otherwise.
    if (centerPool) {
        _partitionBlockPool = new BlockPool(centerPool, _maxBufferSize, _minBufferSize);
    } else {
        _partitionBlockPool = new BlockPool(_maxBufferSize, _blockSize);
    }
    _flowCtrol->setMaxReadSize(_maxReadSizeSec);

    ErrorCode errorCode = ERROR_NONE;
    // if dataDir is not configured, then we will use memory only.
    if (dataDir.empty()) {
        if (TOPIC_MODE_MEMORY_ONLY != _topicMode) {
            AUTIL_LOG(ERROR,
                      "dataDir should not be empty, "
                      "when topic mode is not TOPIC_MODE_MEMORY_ONLY");
            return ERROR_BROKER_DATA_DIR_EMPTY;
        }
        AUTIL_LOG(WARN, "dataDir is empty, use memory only and not write any files");
        _messageGroup = new MessageGroup();
        // Nothing to recover from files, hence no last-message response. The
        // literal `true` occupies the argument slot that the file-backed branch
        // fills with config.readNotCommittedMsg().
        errorCode = _messageGroup->init(
            _partitionId, _partitionBlockPool, nullptr, _topicMode, true, config.getTimestampOffsetInUs());
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] init message group failed,errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
    } else {
        // File-backed storage. The stage timer is created before the file
        // manager so the reported latency covers its construction and init.
        StageTime stageTime;
        _fileManager = new FileManager();
        ObsoleteFileCriterion obsoleteFileCriterion;
        obsoleteFileCriterion.obsoleteFileTimeInterval = obsoleteFileTimeInterval;
        obsoleteFileCriterion.reservedFileCount = reservedFileCount;
        obsoleteFileCriterion.delObsoleteFileInterval = delObsoleteFileInterval;
        obsoleteFileCriterion.candidateObsoleteFileInterval = candidateObsoleteFileInterval;
        errorCode = _fileManager->init(
            dataDir, config.getExtendDataRoots(), obsoleteFileCriterion, inlineVersion, _enableFastRecover);
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] init filemanager failed, errorcode[%d], "
                      "datadir[%s]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode,
                      dataDir.c_str());
            return errorCode;
        }
        stageTime.end_stage();
        if (_metricsReporter) {
            // _metricsTags is unset (null) when no task status was supplied.
            _metricsReporter->reportInitFileManagerLatency(stageTime.last_us(), _metricsTags.get());
        }

        _fsMessageReader = new FsMessageReader(_partitionId,
                                               _fileManager,
                                               fileCachePool,
                                               oneFileFdNum,
                                               fileReserveTime,
                                               _permissionCenter,
                                               _metricsReporter);
        _messageCommitter =
            new MessageCommitter(_partitionId, config, _fileManager, _metricsReporter, _enableFastRecover);
        _messageGroup = new MessageGroup();
        // A commit manager is only created for these two topic modes.
        if (_topicMode == TOPIC_MODE_MEMORY_PREFER || _topicMode == TOPIC_MODE_PERSIST_DATA) {
            _commitManager = new CommitManager(_messageGroup, _partitionId.from(), _partitionId.to());
        }

        // recover latest message
        MessageResponse lastMsgs;
        errorCode = _fsMessageReader->getLastMessage(&lastMsgs);
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] recover msgs for fsMessageReader failed,"
                      "errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
        errorCode = _messageGroup->init(_partitionId,
                                        _partitionBlockPool,
                                        &lastMsgs,
                                        _topicMode,
                                        config.readNotCommittedMsg(),
                                        config.getTimestampOffsetInUs());
        if (errorCode != ERROR_NONE) {
            // Include the partition id, like the other error logs here, so the
            // failure can be attributed to a partition.
            AUTIL_LOG(ERROR,
                      "partition [%s] init message group failed,errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
    }

    // Security mode commits from a dedicated background thread.
    if (TOPIC_MODE_SECURITY == _topicMode) {
        _maxWaitTimeForSecurityCommit = config.getMaxWaitTimeForSecurityCommit();
        _maxDataSizeForSecurityCommit = config.getMaxDataSizeForSecurityCommit();
        _backGroundThreadForSecurityMode =
            Thread::createThread(bind(&MessageBrain::backgroundCommitForSecurityMode, this), "sec_back_comm");
        if (!_backGroundThreadForSecurityMode) {
            AUTIL_LOG(ERROR,
                      "partition [%s] create backgroud thread for security mode failed",
                      _partitionId.ShortDebugString().c_str());
            return ERROR_BROKER_CREATE_THREAD;
        }
        AUTIL_LOG(INFO,
                  "security partition [%s] commit data param: %ld us, %ld byte",
                  _partitionId.ShortDebugString().c_str(),
                  _maxWaitTimeForSecurityCommit,
                  _maxDataSizeForSecurityCommit);
    }

    AUTIL_LOG(INFO, "partition [%s] init message brain success.", _partitionId.ShortDebugString().c_str());
    return ERROR_NONE;
}