aios/apps/facility/swift/broker/storage/MessageBrain.cpp (1,315 lines of code) (raw):
/*
* Copyright 2014-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "swift/broker/storage/MessageBrain.h"
#include <assert.h>
#include <cmath>
#include <cstring>
#include <flatbuffers/flatbuffers.h>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
#include <unistd.h>
#include <utility>
#include "autil/CommonMacros.h"
#include "autil/TimeUtility.h"
#include "kmonitor/client/core/MetricsTags.h"
#include "swift/broker/storage/BrokerResponseError.h"
#include "swift/broker/storage/CommitManager.h" // IWYU pragma: keep
#include "swift/broker/storage/FlowControl.h"
#include "swift/broker/storage_dfs/FileManager.h"
#include "swift/broker/storage_dfs/FsMessageReader.h"
#include "swift/broker/storage_dfs/MessageCommitter.h" // IWYU pragma: keep
#include "swift/common/FieldGroupReader.h"
#include "swift/common/MemoryMessage.h"
#include "swift/config/ConfigDefine.h"
#include "swift/config/PartitionConfig.h"
#include "swift/filter/DescFieldFilter.h"
#include "swift/filter/FieldFilter.h"
#include "swift/log/BrokerLogClosure.h"
#include "swift/monitor/BrokerMetricsReporter.h"
#include "swift/monitor/MetricsCommon.h"
#include "swift/monitor/TimeTrigger.h"
#include "swift/protocol/BrokerRequestResponse.pb.h"
#include "swift/protocol/Common.pb.h"
#include "swift/protocol/ErrCode.pb.h"
#include "swift/protocol/FBMessageReader.h"
#include "swift/protocol/MessageCompressor.h"
#include "swift/protocol/SwiftMessage.pb.h"
#include "swift/protocol/SwiftMessage_generated.h"
#include "swift/util/Block.h"
#include "swift/util/BlockPool.h"
#include "swift/util/PanguInlineFileUtil.h"
#include "swift/util/Permission.h"
#include "swift/util/PermissionCenter.h"
using namespace std;
using namespace autil;
using namespace swift::config;
using namespace swift::protocol;
using namespace swift::filter;
using namespace swift::common;
using namespace swift::heartbeat;
using namespace swift::service;
using namespace swift::monitor;
using namespace swift::util;
namespace swift {
namespace storage {
AUTIL_LOG_SETUP(swift, MessageBrain);
/**
 * Creates a brain that is not yet usable: every collaborator pointer is null
 * and every tunable carries its built-in default. init() replaces most of the
 * tunables with values from the partition config and creates the collaborators.
 *
 * @param sessionId session identifier stored in _sessionId.
 */
MessageBrain::MessageBrain(int64_t sessionId) {
    // Identity and lifecycle flags.
    _sessionId = sessionId;
    _isStopped = false;
    _forceUnload = false;
    _enableFastRecover = false;

    // Collaborators; all of them are filled in (or left null) by init().
    _partitionBlockPool = nullptr;
    _fileManager = nullptr;
    _fsMessageReader = nullptr;
    _messageGroup = nullptr;
    _messageCommitter = nullptr;
    _commitManager = nullptr;
    _metricsReporter = nullptr;
    _permissionCenter = nullptr;
    _innerPartMetric = nullptr;

    // Buffer / message sizing defaults.
    _maxMessageSize = common::MAX_MESSAGE_SIZE;
    _minBufferSize = 128 << 20; // 128M
    _maxBufferSize = 1 << 30;   // 1G
    _blockSize = 1 << 20;       // 1M
    _recyclePercent = 0.1;

    // Commit behaviour defaults.
    _topicMode = TOPIC_MODE_NORMAL;
    _maxDataSizeForSecurityCommit = 1 << 20;   // 1M
    _maxWaitTimeForSecurityCommit = 50 * 1000; // 50ms
    _maxCommitIntervalForMemoryPrefer = config::DEFAULT_DFS_COMMIT_INTERVAL_FOR_MEMORY_PREFER_TOPIC;

    // Message handling defaults.
    _compressMsg = false;
    _compressThres = 0;
    _checkFieldFilterMsg = true;

    // Reader bookkeeping defaults.
    _obsoleteReaderInterval = config::DEFAULT_OBSOLETE_READER_INTERVAL;
    _obsoleteReaderMetricInterval = config::DEFAULT_OBSOLETE_METRIC_INTERVAL;
    _lastReadMsgTimeStamp = TimeUtility::currentTime();
    _lastReadTime = 0;

    // Read flow control; the limiter must be built after _maxReadSizeSec is known.
    _maxReadSizeSec = DEFAULT_MAX_READ_SIZE_MB_SEC << 20;
    _flowCtrol = new FlowControl(_maxReadSizeSec, _maxReadSizeSec);
}
/**
 * Stops the security-mode background thread, flushes pending messages for a
 * normal-mode topic (unless a forced unload was requested), then releases the
 * components this object deletes.
 */
MessageBrain::~MessageBrain() {
    stopSecurityModeThread();
    if (_forceUnload) {
        // Forced unload: skip the final flush entirely.
        AUTIL_LOG(INFO, "[%s] force unload", _partitionId.ShortDebugString().c_str());
    } else if (TOPIC_MODE_NORMAL == _topicMode && _messageCommitter != NULL && _messageGroup != NULL) {
        commitAllMessage();
    }
    // Deletion order is kept exactly as before.
    DELETE_AND_SET_NULL(_commitManager);
    DELETE_AND_SET_NULL(_messageCommitter);
    DELETE_AND_SET_NULL(_messageGroup);
    DELETE_AND_SET_NULL(_fsMessageReader);
    DELETE_AND_SET_NULL(_fileManager);
    DELETE_AND_SET_NULL(_partitionBlockPool);
    DELETE_AND_SET_NULL(_innerPartMetric);
    DELETE_AND_SET_NULL(_flowCtrol);
}
/**
 * Configures this brain from the partition config and builds its storage stack.
 *
 * Copies the tunables out of @p config, creates the partition block pool, and then either
 *  - (empty data root) sets up a memory-only MessageGroup, which is accepted only for
 *    TOPIC_MODE_MEMORY_ONLY, or
 *  - (non-empty data root) creates the FileManager, FsMessageReader, MessageCommitter and
 *    MessageGroup, seeding the group with the last message read back through the file reader.
 * For TOPIC_MODE_SECURITY it additionally starts the background commit thread.
 *
 * Objects allocated before a failing return are not released here; the destructor deletes them.
 *
 * @param config            partition configuration the tunables are read from
 * @param centerPool        optional; when non-null the partition pool is constructed from it,
 *                          otherwise a standalone pool is created
 * @param fileCachePool     forwarded to FsMessageReader
 * @param permissionCenter  stored and also forwarded to FsMessageReader
 * @param metricsReporter   stored; uses inside this function are null-checked or plain forwards
 * @param taskStatus        optional; when set it supplies the partition id
 * @param oneFileFdNum      forwarded to FsMessageReader
 * @param fileReserveTime   forwarded to FsMessageReader
 * @param enableFastRecover stored and forwarded to FileManager / MessageCommitter
 * @return ERROR_NONE on success; ERROR_BROKER_DATA_DIR_EMPTY, ERROR_BROKER_CREATE_THREAD, or the
 *         error code of whichever component failed to initialize.
 */
