ErrorCode MessageBrain::init()

in aios/apps/facility/swift/broker/storage/MessageBrain.cpp [128:307]


// Initializes this MessageBrain for one partition.
//
// Steps, in order:
//   1. Copies tunables from `config` and the call arguments into members.
//   2. If `taskStatus` is non-null, reads the partition id and inline version
//      from it and builds the metrics tags (topic / partition / access_id).
//   3. Logs the effective configuration.
//   4. Creates the partition block pool and pushes the read-size limit to the
//      flow controller.
//   5. Sets up storage:
//        - empty data root: only accepted for TOPIC_MODE_MEMORY_ONLY; just a
//          MessageGroup is created, with no file-related components.
//        - non-empty data root: creates and initializes the FileManager, then
//          the FsMessageReader, MessageCommitter, MessageGroup and (for
//          TOPIC_MODE_MEMORY_PREFER / TOPIC_MODE_PERSIST_DATA) a CommitManager,
//          fetches the last message through the reader and hands it to
//          MessageGroup::init.
//   6. For TOPIC_MODE_SECURITY, reads the security-commit limits and starts the
//      "sec_back_comm" thread running backgroundCommitForSecurityMode().
//
// Parameters:
//   config            partition configuration; source of all tunables below.
//   centerPool        may be null; selects which BlockPool constructor is used.
//   fileCachePool     forwarded to FsMessageReader (file-backed branch only).
//   permissionCenter  stored, and forwarded to FsMessageReader.
//   metricsReporter   stored; may be null (checked before reporting latency).
//   taskStatus        stored; may be null (partition id / tags then stay default).
//   oneFileFdNum      forwarded to FsMessageReader.
//   fileReserveTime   forwarded to FsMessageReader.
//   enableFastRecover stored, and forwarded to FileManager and MessageCommitter.
//
// Returns ERROR_NONE on success. Otherwise returns
// ERROR_BROKER_DATA_DIR_EMPTY (empty data root with a non memory-only topic),
// ERROR_BROKER_CREATE_THREAD (security-mode thread could not be created), or
// the error code reported by FileManager::init, FsMessageReader::getLastMessage
// or MessageGroup::init. On failure, members assigned before the failing step
// keep their values; this function does not roll them back.
ErrorCode MessageBrain::init(const config::PartitionConfig &config,
                             BlockPool *centerPool,
                             BlockPool *fileCachePool,
                             PermissionCenter *permissionCenter,
                             BrokerMetricsReporter *metricsReporter,
                             const ThreadSafeTaskStatusPtr &taskStatus,
                             int32_t oneFileFdNum,
                             int64_t fileReserveTime,
                             bool enableFastRecover) {
    // Cache collaborators and configuration values in members.
    _taskStatus = taskStatus;
    _metricsReporter = metricsReporter;
    _permissionCenter = permissionCenter;
    _minBufferSize = config.getPartitionMinBufferSize();
    _maxBufferSize = config.getPartitionMaxBufferSize();
    _blockSize = config.getBlockSize();
    _needFieldFilter = config.needFieldFilter();
    _topicMode = config.getTopicMode();
    _compressMsg = config.compressMsg();
    _compressThres = config.getCompressThres();
    _maxCommitIntervalForMemoryPrefer = config.getMaxCommitIntervalForMemoryPreferTopic();
    _maxCommitIntervalWhenDelay = config.getMaxCommitIntervalWhenDelay();
    _recyclePercent = config.getRecyclePercent();
    _obsoleteReaderInterval = config.getObsoleteReaderInterval();
    _obsoleteReaderMetricInterval = config.getObsoleteReaderMetricInterval();
    _maxReadSizeSec = config.getMaxReadSizeSec();
    _enableFastRecover = enableFastRecover;
    // Stays default-constructed when no task status is supplied.
    util::InlineVersion inlineVersion;
    if (_taskStatus) {
        _partitionId = _taskStatus->getPartitionId();
        inlineVersion.fromProto(_partitionId.inlineversion());
        _metricsTags.reset(new kmonitor::MetricsTags());
        _metricsTags->AddTag("topic", _partitionId.topicname());
        _metricsTags->AddTag("partition", intToString(_partitionId.id()));
        _metricsTags->AddTag("access_id", DEFAULT_METRIC_ACCESSID);
    }
    _checkFieldFilterMsg = config.checkFieldFilterMsg();
    int64_t obsoleteFileTimeInterval = config.getObsoleteFileTimeInterval();
    int32_t reservedFileCount = config.getReservedFileCount();
    int64_t delObsoleteFileInterval = config.getDelObsoleteFileInterval();
    int64_t candidateObsoleteFileInterval = config.getCandidateObsoleteFileInterval();
    // The next two values are only used in the log line below.
    int64_t maxMessageCountInOneFile = config.getMaxMessageCountInOneFile();
    uint32_t cacheMetaCount = config.getCacheMetaCount();
    std::string dataDir = config.getDataRoot();
    // cacheMetaCount is unsigned (uint32_t), so it is printed with %u.
    AUTIL_LOG(INFO,
              "partition[%s], minBufferSize[%ld], maxBufferSize[%ld], block size[%ld], "
              "max message length[%ld], needFieldFilter[%d], topicMode[%d], "
              "obsoleteFileTimeInterval[%ld], reservedFileCount[%d], "
              "delObsoleteFileTimeInterval[%ld], candidateObsoleteFileInterval[%ld] "
              "maxMessageCountInOneFile[%ld], cacheMetaCount[%u], compressMsg[%d], "
              "compressThres[%d], dir root[%s], checkFieldFilterMsg[%d], maxReadSizeSec[%ld], "
              "enableFastRecover[%d], timestampOffset[%ld], inlineVersion %s",
              _partitionId.ShortDebugString().c_str(),
              _minBufferSize,
              _maxBufferSize,
              _blockSize,
              _maxMessageSize,
              _needFieldFilter,
              _topicMode,
              obsoleteFileTimeInterval,
              reservedFileCount,
              delObsoleteFileInterval,
              candidateObsoleteFileInterval,
              maxMessageCountInOneFile,
              cacheMetaCount,
              (int)_compressMsg,
              _compressThres,
              dataDir.c_str(),
              _checkFieldFilterMsg,
              _maxReadSizeSec,
              _enableFastRecover,
              config.getTimestampOffsetInUs(),
              inlineVersion.toDebugString().c_str());
    // With a center pool the partition pool is built on top of it using the
    // max/min buffer sizes; without one it is built from max buffer size and
    // block size alone.
    if (centerPool) {
        _partitionBlockPool = new BlockPool(centerPool, _maxBufferSize, _minBufferSize);
    } else {
        _partitionBlockPool = new BlockPool(_maxBufferSize, _blockSize);
    }
    _flowCtrol->setMaxReadSize(_maxReadSizeSec);
    ErrorCode errorCode;
    // if dataDir is not configured, then we will use memory only.
    if (dataDir.empty()) {
        if (TOPIC_MODE_MEMORY_ONLY != _topicMode) {
            AUTIL_LOG(ERROR,
                      "dataDir should not be empty, "
                      "when topic mode is not  TOPIC_MODE_MEMORY_ONLY");
            return ERROR_BROKER_DATA_DIR_EMPTY;
        }
        AUTIL_LOG(WARN, "dataDir is empty, use memory only and not write any files");
        _messageGroup = new MessageGroup();
        // No persisted messages to recover from, so no last-message response is
        // passed. The fifth argument is hard-wired to true here; the
        // file-backed branch passes config.readNotCommittedMsg() in that slot.
        errorCode = _messageGroup->init(
            _partitionId, _partitionBlockPool, nullptr, _topicMode, true, config.getTimestampOffsetInUs());
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] init message group failed,errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
    } else {
        // Measures how long FileManager::init takes, for the latency metric.
        StageTime stageTime;
        _fileManager = new FileManager();
        ObsoleteFileCriterion obsoleteFileCriterion;
        obsoleteFileCriterion.obsoleteFileTimeInterval = obsoleteFileTimeInterval;
        obsoleteFileCriterion.reservedFileCount = reservedFileCount;
        obsoleteFileCriterion.delObsoleteFileInterval = delObsoleteFileInterval;
        obsoleteFileCriterion.candidateObsoleteFileInterval = candidateObsoleteFileInterval;
        errorCode = _fileManager->init(
            dataDir, config.getExtendDataRoots(), obsoleteFileCriterion, inlineVersion, _enableFastRecover);
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] init filemanager failed, errorcode[%d], "
                      "datadir[%s]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode,
                      dataDir.c_str());
            return errorCode;
        }
        stageTime.end_stage();
        if (_metricsReporter) {
            // _metricsTags is empty when no task status was supplied, in which
            // case a null tags pointer is passed.
            _metricsReporter->reportInitFileManagerLatency(stageTime.last_us(), _metricsTags.get());
        }

        _fsMessageReader = new FsMessageReader(_partitionId,
                                               _fileManager,
                                               fileCachePool,
                                               oneFileFdNum,
                                               fileReserveTime,
                                               _permissionCenter,
                                               _metricsReporter);
        _messageCommitter =
            new MessageCommitter(_partitionId, config, _fileManager, _metricsReporter, _enableFastRecover);
        _messageGroup = new MessageGroup();
        // The CommitManager receives the MessageGroup pointer before the group
        // itself is initialized further below.
        if (_topicMode == TOPIC_MODE_MEMORY_PREFER || _topicMode == TOPIC_MODE_PERSIST_DATA) {
            _commitManager = new CommitManager(_messageGroup, _partitionId.from(), _partitionId.to());
        }
        // recover latest message
        MessageResponse lastMsgs;
        errorCode = _fsMessageReader->getLastMessage(&lastMsgs);
        if (errorCode != ERROR_NONE) {
            AUTIL_LOG(ERROR,
                      "partition [%s] recover msgs for fsMessageReader failed,"
                      "errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
        errorCode = _messageGroup->init(_partitionId,
                                        _partitionBlockPool,
                                        &lastMsgs,
                                        _topicMode,
                                        config.readNotCommittedMsg(),
                                        config.getTimestampOffsetInUs());
        if (errorCode != ERROR_NONE) {
            // Include the partition so the failure can be attributed, matching
            // the memory-only branch and the other error logs in this function.
            AUTIL_LOG(ERROR,
                      "partition [%s] init message group failed,errorcode[%d]",
                      _partitionId.ShortDebugString().c_str(),
                      errorCode);
            return errorCode;
        }
    }
    if (TOPIC_MODE_SECURITY == _topicMode) {
        _maxWaitTimeForSecurityCommit = config.getMaxWaitTimeForSecurityCommit();
        _maxDataSizeForSecurityCommit = config.getMaxDataSizeForSecurityCommit();
        // The thread is bound to `this`, so it runs against this object.
        _backGroundThreadForSecurityMode =
            Thread::createThread(bind(&MessageBrain::backgroundCommitForSecurityMode, this), "sec_back_comm");
        if (!_backGroundThreadForSecurityMode) {
            AUTIL_LOG(ERROR,
                      "partition [%s] create backgroud thread for security mode failed",
                      _partitionId.ShortDebugString().c_str());
            return ERROR_BROKER_CREATE_THREAD;
        }
        AUTIL_LOG(INFO,
                  "security partition [%s] commit data param: %ld us, %ld byte",
                  _partitionId.ShortDebugString().c_str(),
                  _maxWaitTimeForSecurityCommit,
                  _maxDataSizeForSecurityCommit);
    }
    AUTIL_LOG(INFO, "partition [%s] init message brain success.", _partitionId.ShortDebugString().c_str());
    return ERROR_NONE;
}