public GetCacheMsgResult getMessages()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java [195:316]


    /**
     * Reads messages for one partition out of the in-memory cache segments,
     * starting at the given index offset.
     *
     * <p>Processing steps:
     * <ol>
     *   <li>Reject the request when the index offset lies outside the cached
     *       index range ({@code MOVED} when below {@code writeIndexStartPos},
     *       {@code NOT_FOUND} when at or beyond the current cached end).</li>
     *   <li>Under {@code writeLock}, consult {@code keysMap} (filter consume) or
     *       {@code queuesMap} (plain consume) to decide whether any candidate
     *       entry exists at or after the start offset, and snapshot the current
     *       data/index write offsets.</li>
     *   <li>Scan index entries one by one (at most {@code maxReadCount} of them),
     *       copying the payload of every entry that matches the partition, the
     *       optional key filter and the optional receive-time lower bound.</li>
     * </ol>
     *
     * <p>Each index entry is read as: partition id (int), data position (long),
     * data size (int), key code (int), receive time (long).
     *
     * @param lstRdDataOffset  last read data offset; only used as the initial
     *                         value of a local that is overwritten with the
     *                         current cached data end before it is returned
     * @param lstRdIndexOffset absolute index offset to start reading from
     * @param maxReadSize      scanning stops once the accumulated payload size
     *                         reaches this value
     * @param maxReadCount     maximum number of index entries to scan
     * @param partitionId      partition whose messages are wanted
     * @param isSecond         when {@code true} and not filter-consuming, a
     *                         "nothing found" outcome is reported as a success
     *                         ("Ok2") instead of {@code NOT_FOUND}
     * @param isFilterConsume  whether entries must also match {@code filterKeySet}
     * @param filterKeySet     key codes accepted when {@code isFilterConsume} is set
     * @param reqRcvTime       when non-zero, entries whose receive time is lower
     *                         than this value are skipped
     * @return the read outcome, carrying the copied message buffers, the number
     *         of index bytes scanned, the last data read offset and the total
     *         payload size
     */
    public GetCacheMsgResult getMessages(long lstRdDataOffset, long lstRdIndexOffset,
            int maxReadSize, int maxReadCount,
            int partitionId, boolean isSecond,
            boolean isFilterConsume, Set<Integer> filterKeySet,
            long reqRcvTime) {
        // #lizard forgives
        Integer lastWritePos = 0;
        boolean hasMsg = false;
        // judge memory contains the given offset or not.
        List<ByteBuffer> cacheMsgList = new ArrayList<>();
        if (lstRdIndexOffset < this.writeIndexStartPos) {
            return new GetCacheMsgResult(false, TErrCodeConstants.MOVED,
                    lstRdIndexOffset, "Request offset lower than cache minOffset");
        }
        if (lstRdIndexOffset >= this.writeIndexStartPos + this.cacheIndexOffset.get()) {
            return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                    lstRdIndexOffset, "Request offset reached cache maxOffset");
        }
        int totalReadSize = 0;
        int currIndexOffset;
        int currDataOffset;
        long lastDataRdOff = lstRdDataOffset;
        // position of the first entry to scan, relative to the cached index segment
        int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
        this.writeLock.lock();
        try {
            if (isFilterConsume) {
                // filter conduct. accelerate by keysMap.
                for (Integer keyCode : filterKeySet) {
                    if (keyCode != null) {
                        lastWritePos = this.keysMap.get(keyCode);
                        if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
                            hasMsg = true;
                            break;
                        }
                    }
                }
            } else {
                // orderly consume by partition id.
                lastWritePos = this.queuesMap.get(partitionId);
                if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
                    hasMsg = true;
                }
            }
            // snapshot the write positions; the scan below never reads past them
            currDataOffset = this.cacheDataOffset.get();
            currIndexOffset = this.cacheIndexOffset.get();
            lastDataRdOff = this.writeDataStartPos + currDataOffset;
        } finally {
            this.writeLock.unlock();
        }
        int limitReadSize = currIndexOffset - startReadOff;
        // cannot find message, return not found
        if (!hasMsg) {
            if (isSecond && !isFilterConsume) {
                return new GetCacheMsgResult(true, 0, "Ok2",
                        lstRdIndexOffset, limitReadSize, lastDataRdOff, totalReadSize, cacheMsgList);
            } else {
                return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                        "Can't found Message by index!", lstRdIndexOffset,
                        limitReadSize, lastDataRdOff, totalReadSize, cacheMsgList);
            }
        }
        // fetch data by index.
        int readedSize = 0;
        ByteBuffer tmpIndexRdBuf = this.cachedIndexSegment.asReadOnlyBuffer();
        ByteBuffer tmpDataRdBuf = this.cacheDataSegment.asReadOnlyBuffer();
        // loop read by index
        for (int count = 0; count < maxReadCount; count++, startReadOff += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
            // stop when no complete index entry is left inside the snapshot
            if ((startReadOff >= currIndexOffset)
                    || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN > currIndexOffset)) {
                break;
            }
            // read index content.
            tmpIndexRdBuf.position(startReadOff);
            final int cPartitionId = tmpIndexRdBuf.getInt();
            final long cDataPos = tmpIndexRdBuf.getLong();
            final int cDataSize = tmpIndexRdBuf.getInt();
            final int cKeyCode = tmpIndexRdBuf.getInt();
            final long cTimeRecv = tmpIndexRdBuf.getLong();
            final int cDataOffset = (int) (cDataPos - this.writeDataStartPos);
            // Every scanned entry counts towards the scanned index length, whether
            // it is delivered or skipped. Previously entries dropped by the
            // receive-time filter were not counted, so the reported length fell
            // short of the position actually reached.
            // NOTE(review): presumably callers advance their read offset by this
            // amount, in which case the shortfall caused re-reads - confirm.
            readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
            // skip entries whose data range is invalid or outside the snapshot
            if ((cDataOffset < 0)
                    || (cDataSize <= 0)
                    || (cDataOffset >= currDataOffset)
                    || (cDataSize > ClusterConfigHolder.getMaxMsgSize())
                    || (cDataOffset + cDataSize > currDataOffset)) {
                continue;
            }
            // skip entries of other partitions or with non-matching key codes
            if ((cPartitionId != partitionId)
                    || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
                continue;
            }
            // skip entries received before the requested time
            if (reqRcvTime != 0 && cTimeRecv < reqRcvTime) {
                continue;
            }
            // copy the payload out of the data segment.
            byte[] tmpArray = new byte[cDataSize];
            final ByteBuffer buffer = ByteBuffer.wrap(tmpArray);
            tmpDataRdBuf.position(cDataOffset);
            tmpDataRdBuf.get(tmpArray);
            buffer.rewind();
            cacheMsgList.add(buffer);
            lastDataRdOff = cDataPos + cDataSize;
            totalReadSize += cDataSize;
            // break when exceed the max transfer size.
            if (totalReadSize >= maxReadSize) {
                break;
            }
        }
        // return result
        return new GetCacheMsgResult(true, 0, "Ok1",
                lstRdIndexOffset, readedSize, lastDataRdOff, totalReadSize, cacheMsgList);
    }