ErrorCode MessageBrain::init(const config::PartitionConfig &config,
BlockPool *centerPool,
BlockPool *fileCachePool,
PermissionCenter *permissionCenter,
BrokerMetricsReporter *metricsReporter,
const ThreadSafeTaskStatusPtr &taskStatus,
int32_t oneFileFdNum,
int64_t fileReserveTime,
bool enableFastRecover) {
// Step 1: copy injected collaborators and config-driven tunables into members.
_taskStatus = taskStatus;
_metricsReporter = metricsReporter;
_permissionCenter = permissionCenter;
_minBufferSize = config.getPartitionMinBufferSize();
_maxBufferSize = config.getPartitionMaxBufferSize();
_blockSize = config.getBlockSize();
_needFieldFilter = config.needFieldFilter();
_topicMode = config.getTopicMode();
_compressMsg = config.compressMsg();
_compressThres = config.getCompressThres();
_maxCommitIntervalForMemoryPrefer = config.getMaxCommitIntervalForMemoryPreferTopic();
_maxCommitIntervalWhenDelay = config.getMaxCommitIntervalWhenDelay();
_recyclePercent = config.getRecyclePercent();
_obsoleteReaderInterval = config.getObsoleteReaderInterval();
_obsoleteReaderMetricInterval = config.getObsoleteReaderMetricInterval();
_maxReadSizeSec = config.getMaxReadSizeSec();
_enableFastRecover = enableFastRecover;
util::InlineVersion inlineVersion;
// The partition id, inline version and metric tags are only available when a task status was supplied;
// without one, _metricsTags stays unset and _partitionId keeps its default value.
if (_taskStatus) {
_partitionId = _taskStatus->getPartitionId();
inlineVersion.fromProto(_partitionId.inlineversion());
_metricsTags.reset(new kmonitor::MetricsTags());
_metricsTags->AddTag("topic", _partitionId.topicname());
_metricsTags->AddTag("partition", intToString(_partitionId.id()));
_metricsTags->AddTag("access_id", DEFAULT_METRIC_ACCESSID);
}
_checkFieldFilterMsg = config.checkFieldFilterMsg();
// Values that are only needed locally (logging and the obsolete-file criterion below).
int64_t obsoleteFileTimeInterval = config.getObsoleteFileTimeInterval();
int32_t reservedFileCount = config.getReservedFileCount();
int64_t delObsoleteFileInterval = config.getDelObsoleteFileInterval();
int64_t candidateObsoleteFileInterval = config.getCandidateObsoleteFileInterval();
int64_t maxMessageCountInOneFile = config.getMaxMessageCountInOneFile();
uint32_t cacheMetaCount = config.getCacheMetaCount();
std::string dataDir = config.getDataRoot();
AUTIL_LOG(INFO,
"partition[%s], minBufferSize[%ld], maxBufferSize[%ld], block size[%ld], "
"max message length[%ld], needFieldFilter[%d], topicMode[%d], "
"obsoleteFileTimeInterval[%ld], reservedFileCount[%d], "
"delObsoleteFileTimeInterval[%ld], candidateObsoleteFileInterval[%ld] "
"maxMessageCountInOneFile[%ld], cacheMetaCount[%d], compressMsg[%d], "
"compressThres[%d], dir root[%s], checkFieldFilterMsg[%d], maxReadSizeSec[%ld], "
"enableFastRecover[%d], timestampOffset[%ld], inlineVersion %s",
_partitionId.ShortDebugString().c_str(),
_minBufferSize,
_maxBufferSize,
_blockSize,
_maxMessageSize,
_needFieldFilter,
_topicMode,
obsoleteFileTimeInterval,
reservedFileCount,
delObsoleteFileInterval,
candidateObsoleteFileInterval,
maxMessageCountInOneFile,
cacheMetaCount,
(int)_compressMsg,
_compressThres,
dataDir.c_str(),
_checkFieldFilterMsg,
_maxReadSizeSec,
_enableFastRecover,
config.getTimestampOffsetInUs(),
inlineVersion.toDebugString().c_str());
// Step 2: partition block pool, either built from the supplied center pool or standalone.
if (centerPool) {
_partitionBlockPool = new BlockPool(centerPool, _maxBufferSize, _minBufferSize);
} else {
_partitionBlockPool = new BlockPool(_maxBufferSize, _blockSize);
}
// The flow controller was created in the constructor with the default limit; apply the configured one.
_flowCtrol->setMaxReadSize(_maxReadSizeSec);
ErrorCode errorCode;
// if dataDir is not configured, then we will use memory only.
if (dataDir.empty()) {
if (TOPIC_MODE_MEMORY_ONLY != _topicMode) {
AUTIL_LOG(ERROR,
"dataDir should not be empty, "
"when topic mode is not TOPIC_MODE_MEMORY_ONLY");
return ERROR_BROKER_DATA_DIR_EMPTY;
}
AUTIL_LOG(WARN, "dataDir is empty, use memory only and not write any files");
_messageGroup = new MessageGroup();
// No recovered messages are passed (NULL) because nothing is persisted in this mode.
errorCode = _messageGroup->init(
_partitionId, _partitionBlockPool, NULL, _topicMode, true, config.getTimestampOffsetInUs());
if (errorCode != ERROR_NONE) {
AUTIL_LOG(ERROR,
"partition [%s] init message group failed,errorcode[%d]",
_partitionId.ShortDebugString().c_str(),
errorCode);
return errorCode;
}
} else {
// Step 3 (file-backed path): file manager first, its init latency is reported as a metric.
StageTime stageTime;
_fileManager = new FileManager();
ObsoleteFileCriterion obsoleteFileCriterion;
obsoleteFileCriterion.obsoleteFileTimeInterval = obsoleteFileTimeInterval;
obsoleteFileCriterion.reservedFileCount = reservedFileCount;
obsoleteFileCriterion.delObsoleteFileInterval = delObsoleteFileInterval;
obsoleteFileCriterion.candidateObsoleteFileInterval = candidateObsoleteFileInterval;
errorCode = _fileManager->init(
dataDir, config.getExtendDataRoots(), obsoleteFileCriterion, inlineVersion, _enableFastRecover);
if (errorCode != ERROR_NONE) {
AUTIL_LOG(ERROR,
"partition [%s] init filemanager failed, errorcode[%d], "
"datadir[%s]",
_partitionId.ShortDebugString().c_str(),
errorCode,
dataDir.c_str());
return errorCode;
}
stageTime.end_stage();
if (_metricsReporter) {
_metricsReporter->reportInitFileManagerLatency(stageTime.last_us(), _metricsTags.get());
}
_fsMessageReader = new FsMessageReader(_partitionId,
_fileManager,
fileCachePool,
oneFileFdNum,
fileReserveTime,
_permissionCenter,
_metricsReporter);
_messageCommitter =
new MessageCommitter(_partitionId, config, _fileManager, _metricsReporter, _enableFastRecover);
_messageGroup = new MessageGroup();
// A commit manager exists only for MEMORY_PREFER and PERSIST_DATA topics.
if (_topicMode == TOPIC_MODE_MEMORY_PREFER || _topicMode == TOPIC_MODE_PERSIST_DATA) {
_commitManager = new CommitManager(_messageGroup, _partitionId.from(), _partitionId.to());
}
// recover latest message
MessageResponse lastMsgs;
errorCode = _fsMessageReader->getLastMessage(&lastMsgs);
if (errorCode != ERROR_NONE) {
AUTIL_LOG(ERROR,
"partition [%s] recover msgs for fsMessageReader failed,"
"errorcode[%d]",
_partitionId.ShortDebugString().c_str(),
errorCode);
return errorCode;
}
// Seed the in-memory group with whatever was recovered from the files.
errorCode = _messageGroup->init(_partitionId,
_partitionBlockPool,
&lastMsgs,
_topicMode,
config.readNotCommittedMsg(),
config.getTimestampOffsetInUs());
if (errorCode != ERROR_NONE) {
AUTIL_LOG(ERROR,
"init message group failed,"
"errorcode[%d]",
errorCode);
return errorCode;
}
}
// Step 4: security topics get a dedicated background thread running backgroundCommitForSecurityMode().
if (TOPIC_MODE_SECURITY == _topicMode) {
_maxWaitTimeForSecurityCommit = config.getMaxWaitTimeForSecurityCommit();
_maxDataSizeForSecurityCommit = config.getMaxDataSizeForSecurityCommit();
_backGroundThreadForSecurityMode =
Thread::createThread(bind(&MessageBrain::backgroundCommitForSecurityMode, this), "sec_back_comm");
if (!_backGroundThreadForSecurityMode) {
AUTIL_LOG(ERROR,
"partition [%s] create backgroud thread for security mode failed",
_partitionId.ShortDebugString().c_str());
return ERROR_BROKER_CREATE_THREAD;
}
AUTIL_LOG(INFO,
"security partition [%s] commit data param: %ld us, %ld byte",
_partitionId.ShortDebugString().c_str(),
_maxWaitTimeForSecurityCommit,
_maxDataSizeForSecurityCommit);
}
AUTIL_LOG(INFO, "partition [%s] init message brain success.", _partitionId.ShortDebugString().c_str());
return ERROR_NONE;
}
/**
 * Stops the security-mode background committer once; later calls are no-ops.
 * After waking and releasing the thread handle, any write requests still
 * queued are committed synchronously on the calling thread.
 */
void MessageBrain::stopSecurityModeThread() {
    if (_isStopped) {
        return;
    }
    _isStopped = true;
    {
        // Wake the background loop so it can observe _isStopped.
        autil::ScopedLock lock(_writeRequestCond);
        _writeRequestCond.signal();
    }
    // NOTE(review): dropping the handle presumably joins the thread — confirm against autil::Thread.
    _backGroundThreadForSecurityMode.reset();

    auto pendingItems = _writeRequestItemQueue.getQueueSize();
    if (pendingItems > 0) {
        AUTIL_LOG(INFO,
                  "partition[%s] still has [%d] item not processed.",
                  _partitionId.ShortDebugString().c_str(),
                  pendingItems);
        commitForSecurityMode();
        AUTIL_LOG(INFO,
                  "partition[%s] after commit left item is [%d]",
                  _partitionId.ShortDebugString().c_str(),
                  _writeRequestItemQueue.getQueueSize());
    }
}
void MessageBrain::commitForSecurityMode() {
while (_writeRequestItemQueue.getQueueSize() > 0) {
vector<WriteRequestItem *> requestItemVec;
vector<WriteMetricsCollectorPtr> collectors;
_writeRequestItemQueue.popRequestItem(requestItemVec, WriteRequestItemQueue::DEFAULT_REQUESTITEM_LIMIT);
checkTimeOutWriteRequest(requestItemVec);
addWriteRequestToMemory(requestItemVec);
commitAllMessage();
WriteRequestItem *requestItem = NULL;
for (size_t i = 0; i < requestItemVec.size(); ++i) {
requestItem = requestItemVec[i];
if (requestItem) {
doneRunForProductRequest(requestItem->closure, requestItem->collector);
DELETE_AND_SET_NULL(requestItem);
}
}
requestItemVec.clear();
}
}
/**
 * Body of the security-mode background thread.
 *
 * Repeatedly drains the write queue; when the commit condition is not yet met
 * it waits on _writeRequestCond for at most _maxWaitTimeForSecurityCommit.
 * The loop always runs at least once and exits once _isStopped is set.
 */
void MessageBrain::backgroundCommitForSecurityMode() {
    for (;;) {
        commitForSecurityMode();
        const bool commitDue = checkSecurityModeCommitCondition();
        if (!commitDue) {
            autil::ScopedLock lock(_writeRequestCond);
            _writeRequestCond.wait(_maxWaitTimeForSecurityCommit);
        }
        if (_isStopped) {
            break;
        }
    }
}
/**
 * Tells whether the queued security-mode writes should be committed now.
 *
 * @return true when the queue's max wait time exceeds _maxWaitTimeForSecurityCommit,
 *         or when the queued data size exceeds _maxDataSizeForSecurityCommit.
 */
