public GetMessageResult getMessage()

in store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java [448:583]


    /**
     * Reads messages from this compaction log, starting at the given logical queue offset.
     *
     * <p>The whole read runs under {@code readMessageLock}. The sparse consume queue is walked
     * from {@code offset}; for every valid unit the referenced message buffer is selected and
     * appended to the result until the batch is full, the byte limit is reached, the queue's
     * max offset is reached, or the configured number of consume-queue files has been visited.
     *
     * @param group           consumer group; not used by this method
     * @param topic           topic name; only used in log output here
     * @param queueId         queue id; only used in log output here
     * @param offset          logical queue offset to start reading from
     * @param maxMsgNums      message-count limit handed to {@code isTheBatchFull}
     * @param maxTotalMsgSize requested byte limit; raised to at least 100 and capped at
     *                        {@code MAX_PULL_MSG_SIZE}
     * @return a non-null result carrying the status, the next offset to pull from, the queue's
     *         min/max offsets and any selected message buffers
     */
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums, final int maxTotalMsgSize) {
        readMessageLock.lock();
        try {
            // Take the start timestamp from the same clock that is read at the end. Mixing
            // System.nanoTime() with the store's system clock yields a meaningless difference.
            final long beginTime = this.defaultMessageStore.getSystemClock().now();

            GetMessageStatus status;
            long nextBeginOffset = offset;
            long minOffset = 0;
            long maxOffset = 0;

            GetMessageResult getResult = new GetMessageResult();

            final long maxOffsetPy = getLog().getMaxOffset();

            SparseConsumeQueue consumeQueue = getCQ();
            if (consumeQueue != null) {
                minOffset = consumeQueue.getMinOffsetInQueue();
                maxOffset = consumeQueue.getMaxOffsetInQueue();

                if (maxOffset == 0) {
                    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                    nextBeginOffset = nextOffsetCorrection(offset, 0);
                } else if (offset == maxOffset) {
                    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                    nextBeginOffset = nextOffsetCorrection(offset, offset);
                } else if (offset > maxOffset) {
                    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                    if (0 == minOffset) {
                        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                    } else {
                        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                    }
                } else {
                    // offset < maxOffset: there may be something to read.
                    long maxPullSize = Math.max(maxTotalMsgSize, 100);
                    if (maxPullSize > MAX_PULL_MSG_SIZE) {
                        log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}",
                            maxPullSize, topic, queueId);
                        maxPullSize = MAX_PULL_MSG_SIZE;
                    }
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                    long maxPhyOffsetPulling = 0;
                    int cqFileNum = 0;

                    // Keep moving to the next consume-queue iterator while nothing has been
                    // collected yet, bounded by the configured number of files to travel.
                    while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset
                        && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
                        ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFromOrNext(nextBeginOffset);

                        if (bufferConsumeQueue == null) {
                            status = GetMessageStatus.OFFSET_FOUND_NULL;
                            nextBeginOffset = nextOffsetCorrection(nextBeginOffset, consumeQueue.rollNextFile(nextBeginOffset));
                            log.warn("consumer request topic:{}, offset:{}, minOffset:{}, maxOffset:{}, "
                                    + "but access logic queue failed. correct nextBeginOffset to {}",
                                topic, offset, minOffset, maxOffset, nextBeginOffset);
                            break;
                        }

                        try {
                            // Long.MIN_VALUE means "no skip in progress". Otherwise units whose
                            // position lies before this offset are skipped without being read.
                            long nextPhyFileStartOffset = Long.MIN_VALUE;
                            while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                                CqUnit cqUnit = bufferConsumeQueue.next();
                                if (!validateCqUnit(cqUnit)) {
                                    break;
                                }
                                long offsetPy = cqUnit.getPos();
                                int sizePy = cqUnit.getSize();

                                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                                if (isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize,
                                    getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                                    break;
                                }

                                if (getResult.getBufferTotalSize() >= maxPullSize) {
                                    break;
                                }

                                maxPhyOffsetPulling = offsetPy;

                                // Advance the cursor only after the batch-full checks above, so a
                                // unit that did not fit is handed out again by the next pull.
                                nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();

                                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                    if (offsetPy < nextPhyFileStartOffset) {
                                        continue;
                                    }
                                }

                                SelectMappedBufferResult selectResult = getMessage(offsetPy, sizePy);
                                if (null == selectResult) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }

                                    // The data at offsetPy is unavailable: skip following units
                                    // until the offset returned by rollNextFile is reached.
                                    nextPhyFileStartOffset = rollNextFile(offsetPy);
                                    continue;
                                }
                                this.defaultMessageStore.getStoreStatsService().getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
                                getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                                status = GetMessageStatus.FOUND;
                                nextPhyFileStartOffset = Long.MIN_VALUE;
                            }
                        } finally {
                            bufferConsumeQueue.release();
                        }
                    }

                    // Suggest pulling from a slave when the distance between the log's max offset
                    // and the last pulled position exceeds the configured share of physical memory.
                    long diff = maxOffsetPy - maxPhyOffsetPulling;
                    long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                    getResult.setSuggestPullingFromSlave(diff > memory);
                }
            } else {
                status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            }

            // Count one found/miss per call. The number of messages handed out is already
            // tracked by the transferred-message counter above. Adding the message count here
            // would leave the miss counter at zero forever, because a miss carries no messages.
            if (GetMessageStatus.FOUND == status) {
                this.defaultMessageStore.getStoreStatsService().getGetMessageTimesTotalFound().add(1);
            } else {
                this.defaultMessageStore.getStoreStatsService().getGetMessageTimesTotalMiss().add(1);
            }
            long elapsedTime = this.defaultMessageStore.getSystemClock().now() - beginTime;
            this.defaultMessageStore.getStoreStatsService().setGetMessageEntireTimeMax(elapsedTime);

            getResult.setStatus(status);
            getResult.setNextBeginOffset(nextBeginOffset);
            getResult.setMaxOffset(maxOffset);
            getResult.setMinOffset(minOffset);
            return getResult;
        } finally {
            readMessageLock.unlock();
        }
    }