in store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java [792:989]
/**
 * Reads messages for a consumer group from one queue of a topic, starting at a consume-queue offset.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>Returns {@code null} when the store is shut down or its running flags report "not readable".</li>
 *   <li>When the topic's cleanup policy is {@code COMPACTION} and compaction is enabled, delegates
 *       entirely to {@code compactionStore.getMessage(...)}; note that {@code messageFilter} is not
 *       forwarded on that path.</li>
 *   <li>Otherwise looks up the consume queue, classifies {@code offset} against the queue's min/max
 *       offsets, and, if the offset is inside the queue, iterates consume-queue units, loads each
 *       message from the commit log and appends the matching ones to the result.</li>
 * </ol>
 *
 * <p>The returned result always carries a status, the next offset to pull from, the queue's min/max
 * offsets and the number of messages rejected by the commit-log filter. Statuses set by this method:
 * {@code NO_MATCHED_LOGIC_QUEUE}, {@code NO_MESSAGE_IN_QUEUE}, {@code OFFSET_TOO_SMALL},
 * {@code OFFSET_OVERFLOW_ONE}, {@code OFFSET_OVERFLOW_BADLY}, {@code OFFSET_FOUND_NULL},
 * {@code NO_MATCHED_MESSAGE}, {@code MESSAGE_WAS_REMOVING} and {@code FOUND}.
 *
 * @param group           consumer group; used for statistics, cold-data accounting and error logging
 * @param topic           topic to read from
 * @param queueId         queue of the topic to read from
 * @param offset          consume-queue offset at which to start reading
 * @param maxMsgNums      message-count hint passed to the consume-queue iterator and to
 *                        {@code isTheBatchFull}
 * @param maxTotalMsgSize requested byte budget; raised to at least 100 and capped at
 *                        {@code MAX_PULL_MSG_SIZE}
 * @param messageFilter   optional filter; {@code null} disables both the consume-queue and the
 *                        commit-log filtering steps
 * @return the pull result, or {@code null} if the store is shut down or not readable
 */
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums, final int maxTotalMsgSize, final MessageFilter messageFilter) {
    if (this.shutdown) {
        LOGGER.warn("message store has shutdown, so getMessage is forbidden");
        return null;
    }

    if (!this.runningFlags.isReadable()) {
        LOGGER.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
        return null;
    }

    Optional<TopicConfig> topicConfig = getTopicConfig(topic);
    CleanupPolicy policy = CleanupPolicyUtils.getDeletePolicy(topicConfig);
    //check request topic flag
    if (Objects.equals(policy, CleanupPolicy.COMPACTION) && messageStoreConfig.isEnableCompaction()) {
        return compactionStore.getMessage(group, topic, queueId, offset, maxMsgNums, maxTotalMsgSize);
    } // else skip

    long beginTime = this.getSystemClock().now();

    GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    long nextBeginOffset = offset;
    long minOffset = 0;
    long maxOffset = 0;

    GetMessageResult getResult = new GetMessageResult();
    // Counts messages rejected by the commit-log filter only.
    // NOTE(review): units rejected by isMatchedByConsumeQueue are not counted - confirm this is intended.
    int filterMessageCount = 0;

    // Snapshot of the commit log's max offset, taken once before scanning.
    final long maxOffsetPy = this.commitLog.getMaxOffset();

    ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();

        if (maxOffset == 0) {
            status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        } else if (offset < minOffset) {
            status = GetMessageStatus.OFFSET_TOO_SMALL;
            nextBeginOffset = nextOffsetCorrection(offset, minOffset);
        } else if (offset == maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
            nextBeginOffset = nextOffsetCorrection(offset, offset);
        } else if (offset > maxOffset) {
            status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
            nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
        } else {
            // Upper bound, in consume-queue bytes, on how far past the requested offset one call may scan.
            // NOTE(review): maxMsgNums * getUnitSize() is an int multiplication and could overflow for a
            // very large maxMsgNums - confirm callers bound maxMsgNums.
            final int maxFilterMessageSize = Math.max(this.messageStoreConfig.getMaxFilterMessageSize(), maxMsgNums * consumeQueue.getUnitSize());
            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();

            long maxPullSize = Math.max(maxTotalMsgSize, 100);
            if (maxPullSize > MAX_PULL_MSG_SIZE) {
                LOGGER.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", maxPullSize, topic, queueId);
                maxPullSize = MAX_PULL_MSG_SIZE;
            }
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
            // Physical (commit-log) offset of the last consume-queue unit that passed the limit checks.
            long maxPhyOffsetPulling = 0;
            int cqFileNum = 0;

            // Keep opening iterators until something has been added to the result, the queue end is
            // reached, or the configured number of passes is used up.
            while (getResult.getBufferTotalSize() <= 0
                && nextBeginOffset < maxOffset
                && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
                ReferredIterator<CqUnit> bufferConsumeQueue = null;

                try {
                    bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset, maxMsgNums);

                    if (bufferConsumeQueue == null) {
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
                        LOGGER.warn("consumer request topic: " + topic + ", offset: " + offset + ", minOffset: " + minOffset + ", maxOffset: "
                            + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
                        break;
                    }

                    // Long.MIN_VALUE means "no skip in progress". After a commit-log miss it holds the
                    // value returned by commitLog.rollNextFile, and units below it are skipped.
                    long nextPhyFileStartOffset = Long.MIN_VALUE;
                    while (bufferConsumeQueue.hasNext()
                        && nextBeginOffset < maxOffset) {
                        CqUnit cqUnit = bufferConsumeQueue.next();
                        long offsetPy = cqUnit.getPos();
                        int sizePy = cqUnit.getSize();

                        boolean isInMem = estimateInMemByCommitOffset(offsetPy, maxOffsetPy);

                        // Stop once the scanned distance from the requested offset hits the filter budget.
                        if ((cqUnit.getQueueOffset() - offset) * consumeQueue.getUnitSize() >= maxFilterMessageSize) {
                            break;
                        }

                        if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInMem)) {
                            break;
                        }

                        if (getResult.getBufferTotalSize() >= maxPullSize) {
                            break;
                        }

                        maxPhyOffsetPulling = offsetPy;

                        //Be careful, here should before the isTheBatchFull
                        // NOTE(review): despite the comment above, the offset is advanced only after the
                        // three limit checks, so a unit that triggers one of those breaks is not consumed
                        // and nextBeginOffset still points at it.
                        nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();

                        if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                            if (offsetPy < nextPhyFileStartOffset) {
                                continue;
                            }
                        }

                        // First-stage filter using only data available in the consume-queue unit.
                        if (messageFilter != null
                            && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }

                            continue;
                        }

                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                        if (null == selectResult) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                            }

                            nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                            continue;
                        }

                        // Cold-data accounting: only for non-system consumer groups and reads not served from cache.
                        if (messageStoreConfig.isColdDataFlowControlEnable() && !MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) {
                            getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
                        }

                        // Second-stage filter on the message bytes themselves.
                        if (messageFilter != null
                            && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                            if (getResult.getBufferTotalSize() == 0) {
                                status = GetMessageStatus.NO_MATCHED_MESSAGE;
                            }
                            // release...
                            selectResult.release();
                            filterMessageCount++;
                            continue;
                        }
                        this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
                        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }
                } catch (RocksDBException e) {
                    // The failure is logged and the outer loop condition is re-evaluated; nothing is rethrown.
                    // The exception is passed as a trailing argument so the stack trace is kept as well
                    // (SLF4J-style loggers treat a trailing Throwable as the exception to print).
                    ERROR_LOG.error("getMessage Failed. cid: {}, topic: {}, queueId: {}, offset: {}, minOffset: {}, maxOffset: {}, {}",
                        group, topic, queueId, offset, minOffset, maxOffset, e.getMessage(), e);
                } finally {
                    if (bufferConsumeQueue != null) {
                        bufferConsumeQueue.release();
                    }
                }
            }

            if (diskFallRecorded) {
                long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
            }

            // Suggest pulling from a slave when the distance between the commit log's max offset and the
            // last pulled physical offset exceeds the configured percentage of total physical memory.
            long diff = maxOffsetPy - maxPhyOffsetPulling;
            long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
            getResult.setSuggestPullingFromSlave(diff > memory);
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }

    if (GetMessageStatus.FOUND == status) {
        this.storeStatsService.getGetMessageTimesTotalFound().add(1);
    } else {
        this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
    }

    // The consumer is exactly at the end of the queue: record zero fall-behind size and time.
    if (this.messageStoreConfig.isDiskFallRecorded() && GetMessageStatus.OFFSET_OVERFLOW_ONE == status) {
        brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, 0);
        brokerStatsManager.recordDiskFallBehindTime(group, topic, queueId, 0);
    }

    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);

    // getResult is created unconditionally above and never reassigned, so no null check is needed here.
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    getResult.setFilterMessageCount(filterMessageCount);
    return getResult;
}