bool MessageBrain::checkSecurityModeCommitCondition() {
    if (_writeRequestItemQueue.getMaxWaitTime() > _maxWaitTimeForSecurityCommit) {
        return true;
    }
    return _writeRequestItemQueue.getQueueDataSize() > (uint64_t)_maxDataSizeForSecurityCommit;
}
/**
 * Fails the leading write requests that have waited DEFAULT_TIMETOUT_LIMIT or longer.
 *
 * Scanning starts at the front and stops at the first request that has not
 * timed out (presumably the batch is ordered by receive time — verify against
 * the queue). Each expired request is answered with ERROR_BROKER_BUSY, deleted,
 * and its slot in @p requestItemVec is set to NULL.
 *
 * @param requestItemVec batch to inspect; modified in place.
 */
void MessageBrain::checkTimeOutWriteRequest(vector<WriteRequestItem *> &requestItemVec) {
    const int64_t now = autil::TimeUtility::currentTime();
    for (size_t idx = 0; idx < requestItemVec.size(); ++idx) {
        WriteRequestItem *item = requestItemVec[idx];
        if (!(now - item->receivedTime >= DEFAULT_TIMETOUT_LIMIT)) {
            break;
        }
        ++item->collector->timeoutNumOfWriteRequest;
        doneRunForProductRequest(item->closure, item->collector, ERROR_BROKER_BUSY);
        DELETE_AND_SET_NULL(item);
        requestItemVec[idx] = NULL;
    }
}
/**
 * Appends every live request of the batch to memory via doAddMessage().
 *
 * The resulting error code is always written into the request's response.
 * A request whose response reports zero accepted messages is completed
 * immediately with that error code, deleted, and its slot set to NULL;
 * the other requests stay in @p requestItemVec for the caller to finish.
 *
 * @param requestItemVec batch to process; NULL slots are skipped.
 */
void MessageBrain::addWriteRequestToMemory(vector<WriteRequestItem *> &requestItemVec) {
    for (size_t idx = 0; idx < requestItemVec.size(); ++idx) {
        WriteRequestItem *item = requestItemVec[idx];
        if (item == NULL) {
            continue;
        }
        auto *resp = item->closure->_response;
        ErrorCode addEc =
            doAddMessage(item->closure->_request, resp, item->msgCount, item->dataSize, item->collector);
        setBrokerResponseError(resp->mutable_errorinfo(), addEc);
        if (resp->acceptedmsgcount() != 0) {
            continue;
        }
        // Nothing was accepted: answer right away instead of waiting for the commit.
        ++item->collector->deniedNumOfWriteRequestWithError;
        doneRunForProductRequest(item->closure, item->collector, addEc);
        DELETE_AND_SET_NULL(item);
        requestItemVec[idx] = NULL;
    }
}
void MessageBrain::commitAllMessage() {
if (_topicMode == TOPIC_MODE_MEMORY_PREFER || _topicMode == TOPIC_MODE_MEMORY_ONLY ||
_messageCommitter->hasSealError()) {
return;
}
int64_t lastMsgId = _messageGroup->getLastReceivedMsgId();
int64_t lastCommittedId = _messageCommitter->getCommittedId();
ErrorCode ec = ERROR_NONE;
while (lastMsgId > lastCommittedId) {
ec = commitMessage();
if (ec != ERROR_NONE) {
AUTIL_LOG(WARN,
"partition[%s] some error happen when commit message, "
"errorCode[%d], errorMsg[%s]",
_partitionId.ShortDebugString().c_str(),
ec,
ErrorCode_Name(ec).c_str());
if (_taskStatus) {
_taskStatus->setError(ec, ErrorCode_Name(ec).c_str());
}
if (_messageCommitter->hasSealError()) {
AUTIL_LOG(WARN, "[%s] has seal error, break commit", _partitionId.ShortDebugString().c_str());
break;
}
}
ec = _messageCommitter->commitFile();
if (ec != ERROR_NONE) {
AUTIL_LOG(WARN,
"partition[%s] some error happen when commit file, "
"errorCode[%d], errorMsg[%s]",
_partitionId.ShortDebugString().c_str(),
ec,
ErrorCode_Name(ec).c_str());
if (_taskStatus) {
_taskStatus->setError(ec, ErrorCode_Name(ec).c_str());
}
_messageCommitter->closeFile();
if (_messageCommitter->hasSealError()) {
AUTIL_LOG(WARN, "[%s] has seal error, break commit", _partitionId.ShortDebugString().c_str());
break;
}
}
lastMsgId = _messageGroup->getLastReceivedMsgId();
lastCommittedId = _messageCommitter->getCommittedId();
_messageGroup->setCommittedId(lastCommittedId);
if (lastMsgId > lastCommittedId) {
usleep(DEFAULT_RETRY_COMMIT_INTERVAL);
}
}
}
/**
 * Reports whether there is something left to commit.
 *
 * @return false when there is no message group; true when there is no
 *         committer; otherwise whether the last received message id differs
 *         from the committer's committed id.
 */
bool MessageBrain::needCommitMessage() {
    if (!_messageGroup) {
        return false;
    }
    const int64_t newestMsgId = _messageGroup->getLastReceivedMsgId();
    if (!_messageCommitter) {
        return true;
    }
    const int64_t committedId = _messageCommitter->getCommittedId();
    return newestMsgId != committedId;
}
/**
 * Performs one commit pass whose behaviour depends on the topic mode.
 *
 *  - No message group: nothing to do, returns ERROR_NONE.
 *  - TOPIC_MODE_MEMORY_ONLY: marks everything received so far as committed and returns.
 *  - TOPIC_MODE_MEMORY_PREFER: first adopts the commit id reported by the commit manager, then
 *    returns without writing unless the last access is older than
 *    _maxCommitIntervalForMemoryPrefer or the commit delay has reached _maxCommitIntervalWhenDelay.
 *  - Otherwise: hands all not-yet-written messages to the committer in batches and finally
 *    writes an empty batch to trigger the commit.
 *
 * @return ERROR_NONE, or the first error returned by MessageCommitter::write().
 */
ErrorCode MessageBrain::commitMessage() {
if (!_messageGroup) {
return ERROR_NONE;
}
int64_t lastMsgId = _messageGroup->getLastReceivedMsgId();
if (_topicMode == TOPIC_MODE_MEMORY_ONLY) {
// Nothing is persisted: "committed" simply follows the last received id.
_messageGroup->setCommittedId(lastMsgId);
if (_messageCommitter) {
_messageCommitter->updateCommittedId(lastMsgId);
}
return ERROR_NONE;
}
if (_topicMode == TOPIC_MODE_MEMORY_PREFER) {
int64_t committedId;
int64_t lastAccessTime;
int64_t committedTime;
_commitManager->getCommitIdAndAccessTime(committedId, lastAccessTime, committedTime);
int64_t lastCommittedId = _messageCommitter->getCommittedId();
int64_t curTime = TimeUtility::currentTime();
// Only move forward, and ignore -1 from the commit manager.
if (lastCommittedId < committedId && committedId != -1) {
_messageCommitter->updateCommittedId(committedId);
lastCommittedId = _messageCommitter->getCommittedId();
_messageGroup->setCommittedId(lastCommittedId);
// NOTE(review): division by 1000 presumably converts microseconds to milliseconds — confirm units.
int64_t delay = (curTime - committedTime) / 1000.0;
AUTIL_LOG(INFO,
"[%s] update committed id [%ld], committed timestamp[%ld], delay [%ld]",
_partitionId.ShortDebugString().c_str(),
lastCommittedId,
_messageGroup->getCommittedTimestamp(),
delay);
if (_metricsReporter) {
_metricsReporter->reportClientCommitQps(_metricsTags.get());
_metricsReporter->reportClientCommitDelay(delay, _metricsTags.get());
}
}
int64_t commitDelay = _messageGroup->getCommitDelay();
// Recently accessed and not lagging too far: skip the write to files for now.
if (curTime - lastAccessTime < _maxCommitIntervalForMemoryPrefer && commitDelay < _maxCommitIntervalWhenDelay) {
return ERROR_NONE;
}
}
int64_t lastWritedId = _messageCommitter->getWritedId();
MemoryMessageVector vec;
ErrorCode ec = ERROR_NONE;
int64_t lastCommittedId = 0;
// Feed the committer batch by batch (256 is passed to getMemoryMessage, presumably the batch size)
// until everything received has been written or a write fails.
while (lastMsgId > lastWritedId) {
_messageGroup->getMemoryMessage(lastWritedId + 1, 256, vec);
ec = _messageCommitter->write(vec);
if (ec != ERROR_NONE) {
break;
}
lastWritedId = _messageCommitter->getWritedId();
lastCommittedId = _messageCommitter->getCommittedId();
_messageGroup->setCommittedId(lastCommittedId);
}
// write a null vector to trigger commit
if (ec == ERROR_NONE) {
ec = _messageCommitter->write(MemoryMessageVector());
}
// Publish the committer's progress even when a write failed.
lastCommittedId = _messageCommitter->getCommittedId();
_messageGroup->setCommittedId(lastCommittedId);
return ec;
}
/**
 * Returns the committed message id, taken from the committer when one exists
 * and from the message group otherwise.
 */
int64_t MessageBrain::getCommittedMsgId() const {
    if (!_messageCommitter) {
        return _messageGroup->getCommittedId();
    }
    return _messageCommitter->getCommittedId();
}
/**
 * Estimates how many extra blocks are needed to store a batch of messages.
 *
 * Meta entries and payload bytes that do not fit into the message group's
 * remaining space are each rounded up to whole blocks. If any block is needed
 * at all, one more is added on top.
 *
 * @param msgCount      number of messages to store.
 * @param totalDataSize total payload size of those messages.
 * @return 0 when everything fits into the remaining space, otherwise the block estimate.
 */
