protocol::ErrorCode MessageGroup::getMessage()

in aios/apps/facility/swift/broker/storage/MessageGroup.cpp [425:572]


protocol::ErrorCode MessageGroup::getMessage(const ConsumptionRequest *request,
                                             bool memMode,
                                             MessageResponse *response,
                                             ReaderInfoPtr readerInfo,
                                             ReadMetricsCollector &collector) {
    if (readerInfo) {
        readerInfo->setReadFile(false);
    }
    int64_t startId = request->startid();
    int64_t count = request->count();
    int64_t maxTotalSize = request->maxtotalsize();
    bool filterFlag = MessageUtil::needFilter(_partitionId, request->filter());
    ThreadBasedObjectPool<MemoryMessageVector> *vecPool =
        Singleton<ThreadBasedObjectPool<MemoryMessageVector>>::getInstance();
    MemoryMessageVector *mVec = vecPool->getObject();
    mVec->clear();
    int64_t readCount = 0;
    {
        bool readNotCommittedMsg =
            request->has_readcommittedmsg() ? !request->readcommittedmsg() : _readNotCommittedMsg;
        ScopedReadWriteLock lock(_rwLock, 'r');
        auto view = _msgDeque->createView(readNotCommittedMsg);
        if (view.empty()) {
            response->set_nextmsgid(0);
            response->set_nexttimestamp(view.getLastMsgTimestamp());
            return ERROR_BROKER_NO_DATA;
        }
        assert(count >= 0);
        response->set_maxmsgid(view.getLastMsgId());
        response->set_maxtimestamp(view.getLastMsgTimestamp());

        const MemoryMessage &oldestMsg = view[0];
        int64_t oldestMsgId = oldestMsg.getMsgId();
        if (startId < oldestMsgId && !memMode) {
            // to avoid no message in dfs. missing the lastest file or something like that.
            response->set_nextmsgid(oldestMsgId);
            response->set_nexttimestamp(oldestMsg.getTimestamp());
            return ERROR_BROKER_NO_DATA_IN_MEM;
        }
        int64_t beginPos = startId - oldestMsgId > 0 ? startId - oldestMsgId : 0;
        int64_t msgCount = view.size();
        int64_t i = beginPos;
        int64_t totalSize = 0;
        if (filterFlag) {
            MessageFilter filter(request->filter());
            for (; i < msgCount; ++i) {
                const MemoryMessage &memMsg = view[i];
                if (filter.filter(memMsg)) {
                    if (readCount >= count) {
                        break;
                    }
                    totalSize += memMsg.getLen();
                    if (totalSize > maxTotalSize && readCount != 0) {
                        break;
                    }
                    mVec->push_back(memMsg);
                    readCount += memMsg.getMsgCount();
                }
            }
            for (; i < msgCount; ++i) {
                const MemoryMessage &memMsg = view[i];
                if (filter.filter(memMsg)) {
                    response->set_nextmsgid(memMsg.getMsgId());
                    response->set_nexttimestamp(memMsg.getTimestamp());
                    break;
                }
            }
        } else {
            for (; i < msgCount; ++i) {
                const MemoryMessage &memMsg = view[i];
                if (readCount >= count) {
                    break;
                }
                totalSize += memMsg.getLen();
                if (totalSize > maxTotalSize && readCount != 0) {
                    break;
                }
                mVec->push_back(memMsg);
                readCount += memMsg.getMsgCount();
            }
            if (i < msgCount) {
                const MemoryMessage &memMsg = view[i];
                response->set_nextmsgid(memMsg.getMsgId());
                response->set_nexttimestamp(memMsg.getTimestamp());
            }
        }
        if (i >= msgCount) {
            response->set_nextmsgid(view.getNextMsgIdAfterLast());
            response->set_nexttimestamp(view.getNextMsgTimestampAfterLast());
        }
    }
    int64_t returnedMsgsSizePerReadRequest = 0;
    size_t msgCount = mVec->size();
    string decodeBuf;
    bool hasMergedMsg = false;
    int64_t totalMsgCount = 0;
    if (request->versioninfo().supportfb()) {
        ThreadBasedObjectPool<FBMessageWriter> *objectPool =
            Singleton<ThreadBasedObjectPool<FBMessageWriter>>::getInstance();
        FBMessageWriter *writer = objectPool->getObject();
        writer->reset();
        for (size_t i = 0; i < msgCount; ++i) {
            const MemoryMessage &memMsg = (*mVec)[i];
            hasMergedMsg |= memMsg.isMerged();
            returnedMsgsSizePerReadRequest += memMsg.getLen();
            totalMsgCount += MessageConverter::decode(memMsg, writer, decodeBuf);
        }
        writer->finish();
        response->set_messageformat(MF_FB);
        response->set_fbmsgs(writer->data(), writer->size());
        writer->reset();
    } else {
        for (size_t i = 0; i < msgCount; ++i) {
            const MemoryMessage &memMsg = (*mVec)[i];
            hasMergedMsg |= memMsg.isMerged();
            protocol::Message *msg = response->add_msgs();
            returnedMsgsSizePerReadRequest += memMsg.getLen();
            totalMsgCount += MessageConverter::decode(memMsg, *msg, decodeBuf);
        }
        response->set_messageformat(MF_PB);
    }
    assert(totalMsgCount == readCount);
    if (totalMsgCount != readCount) {
        AUTIL_LOG(ERROR,
                  "partition [%s] read error, read count [%ld], decode count [%ld]!",
                  _partitionId.ShortDebugString().c_str(),
                  readCount,
                  totalMsgCount);
    }
    response->set_hasmergedmsg(hasMergedMsg);
    response->set_totalmsgcount(totalMsgCount);
    collector.msgReadQpsFromMemory = msgCount;
    collector.msgReadQpsWithMergedFromMemory = totalMsgCount;
    collector.msgReadRateFromMemory = returnedMsgsSizePerReadRequest;
    if (readerInfo) {
        readerInfo->dataInfo->updateRate(returnedMsgsSizePerReadRequest);
    }
    mVec->clear();
    if (mVec->capacity() * sizeof(MemoryMessage) > MAX_MEMORY_MESSAGE_BUFFER_SIZE) {
        AUTIL_LOG(INFO,
                  "memory message buffer [%ld] large than [%ld], release space.",
                  mVec->capacity() * sizeof(MemoryMessage),
                  MAX_MEMORY_MESSAGE_BUFFER_SIZE);
        MemoryMessageVector mmv;
        mmv.swap(*mVec);
    }
    return ERROR_NONE;
}