in store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java [448:583]
/**
 * Reads a batch of messages from this compaction log, starting at a logical queue offset.
 *
 * <p>The whole call runs while holding {@code readMessageLock}. The requested offset is first
 * compared with the min/max offsets of the sparse consume queue to classify the request; only
 * when the offset lies below the queue's max offset are consume-queue units iterated and the
 * referenced message buffers added to the result.
 *
 * @param group           not read by this method
 * @param topic           used only in log output
 * @param queueId         used only in log output
 * @param offset          logical queue offset to start reading from
 * @param maxMsgNums      message-count limit handed to {@code isTheBatchFull}
 * @param maxTotalMsgSize requested limit on the total buffer size; raised to at least 100 and
 *                        capped at {@code MAX_PULL_MSG_SIZE}
 * @return a result carrying the status, the next offset to pull from, and the queue's
 *         min/max offsets, plus any message buffers that were selected
 */
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums, final int maxTotalMsgSize) {
    readMessageLock.lock();
    try {
        // Read the start time from the same clock that is sampled at the end of the method;
        // mixing System.nanoTime() with the store's system clock yields a meaningless difference.
        long beginTime = this.defaultMessageStore.getSystemClock().now();

        GetMessageStatus status;
        long nextBeginOffset = offset;
        long minOffset = 0;
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();

        final long maxOffsetPy = getLog().getMaxOffset();

        SparseConsumeQueue consumeQueue = getCQ();
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();

            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // Clamp the requested size into [100, MAX_PULL_MSG_SIZE].
                long maxPullSize = Math.max(maxTotalMsgSize, 100);
                if (maxPullSize > MAX_PULL_MSG_SIZE) {
                    log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}",
                        maxPullSize, topic, queueId);
                    maxPullSize = MAX_PULL_MSG_SIZE;
                }

                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                long maxPhyOffsetPulling = 0;
                int cqFileNum = 0;

                // Keep opening consume-queue iterators until something has been collected, the
                // cursor reaches maxOffset, or the configured number of iterations is used up.
                while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset
                    && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
                    ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFromOrNext(nextBeginOffset);

                    if (bufferConsumeQueue == null) {
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(nextBeginOffset, consumeQueue.rollNextFile(nextBeginOffset));
                        log.warn("consumer request topic:{}, offset:{}, minOffset:{}, maxOffset:{}, "
                                + "but access logic queue failed. correct nextBeginOffset to {}",
                            topic, offset, minOffset, maxOffset, nextBeginOffset);
                        break;
                    }

                    try {
                        // Long.MIN_VALUE means "no skip in progress". After a failed physical
                        // read it holds the offset returned by rollNextFile, and units located
                        // before that offset are skipped.
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                            CqUnit cqUnit = bufferConsumeQueue.next();
                            if (!validateCqUnit(cqUnit)) {
                                break;
                            }
                            long offsetPy = cqUnit.getPos();
                            int sizePy = cqUnit.getSize();

                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);

                            if (isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize,
                                getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                                break;
                            }

                            if (getResult.getBufferTotalSize() >= maxPullSize) {
                                break;
                            }

                            maxPhyOffsetPulling = offsetPy;

                            // Advance the cursor only after the batch-full checks above have
                            // passed, so a unit that triggered a break is not skipped by
                            // nextBeginOffset.
                            nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset) {
                                    continue;
                                }
                            }

                            SelectMappedBufferResult selectResult = getMessage(offsetPy, sizePy);
                            if (null == selectResult) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }

                                nextPhyFileStartOffset = rollNextFile(offsetPy);
                                continue;
                            }
                            this.defaultMessageStore.getStoreStatsService().getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
                            getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }
                    } finally {
                        bufferConsumeQueue.release();
                    }
                }

                // Suggest pulling from a slave when the distance between the log's max offset
                // and the last physical offset visited exceeds the configured share of
                // physical memory.
                long diff = maxOffsetPy - maxPhyOffsetPulling;
                long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                getResult.setSuggestPullingFromSlave(diff > memory);
            }
        } else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }

        if (GetMessageStatus.FOUND == status) {
            this.defaultMessageStore.getStoreStatsService().getGetMessageTimesTotalFound().add(getResult.getMessageCount());
        } else {
            // NOTE(review): on every non-FOUND path visible here no message appears to have been
            // added, so this likely adds 0 — confirm whether add(1) was intended.
            this.defaultMessageStore.getStoreStatsService().getGetMessageTimesTotalMiss().add(getResult.getMessageCount());
        }
        long elapsedTime = this.defaultMessageStore.getSystemClock().now() - beginTime;
        this.defaultMessageStore.getStoreStatsService().setGetMessageEntireTimeMax(elapsedTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    } finally {
        readMessageLock.unlock();
    }
}