size_t MessageBrain::calNeedBlockCount(size_t msgCount, int64_t totalDataSize) {
    size_t blocks = 0;

    // Blocks for message meta entries.
    size_t freeMetaSlots = _messageGroup->getRemainMetaCount();
    if (msgCount > freeMetaSlots) {
        size_t metaPerBlock = _messageGroup->getMetaCountInOneBlock();
        blocks += (msgCount - freeMetaSlots + metaPerBlock - 1) / metaPerBlock;
    }

    // Blocks for message payload.
    int64_t freeDataBytes = _messageGroup->getRemainDataSize();
    if (totalDataSize > freeDataBytes) {
        int64_t bytesPerBlock = _partitionBlockPool->getBlockSize();
        blocks += (totalDataSize - freeDataBytes + bytesPerBlock - 1) / bytesPerBlock;
    }

    if (blocks > 0) {
        ++blocks;
    }
    return blocks;
}
// Forwards to the message group's getRecycleInfo(); _messageGroup must be non-null when this is called.
RecycleInfo MessageBrain::getRecycleInfo() { return _messageGroup->getRecycleInfo(); }
/**
 * Asks the file manager to delete expired files and reports how many were removed.
 *
 * For TOPIC_MODE_PERSIST_DATA the commit manager's commit timestamp is passed
 * along; other modes pass FileManager::COMMITTED_TIMESTAMP_INVALID.
 * No-op when there is no file manager.
 */
void MessageBrain::delExpiredFile() {
    if (!_fileManager) {
        return;
    }
    int64_t committedTimestamp = FileManager::COMMITTED_TIMESTAMP_INVALID;
    if (_topicMode == TOPIC_MODE_PERSIST_DATA) {
        _commitManager->getCommitTimestamp(committedTimestamp);
    }
    uint32_t removedFileCount = 0;
    _fileManager->delExpiredFile(committedTimestamp, removedFileCount);
    if (_metricsReporter && removedFileCount > 0) {
        _metricsReporter->reportDeleteObsoleteFileQps(removedFileCount, _metricsTags.get());
    }
}
void MessageBrain::syncDfsUsedSize() {
if (_fileManager && !_fileManager->isUsedDfsSizeSynced()) {
FileManagerMetricsCollector collector;
_fileManager->syncDfsUsedSize(collector);
if (_metricsReporter) {
_metricsReporter->reportFileManagerMetrics(_metricsTags, collector);
}
}
}
// Recycles in-memory message buffer for this partition.
// partition recycle condition: 1. current write may exceed max partition buffer
// 2. no free block in center pool and partition reserved blocks also can not hold those messages
//
// blockCount is the number of blocks the caller wants to obtain. When it is non-zero the function
// first checks whether the pool can already satisfy the demand and returns without recycling if so;
// when it is zero those checks are skipped and recycling is always attempted.
// Recycling order: by slowest reader info first, then tryRecycleFast, then a size-based tryRecycle.
// The function returns immediately when _recycleMutex cannot be taken with trylock(); every other
// exit path unlocks the mutex explicitly.
void MessageBrain::recycleBuffer(int64_t blockCount) {
if (!_messageGroup) {
return;
}
// Non-blocking: a non-zero trylock() result is treated as "could not lock" and the call is skipped.
if (_recycleMutex.trylock() != 0) {
return;
}
if (!_messageGroup->canRecycle()) {
AUTIL_INTERVAL_LOG(100,
INFO,
"partition [%s ] no msg can be recycled,"
" msg count [%ld], used block [%ld], unused block [%ld],"
" max use block [%ld], parent unused block [%ld]",
_partitionId.ShortDebugString().c_str(),
_messageGroup->getMsgCountInBuffer(),
_partitionBlockPool->getUsedBlockCount(),
_partitionBlockPool->getUnusedBlockCount(),
_partitionBlockPool->getMaxBlockCount(),
_partitionBlockPool->getParentUnusedBlockCount());
_recycleMutex.unlock();
return;
}
int64_t begTime = TimeUtility::currentTime();
kmonitor::MetricsTags tags;
tags.AddTag("topic", _partitionId.topicname());
tags.AddTag("partition", intToString(_partitionId.id()));
if (blockCount != 0) { // called from our own write path; no need to recycle if resources are sufficient
int64_t unusedBlock = _partitionBlockPool->getUnusedBlockCount();
int64_t partReserveBlock = min(unusedBlock, _partitionBlockPool->getReserveBlockCount());
// The partition's own reserved blocks already cover the demand.
if (partReserveBlock >= blockCount) {
_recycleMutex.unlock();
return;
}
static int64_t RECYCLE_MIN_BLOCK_THRESHOLD = 32;
int64_t maxCanUseBlock = _partitionBlockPool->getMaxBlockCount();
int64_t usedBlock = _partitionBlockPool->getUsedBlockCount();
int64_t totalUnusedBlock = unusedBlock + _partitionBlockPool->getParentUnusedBlockCount();
int64_t needBlock = max(blockCount, RECYCLE_MIN_BLOCK_THRESHOLD);
// The write stays within the partition limit and enough free blocks remain overall.
if (usedBlock + blockCount <= maxCanUseBlock && totalUnusedBlock > needBlock) {
_recycleMutex.unlock();
return;
}
}
int64_t actualRecycleSize = 0;
int64_t actualRecycleCount = 0;
{ // recycle by reader info first
int64_t minTimestamp = 0, minMsgId = 0;
{
// _readerInfoMap is read under the reader lock.
ScopedReadWriteLock lock(_readerRW, 'r');
getSlowestMemReaderInfo(&_readerInfoMap, minTimestamp, minMsgId);
}
_messageGroup->tryRecycleByReaderInfo(minTimestamp, actualRecycleSize, actualRecycleCount);
if (_metricsReporter) {
_metricsReporter->reportRecycleWriteCacheByReaderSize(actualRecycleSize, &tags);
}
}
// Reader-based recycling freed nothing: fall back to the forced strategies.
if (actualRecycleSize <= 0) {
_messageGroup->tryRecycleFast(_recyclePercent, actualRecycleSize, actualRecycleCount);
if (actualRecycleSize <= 0) {
int64_t maxBlockCount = _partitionBlockPool->getMaxBlockCount();
int64_t usedBlockCount = _partitionBlockPool->getUsedBlockCount();
int64_t blockSize = _partitionBlockPool->getBlockSize();
int64_t recycleBlock = 0;
// Recycle at least _recyclePercent of the max (when over the limit) or of the used blocks.
if (usedBlockCount + blockCount > maxBlockCount) { // each recycle 10% used memory at least
recycleBlock = max(blockCount, int64_t(std::ceil(maxBlockCount * _recyclePercent)));
} else {
recycleBlock = max(blockCount, int64_t(std::ceil(usedBlockCount * _recyclePercent)));
}
int64_t recycleSize = recycleBlock * blockSize;
_messageGroup->tryRecycle(recycleSize, actualRecycleSize, actualRecycleCount);
}
if (_metricsReporter) {
_metricsReporter->reportRecycleWriteCacheByForceSize(actualRecycleSize, &tags);
}
}
int64_t endTime = TimeUtility::currentTime();
AUTIL_LOG(INFO,
"partition [%s ] recycle msg count [%ld], recycle size [%ld],"
" left msg count [%ld], used block [%ld], unused block [%ld],"
" max use block [%ld], parent unused block [%ld], use time [%ld].",
_partitionId.ShortDebugString().c_str(),
actualRecycleCount,
actualRecycleSize,
_messageGroup->getMsgCountInBuffer(),
_partitionBlockPool->getUsedBlockCount(),
_partitionBlockPool->getUnusedBlockCount(),
_partitionBlockPool->getMaxBlockCount(),
_partitionBlockPool->getParentUnusedBlockCount(),
endTime - begTime);
if (_metricsReporter) {
_metricsReporter->incRecycleQps(&tags);
}
_recycleMutex.unlock();
}
void MessageBrain::recycleFileCache(int64_t metaThreshold, int64_t dataThreshold) {
if (_fsMessageReader) {
ReaderInfoMap readerInfoMap;
{
ScopedReadWriteLock lock(_readerRW, 'r');
readerInfoMap = _readerInfoMap;
}
_fsMessageReader->recycle(&readerInfoMap, metaThreshold, dataThreshold);
}
}
/**
 * Serves one consumption (read) request for this partition.
 *
 * The request is rejected up front when fast recover is enabled and the committer has a seal
 * error, when the read flow control says no, when the request carries neither a start id nor
 * a start timestamp, or when its extended session id does not match this brain's session.
 * Otherwise messages are fetched (through the field-filter path when required fields or a
 * filter description are present) and read statistics are recorded in @p collector.
 *
 * @param request        the consumption request.
 * @param response       filled with the messages and the session id.
 * @param timeoutChecker forwarded to the read path.
 * @param srcIpPort      forwarded to updateReaderInfo().
 * @param collector      receives per-request read metrics.
 * @return the error code of the read path, or one of ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND,
 *         ERROR_BROKER_BUSY, ERROR_BROKER_MESSAGE_FORMAT_INVALID, ERROR_BROKER_SESSION_CHANGED,
 *         ERROR_BROKER_TOPIC_NOT_SUPPORT_FIELD_FILTER for the early rejections.
 */
