public GetMessageResult getMessages()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java [188:323]


    /**
     * Read messages for a consumer, starting from the requested index offset.
     *
     * <p>If the memory store is enabled and the normalized read switch is greater than 1,
     * the request is first checked against the in-memory caches, but only when the requested
     * offset is at or beyond the file store's max index offset. Once the offset is found to
     * be held by a cache, the outcome of that cache read (success or failure) is returned
     * directly and the file store is not consulted. In every other case the messages are
     * read from the file store.
     *
     * @param reqSwitch        raw read switch; values {@code <= 0} are normalized to 0,
     *                         otherwise {@code reqSwitch % 100} is used for filter consumers
     *                         and {@code reqSwitch / 100} for non-filter consumers
     * @param requestOffset    index offset the consumer wants to read from
     * @param partitionId      partition id passed through to the underlying stores
     * @param consumerNodeInfo consumer state: filter flag, filter codes and last data read offset
     * @param statsKeyBase     key prefix passed to the traffic statistics helpers
     * @param msgSizeLimit     max message size for a file read; reduced to {@code maxAllowRdSize}
     *                         when the consumer lags behind the data high-water mark
     * @param reqRcvTime       request receive time passed through to the underlying stores
     * @return the read result; on failure it carries an error code such as
     *         {@code NOT_FOUND} or {@code MOVED}
     * @throws IOException           propagated from the underlying index/data reads
     * @throws IllegalStateException if this store has already been closed
     */
    public GetMessageResult getMessages(int reqSwitch, long requestOffset,
            int partitionId, ConsumerNodeInfo consumerNodeInfo,
            String statsKeyBase, int msgSizeLimit,
            long reqRcvTime) throws IOException {
        // #lizard forgives
        if (this.closed.get()) {
            throw new IllegalStateException(new StringBuilder(512)
                    .append("[Data Store] Closed MessageStore for storeKey ")
                    .append(this.storeKey).toString());
        }
        boolean inMemCache = false;
        int maxIndexReadLength = memMaxIndexReadCnt.get();
        // default outcome, used when the offset is held in memory but must not be read yet.
        GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                requestOffset, "Can't found Message by index in cache");
        // determine position to read.
        reqSwitch = (reqSwitch <= 0)
                ? 0
                : (consumerNodeInfo.isFilterConsume() ? (reqSwitch % 100) : (reqSwitch / 100));
        if (tubeConfig.isEnableMemStore() && reqSwitch > 1) {
            // in read memory situation, read main memory or backup memory by consumer's config.
            long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
            if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
                this.writeCacheMutex.readLock().lock();
                try {
                    maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
                    // negative: not held in memory; zero: backup memory; positive: main memory.
                    final int result = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
                    if (result >= 0) {
                        inMemCache = true;
                        if (result > 0) {
                            // main memory is only read when the switch allows it (> 2);
                            // otherwise the default NOT_FOUND result is kept.
                            if (reqSwitch > 2) {
                                // read from main memory.
                                memMsgRlt =
                                        msgMemStore.getMessages(consumerNodeInfo.getLastDataRdOffset(),
                                                requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
                                                maxIndexReadLength, partitionId, false,
                                                consumerNodeInfo.isFilterConsume(),
                                                consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                            }
                        } else {
                            // read from backup memory.
                            memMsgRlt =
                                    msgMemStoreBeingFlush.getMessages(consumerNodeInfo.getLastDataRdOffset(),
                                            requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
                                            maxIndexReadLength, partitionId, true,
                                            consumerNodeInfo.isFilterConsume(),
                                            consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                        }
                    }
                } finally {
                    this.writeCacheMutex.readLock().unlock();
                }
            }
            if (inMemCache) {
                // return not found when data is under memory sink operation.
                if (!memMsgRlt.isSuccess) {
                    return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
                            memMsgRlt.dltOffset, memMsgRlt.errInfo);
                }
                HashMap<String, TrafficInfo> countMap =
                        new HashMap<>();
                List<ClientBroker.TransferedMessage> transferedMessageList =
                        new ArrayList<>();
                if (!memMsgRlt.cacheMsgList.isEmpty()) {
                    // one builder is shared by all conversions of this request.
                    final StringBuilder strBuffer = new StringBuilder(512);
                    for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
                        ClientBroker.TransferedMessage transferedMessage =
                                DataStoreUtils.getTransferMsg(dataBuffer,
                                        dataBuffer.array().length,
                                        countMap, statsKeyBase, strBuffer);
                        // buffers that yield no message are skipped.
                        if (transferedMessage != null) {
                            transferedMessageList.add(transferedMessage);
                        }
                    }
                }
                GetMessageResult getResult =
                        new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
                                memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
                                memMsgRlt.totalMsgSize, countMap, transferedMessageList);
                getResult.setMaxOffset(maxIndexOffset);
                return getResult;
            }
        }
        // before read from file, adjust request's offset.
        long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
        if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
            return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                    reqNewOffset, 0, "current offset is exceed max file offset");
        }
        maxIndexReadLength = consumerNodeInfo.isFilterConsume()
                ? fileMaxFilterIndexReadSize.get()
                : fileMaxIndexReadSize.get();
        Segment indexRecordView =
                this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
        if (indexRecordView == null) {
            if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
                return new GetMessageResult(false, TErrCodeConstants.MOVED,
                        reqNewOffset, 0, "current offset is exceed min offset!");
            } else {
                return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                        reqNewOffset, 0, "current offset is exceed max offset!");
            }
        }
        // allocate only after the slice lookup succeeded, so early returns cost no buffer.
        final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
        try {
            indexRecordView.read(indexBuffer, reqNewOffset);
            indexBuffer.flip();
        } finally {
            // always give the view back, even when the read fails with an exception.
            indexRecordView.relViewRef();
        }
        // a consumer lagging far behind the newest data gets its read size capped.
        if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset() >= this.tubeConfig
                .getDoubleDefaultDeduceReadSize())
                && msgSizeLimit > this.maxAllowRdSize) {
            msgSizeLimit = this.maxAllowRdSize;
        }
        GetMessageResult retResult =
                msgFileStore.getMessages(partitionId,
                        consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
                        indexBuffer, consumerNodeInfo.isFilterConsume(),
                        consumerNodeInfo.getFilterCondCodeSet(),
                        statsKeyBase, msgSizeLimit, reqRcvTime);
        // file-only readers are told the file max offset, the others the overall index max offset.
        if (reqSwitch <= 1) {
            retResult.setMaxOffset(getFileIndexMaxOffset());
        } else {
            retResult.setMaxOffset(getIndexMaxOffset());
        }
        if (consumerNodeInfo.isFilterConsume()
                && retResult.isSuccess
                && retResult.getLastReadOffset() > 0) {
            // mark the result as slow-frequency when the remaining file index range
            // is below the configured threshold.
            if ((getFileIndexMaxOffset()
                    - reqNewOffset - retResult.getLastReadOffset()) < fileLowReqMaxFilterIndexReadSize.get()) {
                retResult.setSlowFreq(true);
            }
        }
        return retResult;
    }