in aios/apps/facility/swift/broker/storage/MessageGroup.cpp [425:572]
protocol::ErrorCode MessageGroup::getMessage(const ConsumptionRequest *request,
bool memMode,
MessageResponse *response,
ReaderInfoPtr readerInfo,
ReadMetricsCollector &collector) {
if (readerInfo) {
readerInfo->setReadFile(false);
}
int64_t startId = request->startid();
int64_t count = request->count();
int64_t maxTotalSize = request->maxtotalsize();
bool filterFlag = MessageUtil::needFilter(_partitionId, request->filter());
ThreadBasedObjectPool<MemoryMessageVector> *vecPool =
Singleton<ThreadBasedObjectPool<MemoryMessageVector>>::getInstance();
MemoryMessageVector *mVec = vecPool->getObject();
mVec->clear();
int64_t readCount = 0;
{
bool readNotCommittedMsg =
request->has_readcommittedmsg() ? !request->readcommittedmsg() : _readNotCommittedMsg;
ScopedReadWriteLock lock(_rwLock, 'r');
auto view = _msgDeque->createView(readNotCommittedMsg);
if (view.empty()) {
response->set_nextmsgid(0);
response->set_nexttimestamp(view.getLastMsgTimestamp());
return ERROR_BROKER_NO_DATA;
}
assert(count >= 0);
response->set_maxmsgid(view.getLastMsgId());
response->set_maxtimestamp(view.getLastMsgTimestamp());
const MemoryMessage &oldestMsg = view[0];
int64_t oldestMsgId = oldestMsg.getMsgId();
if (startId < oldestMsgId && !memMode) {
// to avoid no message in dfs. missing the lastest file or something like that.
response->set_nextmsgid(oldestMsgId);
response->set_nexttimestamp(oldestMsg.getTimestamp());
return ERROR_BROKER_NO_DATA_IN_MEM;
}
int64_t beginPos = startId - oldestMsgId > 0 ? startId - oldestMsgId : 0;
int64_t msgCount = view.size();
int64_t i = beginPos;
int64_t totalSize = 0;
if (filterFlag) {
MessageFilter filter(request->filter());
for (; i < msgCount; ++i) {
const MemoryMessage &memMsg = view[i];
if (filter.filter(memMsg)) {
if (readCount >= count) {
break;
}
totalSize += memMsg.getLen();
if (totalSize > maxTotalSize && readCount != 0) {
break;
}
mVec->push_back(memMsg);
readCount += memMsg.getMsgCount();
}
}
for (; i < msgCount; ++i) {
const MemoryMessage &memMsg = view[i];
if (filter.filter(memMsg)) {
response->set_nextmsgid(memMsg.getMsgId());
response->set_nexttimestamp(memMsg.getTimestamp());
break;
}
}
} else {
for (; i < msgCount; ++i) {
const MemoryMessage &memMsg = view[i];
if (readCount >= count) {
break;
}
totalSize += memMsg.getLen();
if (totalSize > maxTotalSize && readCount != 0) {
break;
}
mVec->push_back(memMsg);
readCount += memMsg.getMsgCount();
}
if (i < msgCount) {
const MemoryMessage &memMsg = view[i];
response->set_nextmsgid(memMsg.getMsgId());
response->set_nexttimestamp(memMsg.getTimestamp());
}
}
if (i >= msgCount) {
response->set_nextmsgid(view.getNextMsgIdAfterLast());
response->set_nexttimestamp(view.getNextMsgTimestampAfterLast());
}
}
int64_t returnedMsgsSizePerReadRequest = 0;
size_t msgCount = mVec->size();
string decodeBuf;
bool hasMergedMsg = false;
int64_t totalMsgCount = 0;
if (request->versioninfo().supportfb()) {
ThreadBasedObjectPool<FBMessageWriter> *objectPool =
Singleton<ThreadBasedObjectPool<FBMessageWriter>>::getInstance();
FBMessageWriter *writer = objectPool->getObject();
writer->reset();
for (size_t i = 0; i < msgCount; ++i) {
const MemoryMessage &memMsg = (*mVec)[i];
hasMergedMsg |= memMsg.isMerged();
returnedMsgsSizePerReadRequest += memMsg.getLen();
totalMsgCount += MessageConverter::decode(memMsg, writer, decodeBuf);
}
writer->finish();
response->set_messageformat(MF_FB);
response->set_fbmsgs(writer->data(), writer->size());
writer->reset();
} else {
for (size_t i = 0; i < msgCount; ++i) {
const MemoryMessage &memMsg = (*mVec)[i];
hasMergedMsg |= memMsg.isMerged();
protocol::Message *msg = response->add_msgs();
returnedMsgsSizePerReadRequest += memMsg.getLen();
totalMsgCount += MessageConverter::decode(memMsg, *msg, decodeBuf);
}
response->set_messageformat(MF_PB);
}
assert(totalMsgCount == readCount);
if (totalMsgCount != readCount) {
AUTIL_LOG(ERROR,
"partition [%s] read error, read count [%ld], decode count [%ld]!",
_partitionId.ShortDebugString().c_str(),
readCount,
totalMsgCount);
}
response->set_hasmergedmsg(hasMergedMsg);
response->set_totalmsgcount(totalMsgCount);
collector.msgReadQpsFromMemory = msgCount;
collector.msgReadQpsWithMergedFromMemory = totalMsgCount;
collector.msgReadRateFromMemory = returnedMsgsSizePerReadRequest;
if (readerInfo) {
readerInfo->dataInfo->updateRate(returnedMsgsSizePerReadRequest);
}
mVec->clear();
if (mVec->capacity() * sizeof(MemoryMessage) > MAX_MEMORY_MESSAGE_BUFFER_SIZE) {
AUTIL_LOG(INFO,
"memory message buffer [%ld] large than [%ld], release space.",
mVec->capacity() * sizeof(MemoryMessage),
MAX_MEMORY_MESSAGE_BUFFER_SIZE);
MemoryMessageVector mmv;
mmv.swap(*mVec);
}
return ERROR_NONE;
}