ErrorCode MessageBrain::getMessage(const ConsumptionRequest *request,
                                   MessageResponse *response,
                                   TimeoutChecker *timeoutChecker,
                                   const string *srcIpPort,
                                   ReadMetricsCollector &collector) {
    ++_metricStat.readRequestNum;
    if (_enableFastRecover && _messageCommitter && _messageCommitter->hasSealError()) {
        AUTIL_LOG(
            ERROR, "%s-%d write dfs error, return part not found", _partitionId.topicname().c_str(), _partitionId.id());
        return ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND;
    }
    _lastReadTime = TimeUtility::currentTime();
    if (!_flowCtrol->canRead(_lastReadTime)) {
        collector.readLimitQps = 1;
        AUTIL_INTERVAL_LOG(20,
                           WARN,
                           "%s-%d recent read size[%ld] exceed limit[%ld]",
                           _partitionId.topicname().c_str(),
                           _partitionId.id(),
                           _flowCtrol->getReadSize(),
                           _maxReadSizeSec);
        return ERROR_BROKER_BUSY;
    }
    if (!(request->has_starttimestamp() || request->has_startid())) {
        AUTIL_LOG(WARN, "no start id and msg time stamp");
        return ERROR_BROKER_MESSAGE_FORMAT_INVALID;
    }
    response->set_sessionid(_sessionId);
    if (request->has_sessionidextend()) {
        int64_t sessionId = request->sessionidextend();
        if (sessionId != -1 && sessionId != _sessionId) {
            // drop this request
            return ERROR_BROKER_SESSION_CHANGED;
        }
    }
    if (_commitManager) {
        bool needUpdateMsgId = (TOPIC_MODE_MEMORY_PREFER == _topicMode);
        _commitManager->updateCommit(request, needUpdateMsgId);
    }
    ReaderInfoPtr readerInfo(new ReaderInfo(autil::TimeUtility::currentTime()));
    ErrorCode ec = ERROR_NONE;
    if (request->requiredfieldnames_size() != 0 || request->has_fieldfilterdesc()) {
        if (_needFieldFilter) {
            ec = getFieldFilterMessage(request, response, timeoutChecker, readerInfo, collector);
        } else {
            AUTIL_LOG(ERROR,
                      "partition[%s] receive request required field names or "
                      "has field filter desc, but partition not support field filter",
                      _partitionId.ShortDebugString().c_str());
            ec = ERROR_BROKER_TOPIC_NOT_SUPPORT_FIELD_FILTER;
        }
    } else {
        ec = doGetMessage(request, response, timeoutChecker, readerInfo, collector);
    }

    // Derive size/count statistics from the response and remember the timestamp of the last message.
    int64_t totalSize = 0;
    int64_t msgCount = 0;
    if (response->messageformat() == MF_PB) {
        msgCount = response->msgs_size();
        for (int i = 0; i < msgCount; ++i) {
            size_t len = response->msgs(i).data().length();
            totalSize += len;
        }
        collector.readMsgCount_PB = msgCount;
        if (msgCount > 0) {
            const Message &lastMsg = response->msgs(msgCount - 1);
            if (lastMsg.has_timestamp()) {
                _lastReadMsgTimeStamp = lastMsg.timestamp();
            }
        }
    } else if (response->messageformat() == MF_FB) {
        FBMessageReader reader;
        totalSize = response->fbmsgs().size();
        reader.init(response->fbmsgs(), false);
        msgCount = reader.size();
        collector.readMsgCount_FB = msgCount;
        if (msgCount > 0) {
            const protocol::flat::Message *fbMsg = reader.read(msgCount - 1);
            // read() may yield NULL (checkProductionRequest guards against that as well),
            // so never dereference the result blindly.
            if (fbMsg != NULL) {
                int64_t timestamp = fbMsg->timestamp();
                if (timestamp != 0) {
                    _lastReadMsgTimeStamp = timestamp;
                }
            }
        }
    }
    if (msgCount > 0) {
        collector.returnedMsgSize = totalSize / msgCount;
    }
    _metricStat.readSize += totalSize;

    // Only bytes actually read from DFS count against the read flow control.
    int64_t readDfsSize = collector.partitionActualReadDataRateFromDFS + collector.partitionActualReadMetaRateFromDFS;
    if (readDfsSize > 0) {
        _flowCtrol->updateReadSize(readDfsSize);
    }
    collector.returnedMsgsSizePerReadRequest = totalSize;
    collector.returnedMsgsCountPerReadRequest = msgCount;
    collector.returnedMsgsCountWithMergePerReadRequest = response->totalmsgcount();
    collector.readMsgCount = msgCount;
    collector.mergedReadMsgCount = response->totalmsgcount() - msgCount;
    collector.totalReadMsgCount = response->totalmsgcount();
    updateReaderInfo(ec, request, srcIpPort, response, readerInfo);
    return ec;
}
/**
 * Reads messages and applies the request's field filter to the response.
 *
 * The filter is built from the request first; a failure there is returned
 * immediately. After reading, the response is decompressed (a failure is only
 * logged) and filtered regardless of the read result, and filter statistics
 * are stored in @p collector.
 *
 * @return the filter's init error, otherwise the error code of doGetMessage().
 */
ErrorCode MessageBrain::getFieldFilterMessage(const ConsumptionRequest *request,
                                              MessageResponse *response,
                                              TimeoutChecker *timeoutChecker,
                                              ReaderInfoPtr &readerInfo,
                                              ReadMetricsCollector &collector) {
    FieldFilter msgFilter;
    ErrorCode ec = msgFilter.init(request);
    if (ec != ERROR_NONE) {
        return ec;
    }
    msgFilter.setTopicName(_partitionId.topicname());
    msgFilter.setPartId(_partitionId.id());

    ec = doGetMessage(request, response, timeoutChecker, readerInfo, collector);
    if (MessageCompressor::decompressResponseMessage(response, false) != ERROR_NONE) {
        AUTIL_LOG(WARN, "decompress message error.");
    }

    int64_t filteredBytes = 0;
    int64_t rawBytes = 0;
    int64_t msgsRead = 0;
    int64_t msgsUpdated = 0;
    msgFilter.filter(response, filteredBytes, rawBytes, msgsRead, msgsUpdated);

    // Per-message averages are only meaningful when something was read.
    if (msgsRead != 0) {
        collector.fieldFilterReadMsgCountPerReadRequest = msgsRead;
        collector.fieldFilterUpdatedMsgRatio = 100 * msgsUpdated / (double)msgsRead;
        collector.fieldFilteredMsgSize = filteredBytes / msgsRead;
        collector.rawFieldFilteredMsgSize = rawBytes / msgsRead;
    }
    // Size ratios are only meaningful when there were raw bytes at all.
    if (rawBytes != 0) {
        double keptRatio = filteredBytes / (double)rawBytes;
        collector.fieldFilteredMsgRatio = keptRatio * 100;
        int64_t removedBytes = rawBytes - filteredBytes;
        collector.fieldFilterMsgsSizePerReadRequest = removedBytes;
    }
    return ec;
}
/**
 * Core read path: tries memory first, then falls back to files.
 *
 * 1. If the requested id is valid in memory, read from the message group and return on success.
 * 2. Otherwise take a read-file permission (ERROR_BROKER_BUSY when the limit is exceeded).
 * 3. If the requested id is not valid at all, seek by the request's start timestamp and continue
 *    with a copy of the request whose start id is the seek result.
 * 4. Read from the message group again; on ERROR_BROKER_NO_DATA_IN_MEM read from files instead.
 *
 * @return ERROR_NONE or the error of whichever step failed; for ERROR_BROKER_NO_DATA during the
 *         seek, the response's next id/timestamp are filled in before returning.
 */
