public GetMessageResult getMessages()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java [295:439]


    /**
     * Reads messages from the data file, driven by a batch of index entries.
     *
     * <p>Index entries are consumed sequentially from the current position of
     * {@code indexBuffer}. Each entry is read as: partition id (int), data offset (long),
     * data size (int), key code (int) and receive time (long). For every entry that passes
     * validation and filtering, the message bytes are read from the matching data segment
     * and converted with {@code DataStoreUtils.getTransferMsg}.
     *
     * <p>Scanning stops when the index batch is exhausted, when an entry points at or beyond
     * the current maximum data offset, when the accumulated message size reaches
     * {@code maxMsgTransferSize}, or when reading fails.
     *
     * @param partitionId        only entries whose partition id equals this value are returned
     * @param lastRdOffset       value reported as the last read data offset when this call
     *                           did not advance it
     * @param reqOffset          requested index offset, passed through to the result
     * @param indexBuffer        buffer holding the index entries to scan; its position is
     *                           advanced by this method
     * @param isFilterConsume    whether entries must additionally match {@code filterKeySet}
     * @param filterKeySet       accepted key codes; consulted only when
     *                           {@code isFilterConsume} is true
     * @param statsKeyBase       forwarded to {@code DataStoreUtils.getTransferMsg}
     * @param maxMsgTransferSize scanning stops once the total size of returned messages
     *                           reaches this value
     * @param reqRcvTime         when non-zero, entries whose receive time is earlier than
     *                           this value are skipped
     * @return the read result carrying the status, the consumed index length, the last read
     *         data offset, the total size, the statistics map and the messages
     */
    public GetMessageResult getMessages(int partitionId, long lastRdOffset,
            long reqOffset, ByteBuffer indexBuffer,
            boolean isFilterConsume,
            Set<Integer> filterKeySet,
            String statsKeyBase,
            int maxMsgTransferSize,
            long reqRcvTime) {
        // #lizard forgives
        // Orderly read from index file, then random read from data file.
        int retCode = 0;
        int totalSize = 0;
        String errInfo = "Ok";
        boolean result = true;
        int readedOffset = 0;
        Segment recordSeg = null;
        long lastRdDataOffset = 0L;
        final StringBuilder sBuilder = new StringBuilder(512);
        final long curDataMaxOffset = getDataMaxOffset();
        final long curDataMinOffset = getDataMinOffset();
        HashMap<String, TrafficInfo> countMap = new HashMap<>();
        ByteBuffer dataBuffer =
                ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
        List<ClientBroker.TransferedMessage> transferedMessageList =
                new ArrayList<>();
        // Snapshot the readable size once. The relative gets below advance the buffer
        // position, so re-evaluating remaining() on every iteration would shrink the bound
        // while the offset grows and end the scan after roughly half of the entries.
        final int indexReadableSize = indexBuffer.remaining();
        try {
            // read data file by index; a trailing partial entry is ignored.
            // NOTE(review): assumes STORE_INDEX_HEAD_LEN equals the 28 bytes read per entry.
            for (int curIndexOffset = 0;
                    curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN <= indexReadableSize;
                    curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
                final int nextIndexOffset =
                        curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
                final int curIndexPartitionId = indexBuffer.getInt();
                final long curIndexDataOffset = indexBuffer.getLong();
                final int curIndexDataSize = indexBuffer.getInt();
                final int curIndexKeyCode = indexBuffer.getInt();
                final long recvTimeInMillsec = indexBuffer.getLong();
                final long maxDataLimitOffset = curIndexDataOffset + curIndexDataSize;
                // skip when mismatch condition
                if (curIndexDataOffset < 0
                        || curIndexDataSize <= 0
                        || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
                        || curIndexDataOffset < curDataMinOffset) {
                    readedOffset = nextIndexOffset;
                    continue;
                }
                // read finish, then return.
                if (curIndexDataOffset >= curDataMaxOffset
                        || maxDataLimitOffset > curDataMaxOffset) {
                    lastRdDataOffset = curIndexDataOffset;
                    break;
                }
                // conduct filter operation.
                if (curIndexPartitionId != partitionId
                        || (isFilterConsume
                                && !filterKeySet.contains(curIndexKeyCode))) {
                    lastRdDataOffset = maxDataLimitOffset;
                    readedOffset = nextIndexOffset;
                    continue;
                }
                // Entries older than the requested receive time are filtered like any other
                // non-matching entry: advance the offsets so a batch made up only of such
                // entries still reports progress instead of being re-scanned.
                if (reqRcvTime != 0 && recvTimeInMillsec < reqRcvTime) {
                    lastRdDataOffset = maxDataLimitOffset;
                    readedOffset = nextIndexOffset;
                    continue;
                }
                try {
                    // get data from data file by index one by one; reuse the current
                    // segment while the record lies fully inside its committed range.
                    if (recordSeg == null
                            || !((curIndexDataOffset >= recordSeg.getStart())
                                    && (maxDataLimitOffset <= recordSeg.getStart()
                                            + recordSeg.getCommitSize()))) {
                        if (recordSeg != null) {
                            recordSeg.relViewRef();
                            recordSeg = null;
                        }
                        recordSeg = dataSegments.getRecordSeg(curIndexDataOffset);
                        if (recordSeg == null) {
                            continue;
                        }
                        if (this.closed.get()) {
                            throw new Exception("Read Service has closed!");
                        }
                    }
                    if (dataBuffer.capacity() < curIndexDataSize) {
                        dataBuffer = ByteBuffer.allocate(curIndexDataSize);
                    }
                    dataBuffer.clear();
                    dataBuffer.limit(curIndexDataSize);
                    recordSeg.read(dataBuffer, curIndexDataOffset);
                    dataBuffer.flip();
                    // after flip() the limit is the number of bytes actually read.
                    final int dataRealLimit = dataBuffer.limit();
                    if (dataRealLimit < curIndexDataSize) {
                        lastRdDataOffset = curIndexDataOffset;
                        readedOffset = nextIndexOffset;
                        continue;
                    }
                } catch (Throwable e2) {
                    if (e2 instanceof IOException) {
                        ServiceStatusHolder.addReadIOErrCnt();
                        BrokerSrvStatsHolder.incDiskIOExcCnt();
                    }
                    samplePrintCtrl.printExceptionCaught(e2,
                            messageStore.getStoreKey(), String.valueOf(partitionId));
                    retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
                    sBuilder.setLength(0);
                    // Report the throwable itself: getCause() is null for most failures
                    // (including the "closed" exception above) and would print "null".
                    errInfo = sBuilder.append("Get message from file failure : ")
                            .append(e2).toString();
                    sBuilder.setLength(0);
                    result = false;
                    break;
                }
                // build query result.
                readedOffset = nextIndexOffset;
                lastRdDataOffset = maxDataLimitOffset;
                ClientBroker.TransferedMessage transferedMessage =
                        DataStoreUtils.getTransferMsg(dataBuffer,
                                curIndexDataSize, countMap, statsKeyBase, sBuilder);
                if (transferedMessage == null) {
                    continue;
                }
                transferedMessageList.add(transferedMessage);
                totalSize += curIndexDataSize;
                // break when exceed the max transfer size.
                if (totalSize >= maxMsgTransferSize) {
                    break;
                }
            }
        } finally {
            // release resource, also when the scan is aborted by an unexpected exception.
            if (recordSeg != null) {
                recordSeg.relViewRef();
            }
        }
        // A failure after some messages were already collected is reported as success
        // so the collected messages can still be delivered.
        // NOTE(review): 'result' stays false in that case while retCode/errInfo are reset;
        // confirm how callers of GetMessageResult interpret that combination.
        if (retCode != 0) {
            if (!transferedMessageList.isEmpty()) {
                retCode = 0;
                errInfo = "Ok";
            }
        }
        if (lastRdDataOffset <= 0L) {
            lastRdDataOffset = lastRdOffset;
        }
        // return result.
        return new GetMessageResult(result, retCode, errInfo,
                reqOffset, readedOffset, lastRdDataOffset,
                totalSize, countMap, transferedMessageList);
    }