ErrorCode MessageBrain::doGetMessage(const ConsumptionRequest *request,
MessageResponse *response,
TimeoutChecker *timeoutChecker,
ReaderInfoPtr &readerInfo,
ReadMetricsCollector &collector) {
// Fast path: serve straight from memory. Any failure falls through to the slower path below.
if (messageIdValidInMemory(request)) {
ErrorCode ec = _messageGroup->getMessage(request, !_fsMessageReader, response, readerInfo, collector);
if (ec == ERROR_NONE) {
return ERROR_NONE;
}
}
if (!_permissionCenter->incReadFileCount()) {
AUTIL_LOG(WARN,
"read message from partition [%s] exceed read file limit [%d],",
_partitionId.ShortDebugString().c_str(),
_permissionCenter->getReadFileLimit());
return ERROR_BROKER_BUSY;
}
// NOTE(review): presumably releases the read-file count taken above when it goes out of scope — confirm.
ScopedReadFilePermission readFilePermission(_permissionCenter);
unique_ptr<ConsumptionRequest> newRequest;
ErrorCode ec = ERROR_NONE;
if (!messageIdValid(request, timeoutChecker, &collector)) {
AUTIL_MASSIVE_LOG(
WARN, "validate msgId for request[%s] failed, need seek", request->ShortDebugString().c_str());
// Only checked in builds with assertions enabled; the seek below relies on the timestamp.
assert(request->has_starttimestamp());
MessageIdResponse msgIdResponse;
ec = getMinMessageIdByTime(request->starttimestamp(), &msgIdResponse, timeoutChecker, false);
int64_t msgId = msgIdResponse.msgid();
if (ec != ERROR_NONE) {
if (ec == ERROR_BROKER_NO_DATA) { // memory topic or all data removed
response->set_nextmsgid(0);
response->set_nexttimestamp(_messageGroup->getLastAvailableMsgTimestamp());
AUTIL_LOG(WARN,
"partition[%s] no data, but request start[%ld], timestamp[%ld]",
_partitionId.ShortDebugString().c_str(),
request->startid(),
request->starttimestamp());
return ec;
} else if (ec == ERROR_BROKER_TIMESTAMP_TOO_LATEST) {
// The timestamp is newer than the data: start right after the returned id.
msgId++;
} else {
AUTIL_MASSIVE_LOG(ERROR,
"[%s] getMinMessageIdByTime[%ld] failed, ec [%d]",
_partitionId.ShortDebugString().c_str(),
request->starttimestamp(),
ec);
return ec;
}
}
// The caller's request is const; continue with a private copy that carries the resolved start id.
newRequest.reset(new ConsumptionRequest(*request));
newRequest->set_startid(msgId);
request = newRequest.get(); // request has changed
}
ec = _messageGroup->getMessage(request, !_fsMessageReader, response, readerInfo, collector);
if (ERROR_BROKER_NO_DATA_IN_MEM == ec) {
bool isAppending = false;
// Snapshot the reader map under the reader lock; the file read runs without holding it.
ReaderInfoMap readerInfoMap;
{
ScopedReadWriteLock lock(_readerRW, 'r');
readerInfoMap = _readerInfoMap;
}
ec = _fsMessageReader->getMessage(
request, response, timeoutChecker, isAppending, readerInfo.get(), &readerInfoMap, &collector);
// isAppending is set by the file reader; when true the committer is told its commit file was read.
if (isAppending) {
AUTIL_MASSIVE_LOG(WARN, "[%s] set commit file is readed flag", _partitionId.ShortDebugString().c_str());
_messageCommitter->setCommitFileIsReaded();
}
}
return ec;
}
/**
 * Validates a production (write) request and computes its size and message counts.
 *
 * When field filtering and its message check are both enabled, the request is decompressed in
 * place first (note: this mutates the request through a const_cast) and every non-merged message
 * must parse as a field-group string. Each message must not exceed _maxMessageSize. For merged
 * messages the first two bytes of the payload hold the number of merged messages (native byte
 * order); a merged payload shorter than two bytes is rejected as malformed instead of being read
 * past its end.
 *
 * @param request            request to validate (PB or FB message format).
 * @param totalSize          out: summed payload size of the messages checked successfully.
 * @param msgCount           out: number of messages in the request.
 * @param msgCountWithMerged out: message count with merged messages expanded.
 * @param collector          receives decompression metrics.
 * @return ERROR_NONE, ERROR_BROKER_MSG_LENGTH_EXCEEDED, ERROR_BROKER_MESSAGE_FORMAT_INVALID,
 *         or the decompression error.
 */
ErrorCode MessageBrain::checkProductionRequest(const ProductionRequest *request,
                                               int64_t &totalSize,
                                               int64_t &msgCount,
                                               int64_t &msgCountWithMerged,
                                               WriteMetricsCollector &collector) {
    totalSize = 0;
    ErrorCode ec = ERROR_NONE;
    FieldGroupReader fieldGroupReader;
    if (_needFieldFilter && _checkFieldFilterMsg) {
        TimeTrigger decompressTimeTrigger;
        decompressTimeTrigger.beginTrigger();
        float ratio;
        ec = MessageCompressor::decompressProductionMessage(const_cast<ProductionRequest *>(request), ratio);
        decompressTimeTrigger.endTrigger();
        collector.writeMsgDecompressedRatio = ratio * 100;
        collector.writeMsgDecompressedLatency = decompressTimeTrigger.getLatency();
        if (ec != ERROR_NONE) {
            return ec;
        }
    }
    if (request->messageformat() == MF_PB) {
        msgCount = request->msgs_size();
        msgCountWithMerged = msgCount;
        for (int i = 0; i < request->msgs_size(); ++i) {
            const protocol::Message &msg = request->msgs(i);
            const string &data = msg.data();
            int64_t msgSize = data.size();
            if (msgSize > _maxMessageSize) {
                ec = ERROR_BROKER_MSG_LENGTH_EXCEEDED;
                break;
            }
            if (_checkFieldFilterMsg && _needFieldFilter && !msg.merged()) {
                if (!fieldGroupReader.fromProductionString(data)) {
                    ec = ERROR_BROKER_MESSAGE_FORMAT_INVALID;
                    break;
                }
            }
            if (msg.merged()) {
                uint16_t mergedCount = 0;
                if (msgSize < (int64_t)sizeof(mergedCount)) {
                    // Too short to even hold the merged-message counter.
                    ec = ERROR_BROKER_MESSAGE_FORMAT_INVALID;
                    break;
                }
                // memcpy avoids an unaligned / aliasing-violating load from the byte buffer.
                std::memcpy(&mergedCount, data.data(), sizeof(mergedCount));
                msgCountWithMerged += mergedCount;
                msgCountWithMerged -= 1;
            }
            totalSize += msgSize;
        }
    } else if (request->messageformat() == MF_FB) {
        FBMessageReader reader;
        if (!reader.init(request->fbmsgs(), true)) {
            return ERROR_BROKER_MESSAGE_FORMAT_INVALID;
        }
        msgCount = reader.size();
        msgCountWithMerged = msgCount;
        for (int64_t i = 0; i < msgCount; i++) {
            const protocol::flat::Message *fbMsg = reader.read(i);
            if (fbMsg == NULL || fbMsg->data() == NULL) {
                return ERROR_BROKER_MESSAGE_FORMAT_INVALID;
            }
            int64_t msgSize = fbMsg->data()->size();
            if (msgSize > _maxMessageSize) {
                ec = ERROR_BROKER_MSG_LENGTH_EXCEEDED;
                break;
            }
            if (_checkFieldFilterMsg && _needFieldFilter && !fbMsg->merged()) {
                if (!fieldGroupReader.fromProductionString(fbMsg->data()->c_str(), fbMsg->data()->size())) {
                    ec = ERROR_BROKER_MESSAGE_FORMAT_INVALID;
                    break;
                }
            }
            if (fbMsg->merged()) {
                uint16_t mergedCount = 0;
                if (msgSize < (int64_t)sizeof(mergedCount)) {
                    // Too short to even hold the merged-message counter.
                    ec = ERROR_BROKER_MESSAGE_FORMAT_INVALID;
                    break;
                }
                std::memcpy(&mergedCount, fbMsg->data()->c_str(), sizeof(mergedCount));
                msgCountWithMerged += mergedCount;
                msgCountWithMerged -= 1;
            }
            totalSize += msgSize;
        }
    }
    return ec;
}
// Handles one production (write) request carried by `closure`.
//
// Flow:
//   1. reject with ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND when fast-recover is on
//      and the committer reports a seal error;
//   2. reject with ERROR_BROKER_BUSY when no write permission can be acquired;
//   3. validate the request via checkProductionRequest();
//   4. optionally compress the request in place;
//   5. in security mode (and not replication) enqueue the request for the
//      write-request queue, otherwise add the messages directly.
//
// Every path that does not hand the closure to the queue finishes it through
// doneRunForProductRequest(). Returns the error code of the taken path.
ErrorCode MessageBrain::addMessage(ProductionLogClosure *closure, WriteMetricsCollectorPtr &collector) {
    ++_metricStat.writeRequestNum;
    const ProductionRequest *request = closure->_request;
    MessageResponse *response = closure->_response;

    const bool sealFailed = _enableFastRecover && _messageCommitter && _messageCommitter->hasSealError();
    if (sealFailed) {
        AUTIL_LOG(
            ERROR, "%s-%d write dfs error, return part not found", _partitionId.topicname().c_str(), _partitionId.id());
        doneRunForProductRequest(closure, collector, ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND);
        return ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND;
    }

    const bool writeGranted = _permissionCenter->incWriteCount();
    if (!writeGranted) {
        AUTIL_LOG(WARN,
                  "write msg to partition[%s] exceed write limit[%d]",
                  _partitionId.ShortDebugString().c_str(),
                  _permissionCenter->getWriteLimit());
        doneRunForProductRequest(closure, collector, ERROR_BROKER_BUSY);
        return ERROR_BROKER_BUSY;
    }
    ScopedWritePermission writePermission(_permissionCenter);

    // Validate the request and record its raw statistics, whatever the verdict.
    int64_t requestBytes = 0;
    int64_t requestMsgs = 0;
    int64_t requestMsgsWithMerged = 0;
    ErrorCode status = checkProductionRequest(request, requestBytes, requestMsgs, requestMsgsWithMerged, *collector);
    collector->msgsSizePerWriteRequest = requestBytes;
    collector->msgsCountPerWriteRequest = requestMsgs;
    collector->msgsCountWithMergedPerWriteRequest = requestMsgsWithMerged;
    if (ERROR_NONE != status) {
        collector->acceptedMsgsCountPerWriteRequest = 0;
        collector->acceptedMsgsSizePerWriteRequest = 0;
        ++collector->deniedNumOfWriteRequestWithError;
        response->set_maxallowmsglen(_maxMessageSize);
        doneRunForProductRequest(closure, collector, status);
        return status;
    }

    const bool wantCompress = _compressMsg || request->compressmsginbroker();
    if (wantCompress) {
        TimeTrigger compressTimer;
        compressTimer.beginTrigger();
        float compressRatio = 1.0;
        MessageCompressor::compressProductionMessage(
            const_cast<ProductionRequest *>(request), _compressThres, compressRatio);
        compressTimer.endTrigger();
        collector->msgCompressedRatio = compressRatio * 100;
        collector->msgCompressedLatency = compressTimer.getLatency();
    }

    const bool replicationMode = request->has_replicationmode() && request->replicationmode();
    const bool queueForSecurityCommit = !replicationMode && (TOPIC_MODE_SECURITY == _topicMode);
    if (queueForSecurityCommit) {
        WriteRequestItem *item = new WriteRequestItem;
        item->closure = closure;
        item->msgCount = requestMsgs;
        item->dataSize = requestBytes;
        item->collector = collector;
        if (_writeRequestItemQueue.pushRequestItem(item)) {
            // Item accepted by the queue; wake the waiter when the commit
            // condition is already satisfied.
            if (checkSecurityModeCommitCondition()) {
                autil::ScopedLock lock(_writeRequestCond);
                _writeRequestCond.signal();
            }
        } else {
            // Queue refused the item: finish the request here and free the item.
            status = ERROR_BROKER_BUSY;
            doneRunForProductRequest(closure, collector, status);
            delete item;
        }
        return status;
    }

    // Normal mode (or replication): add the messages directly.
    status = doAddMessage(request, response, requestMsgs, requestBytes, collector);
    if (replicationMode) {
        commitAllMessage();
    }
    doneRunForProductRequest(closure, collector, status);
    _metricStat.writeSize += requestBytes;
    return status;
}
// Completes a production request: fills the common response fields, reports
// write metrics (when both a collector and a metrics reporter are present),
// stores the error code and the completion timestamp in the response, and
// finally runs the closure.
void MessageBrain::doneRunForProductRequest(ProductionLogClosure *closure,
                                            WriteMetricsCollectorPtr &collector,
                                            ErrorCode ec) {
    MessageResponse *resp = closure->_response;
    resp->set_sessionid(_sessionId);
    resp->set_committedid(getCommittedMsgId());
    // Make sure the accepted count is always present in the response.
    if (!resp->has_acceptedmsgcount()) {
        resp->set_acceptedmsgcount(0);
    }
    resp->set_maxmsgid(_messageGroup->getLastAvailableMsgId());

    const int64_t doneTime = autil::TimeUtility::currentTime();
    if (collector && _metricsReporter) {
        collector->writeRequestLatency = (doneTime - closure->_beginTime) / 1000.0;
        if (ERROR_NONE != ec) {
            // Busy rejections are tracked separately from all other failures.
            if (ERROR_BROKER_BUSY == ec) {
                collector->writeBusyError = true;
            } else {
                collector->otherWriteError = true;
            }
        }
        _metricsReporter->reportWriteMetricsBackupThread(collector,
                                                         collector->accessId.empty() ? _metricsTags : nullptr);
    }

    setBrokerResponseError(resp->mutable_errorinfo(), ec);
    resp->set_donetimestamp(doneTime);
    closure->Run();
}
// Adds the messages of `request` to the message group while holding
// _addMsgLock. If calNeedBlockCount() reports that blocks are needed,
// recycleBuffer() is asked for that many blocks first.
ErrorCode MessageBrain::doAddMessage(const ProductionRequest *request,
                                     MessageResponse *response,
                                     int64_t msgCount,
                                     int64_t totalMsgSize,
                                     WriteMetricsCollectorPtr &collector) {
    ScopedLock lock(_addMsgLock);
    const size_t blocksWanted = calNeedBlockCount(msgCount, totalMsgSize);
    if (blocksWanted != 0) {
        recycleBuffer(blocksWanted);
    }
    const ErrorCode result = _messageGroup->addMessage(request, response, *collector);
    return result;
}
// Answers a "max message id" query.
//
// The current session id is always written to the response. The request is
// rejected when it carries a session id other than -1 that differs from the
// current one, when fast-recover is on and the committer has a seal error, or
// when no get-max-id permission can be acquired.
ErrorCode MessageBrain::getMaxMessageId(const MessageIdRequest *request, MessageIdResponse *response) {
    response->set_sessionid(_sessionId);
    if (request->has_sessionid()) {
        const int64_t requestedSession = request->sessionid();
        const bool sessionAccepted = (requestedSession == -1) || (requestedSession == _sessionId);
        if (!sessionAccepted) {
            // drop this request
            return ERROR_BROKER_SESSION_CHANGED;
        }
    }

    const bool sealFailed = _enableFastRecover && _messageCommitter && _messageCommitter->hasSealError();
    if (sealFailed) {
        AUTIL_LOG(
            ERROR, "%s-%d write dfs error, return part not found", _partitionId.topicname().c_str(), _partitionId.id());
        return ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND;
    }

    const bool permitted = _permissionCenter->incGetMaxIdCount();
    if (!permitted) {
        AUTIL_LOG(WARN,
                  "get max msgid from partition [%s] exceed read limit [%d],"
                  " current get maxid threadcount [%d].",
                  _partitionId.ShortDebugString().c_str(),
                  _permissionCenter->getReadLimit(),
                  _permissionCenter->getCurMaxIdCount());
        return ERROR_BROKER_BUSY;
    }
    ScopedMaxIdPermission maxIdPermission(_permissionCenter);
    return _messageGroup->getMaxMessageId(response);
}
// Request-based entry point for "min message id by time".
//
// Writes the current session id to the response, rejects requests whose
// session id is neither -1 nor the current one, rejects when fast-recover is
// on and the committer has a seal error, and otherwise delegates to the
// timestamp-based overload with applyFileRead = true.
ErrorCode MessageBrain::getMinMessageIdByTime(const MessageIdRequest *request,
                                              MessageIdResponse *response,
                                              TimeoutChecker *timeoutChecker) {
    response->set_sessionid(_sessionId);
    if (request->has_sessionid()) {
        const int64_t requestedSession = request->sessionid();
        const bool sessionAccepted = (requestedSession == -1) || (requestedSession == _sessionId);
        if (!sessionAccepted) {
            // drop this request
            return ERROR_BROKER_SESSION_CHANGED;
        }
    }

    const bool sealFailed = _enableFastRecover && _messageCommitter && _messageCommitter->hasSealError();
    if (sealFailed) {
        AUTIL_LOG(
            ERROR, "%s-%d write dfs error, return part not found", _partitionId.topicname().c_str(), _partitionId.id());
        return ERROR_BROKER_TOPIC_PARTITION_NOT_FOUND;
    }

    const int64_t wantedTime = request->timestamp();
    return getMinMessageIdByTime(wantedTime, response, timeoutChecker, true);
}
// Timestamp-based lookup of the minimum message id.
//
// The in-memory message group is queried first. The file reader is consulted
// only when the group reports that its answer is the memory minimum, the
// returned timestamp is later than the requested one, and a file reader exists.
// When `applyFileRead` is true the file lookup additionally requires a
// read-file permission; failing to get one yields ERROR_BROKER_BUSY.
ErrorCode MessageBrain::getMinMessageIdByTime(int64_t timestamp,
                                              MessageIdResponse *response,
                                              TimeoutChecker *timeoutChecker,
                                              bool applyFileRead) {
    if (!_permissionCenter->incGetMinIdCount()) {
        AUTIL_LOG(WARN,
                  "get min msgid from partition [%s] exceed read limit [%d],"
                  " current get minId thread count [%d].",
                  _partitionId.ShortDebugString().c_str(),
                  _permissionCenter->getReadLimit(),
                  _permissionCenter->getCurMinIdCount());
        return ERROR_BROKER_BUSY;
    }
    ScopedMinIdPermission minIdPermission(_permissionCenter);

    bool answeredWithMemoryMin = false;
    const ErrorCode memoryResult = _messageGroup->getMinMessageIdByTime(timestamp, response, answeredWithMemoryMin);
    if (ERROR_NONE != memoryResult) {
        return memoryResult;
    }

    const int64_t timeFromMemory = response->timestamp();
    const bool needFileLookup = answeredWithMemoryMin && timeFromMemory > timestamp && _fsMessageReader;
    if (!needFileLookup) {
        return memoryResult;
    }
    if (!applyFileRead) {
        // Caller does not want the read-file permission to be acquired here.
        return _fsMessageReader->getMinMessageIdByTime(timestamp, response, timeoutChecker);
    }
    if (!_permissionCenter->incReadFileCount()) {
        AUTIL_MASSIVE_LOG(WARN,
                          "get min msgid from partition [%s] exceed read file limit [%d],",
                          _partitionId.ShortDebugString().c_str(),
                          _permissionCenter->getReadFileLimit());
        return ERROR_BROKER_BUSY;
    }
    ScopedReadFilePermission readFilePermission(_permissionCenter);
    return _fsMessageReader->getMinMessageIdByTime(timestamp, response, timeoutChecker);
}
// Fills `collector` with a snapshot of this partition's metrics: identity,
// message-group sizes, block-pool usage, timestamps, write-queue state and,
// when the corresponding component exists, committer / file-reader /
// file-manager figures.
void MessageBrain::getPartitionMetrics(monitor::PartitionMetricsCollector &collector) const {
    // Identity and status.
    collector.topicName = _partitionId.topicname();
    collector.partId = _partitionId.id();
    collector.partitionStatus = 1;

    // Message-group sizes.
    collector.actualDataSize = _messageGroup->getDataSize();
    collector.actualMetaSize = _messageGroup->getMetaSize();

    // Block-pool usage, expressed as block count times block size.
    const int64_t bytesPerBlock = _partitionBlockPool->getBlockSize();
    collector.usedBufferSize = _partitionBlockPool->getUsedBlockCount() * bytesPerBlock;
    collector.unusedMaxBufferSize = _partitionBlockPool->getMaxUnusedBlockCount() * bytesPerBlock;
    collector.reserveFreeBufferSize = _partitionBlockPool->getUnusedBlockCount() * bytesPerBlock;

    collector.leftToBeCommittedDataSize = _messageGroup->getLeftToBeCommittedDataSize();
    collector.lastAcceptedMsgTimeStamp = _messageGroup->getLastAcceptedMsgTimeStamp();
    collector.lastReadMsgTimeStamp = _lastReadMsgTimeStamp;

    // Reported as the distance between now and the first message timestamp
    // taken from the recycle info.
    const int64_t oldestMsgTime = _messageGroup->getRecycleInfo().firstMsgTimestamp;
    const int64_t now = autil::TimeUtility::currentTime();
    collector.oldestMsgTimeStampInBuffer = now - oldestMsgTime;

    collector.commitDelay = _messageGroup->getCommitDelay();
    collector.maxMsgId = _messageGroup->getLastReceivedMsgId();
    collector.msgCountInBuffer = _messageGroup->getMsgCountInBuffer();
    collector.topicMode = _topicMode;
    collector.writeRequestQueueSize = _writeRequestItemQueue.getQueueSize();
    collector.writeRequestQueueDataSize = _writeRequestItemQueue.getQueueDataSize();

    if (_messageCommitter) {
        // 1 when the committer currently has a data/meta file, 0 otherwise.
        collector.writeDataFileCount = _messageCommitter->getDataFile() != nullptr ? 1 : 0;
        collector.writeMetaFileCount = _messageCommitter->getMetaFile() != nullptr ? 1 : 0;
        collector.writeFailMetaFileCount = _messageCommitter->getWriteFailMetaCount();
        collector.writeFailDataFileCount = _messageCommitter->getWriteFailDataCount();
        collector.writeBufferSize = _messageCommitter->getWriteBufferSize();
    }

    if (_fsMessageReader) {
        collector.cacheFileCount = _fsMessageReader->getCacheFileCount();
        collector.cacheMetaBlockCount = _fsMessageReader->getCacheMetaBlockCount();
        collector.cacheDataBlockCount = _fsMessageReader->getCacheDataBlockCount();
    }

    // The DFS usage is reported only once the file manager says it is synced.
    if (_fileManager && _fileManager->isUsedDfsSizeSynced()) {
        collector.usedDfsSize = _fileManager->getUsedDfsSize();
    }
}
void MessageBrain::getPartitionInMetric(uint32_t interval, protocol::PartitionInMetric *metric) {
metric->set_topicname(_partitionId.topicname());
metric->set_partid(_partitionId.id());
metric->set_lastwritetime(_messageGroup->getLastAcceptedMsgTimeStamp() / 1000000);
metric->set_lastreadtime(_lastReadTime / 1000000);
if (NULL == _innerPartMetric) {
_innerPartMetric = new InnerPartMetric(interval);
}
_innerPartMetric->update(_metricStat);
InnerPartMetricStat stat1min, stat5min;
_innerPartMetric->getMetric1min(stat1min);
_innerPartMetric->getMetric5min(stat5min);
metric->set_writerate1min(stat1min.writeSize);
metric->set_writerate5min(stat5min.writeSize / 5);
metric->set_readrate1min(stat1min.readSize);
metric->set_readrate5min(stat5min.readSize / 5);
metric->set_writerequest1min(stat1min.writeRequestNum);
metric->set_writerequest5min(stat5min.writeRequestNum / 5);
metric->set_readrequest1min(stat1min.readRequestNum);
metric->set_readrequest5min(stat5min.readRequestNum / 5);
int64_t dfsCommitDelay = -1;
if (_fileManager) {
dfsCommitDelay = _messageGroup->getCommitDelay();
}
if (-1 != dfsCommitDelay) {
metric->set_commitdelay(dfsCommitDelay);
}
}
// Returns the topic mode stored in this MessageBrain.
protocol::TopicMode MessageBrain::getTopicMode() {
    return _topicMode;
}
// Checks the (start id, start timestamp) pair of a consumption request using
// the in-memory message group only.
//
// Returns false without a start id; true when there is no start timestamp or
// the start id is 0; otherwise the verdict of the message group, or false when
// the group cannot decide.
bool MessageBrain::messageIdValidInMemory(const ConsumptionRequest *request) {
    if (!request->has_startid()) {
        return false;
    }
    if (!request->has_starttimestamp()) {
        return true;
    }
    const int64_t startId = request->startid();
    const int64_t startTime = request->starttimestamp();
    if (0 == startId) {
        return true;
    }
    bool verdict = false;
    const bool decided = _messageGroup->messageIdValid(startId, startTime, verdict);
    return decided ? verdict : false;
}
// Checks the (start id, start timestamp) pair of a consumption request, asking
// the in-memory message group first and the file reader second.
//
// Returns false without a start id; true when there is no start timestamp or
// the start id is 0. If the message group can decide, its verdict is returned.
// Otherwise the file reader decides, or false is returned when there is none.
bool MessageBrain::messageIdValid(const ConsumptionRequest *request,
                                  TimeoutChecker *timeoutChecker,
                                  ReadMetricsCollector *collector) {
    if (!request->has_startid()) {
        return false;
    }
    if (!request->has_starttimestamp()) {
        return true;
    }
    const int64_t startId = request->startid();
    const int64_t startTime = request->starttimestamp();
    if (0 == startId) {
        return true;
    }
    bool verdict = false;
    const bool decidedInMemory = _messageGroup->messageIdValid(startId, startTime, verdict);
    if (decidedInMemory) {
        return verdict;
    }
    if (_fsMessageReader) {
        return _fsMessageReader->messageIdValid(startId, startTime, timeoutChecker, collector);
    }
    return false;
}
// Stores the force-unload flag.
void MessageBrain::setForceUnload(bool flag) {
    _forceUnload = flag;
}
// Returns the file reader's cache block count, or 0 when there is no file reader.
int64_t MessageBrain::getFileCacheBlockCount() {
    if (!_fsMessageReader) {
        return 0;
    }
    return _fsMessageReader->getCacheBlockCount();
}
// Records the reader's progress after a consumption request.
//
// Nothing is recorded unless `ec` is ERROR_NONE or ERROR_BROKER_BUSY and a
// non-empty source address is supplied. Otherwise the next timestamp / next
// message id from the response are copied into `info`, which is then stored in
// _readerInfoMap under the reader's description while holding the write lock.
void MessageBrain::updateReaderInfo(ErrorCode ec,
                                    const ConsumptionRequest *request,
                                    const string *srcIpPort,
                                    const MessageResponse *response,
                                    ReaderInfoPtr &info) {
    const bool recordable = (ERROR_NONE == ec) || (ERROR_BROKER_BUSY == ec);
    if (!recordable) {
        return;
    }
    const bool hasSource = (srcIpPort != NULL) && !srcIpPort->empty();
    if (!hasSource) {
        return;
    }
    const ReaderDes readerKey = getReaderDescription(request, *srcIpPort);
    info->nextTimestamp = response->nexttimestamp();
    info->nextMsgId = response->nextmsgid();
    {
        ScopedReadWriteLock lock(_readerRW, 'w');
        _readerInfoMap[readerKey] = info;
    }
}
// Builds the ReaderDes key for a reader from its source address and the
// [from, to] bounds of the request's filter.
ReaderDes MessageBrain::getReaderDescription(const ConsumptionRequest *request, const string &srcIpPort) {
    ReaderDes description;
    description.setIpPort(srcIpPort);
    const auto &requestFilter = request->filter();
    description.from = requestFilter.from();
    description.to = requestFilter.to();
    return description;
}
// Forwards the recycle request to the file reader; does nothing without one.
void MessageBrain::recycleFile() {
    if (!_fsMessageReader) {
        return;
    }
    _fsMessageReader->recycleFile();
}
// Drops readers that have not issued a request within _obsoleteReaderInterval
// and trims the metric history of the remaining readers to
// _obsoleteReaderMetricInterval.
//
// Both steps run in a single pass under the reader write lock. The log line for
// erased readers reports the reader cutoff that was actually used for erasing
// (the previous code logged the metric-history cutoff by mistake, because the
// same variable was reused for both cutoffs).
void MessageBrain::recycleObsoleteReader() {
    const int64_t currentTime = autil::TimeUtility::currentTime();
    const int64_t readerExpireTime = currentTime - _obsoleteReaderInterval;
    const int64_t metricExpireTime = currentTime - _obsoleteReaderMetricInterval;
    string removeInfo;
    {
        ScopedReadWriteLock lock(_readerRW, 'w');
        auto iter = _readerInfoMap.begin();
        while (iter != _readerInfoMap.end()) {
            if (iter->second->requestTime < readerExpireTime) {
                // clear obsolete reader
                removeInfo += (iter->first).toString();
                iter = _readerInfoMap.erase(iter);
            } else {
                // clear metrics history data of readers that stay
                iter->second->metaInfo->clearHistoryData(metricExpireTime);
                iter->second->dataInfo->clearHistoryData(metricExpireTime);
                ++iter;
            }
        }
    }
    if (!removeInfo.empty()) {
        AUTIL_LOG(INFO,
                  "erase expire readers[%s], now:%ld, expire time:%ld",
                  removeInfo.c_str(),
                  currentTime,
                  readerExpireTime);
    }
}
// Asks the file reader for the meta/data block distribution, passing it a
// snapshot of the reader-info map taken under the read lock.
// Returns false when there is no file reader.
bool MessageBrain::getFileBlockDis(vector<int64_t> &metaDisBlocks, vector<int64_t> &dataDisBlocks) {
    if (!_fsMessageReader) {
        return false;
    }
    ReaderInfoMap readersSnapshot;
    {
        // Copy under the lock so the file reader works on a private snapshot.
        ScopedReadWriteLock lock(_readerRW, 'r');
        readersSnapshot = _readerInfoMap;
    }
    return _fsMessageReader->getFileBlockDis(&readersSnapshot, metaDisBlocks, dataDisBlocks);
}
// Reports the committer's seal-error state; false when there is no committer.
bool MessageBrain::hasSealError() {
    if (!_messageCommitter) {
        return false;
    }
    return _messageCommitter->hasSealError();
}
} // namespace storage
} // namespace swift