in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStore.java [188:323]
/**
 * Fetches messages for a consumer, starting at the given index offset.
 *
 * <p>The normalized {@code reqSwitch} decides where data may be read from:
 * <ul>
 *   <li>{@code <= 1}: file store only;</li>
 *   <li>{@code > 1}: the memory stores are consulted first when the memory store is
 *       enabled and {@code requestOffset} is at or beyond the file store's index max
 *       offset;</li>
 *   <li>{@code > 2}: additionally allows reading from the main memory store (otherwise
 *       only the store currently being flushed is read).</li>
 * </ul>
 * When the offset is held by a memory store the method always returns from the memory
 * branch (success or failure) and never falls through to the file store.
 *
 * @param reqSwitch        raw read-source selector; values {@code <= 0} become 0, otherwise
 *                         filter consumers use {@code reqSwitch % 100} and other consumers
 *                         use {@code reqSwitch / 100}
 * @param requestOffset    index offset to start reading from
 * @param partitionId      partition id handed to the underlying stores
 * @param consumerNodeInfo consumer state; supplies the filter flag, the filter code set and
 *                         the last data read offset
 * @param statsKeyBase     key base handed to the transfer-message builder and the file store
 *                         (presumably a statistics key prefix; confirm against callers)
 * @param msgSizeLimit     size limit for the file read; lowered to {@code maxAllowRdSize}
 *                         when the consumer lags the data high max offset by at least
 *                         {@code tubeConfig.getDoubleDefaultDeduceReadSize()}
 * @param reqRcvTime       value passed through to the stores (presumably the time the request
 *                         was received; confirm against callers)
 * @return the read result; failures are reported through the result's error code
 *         ({@code NOT_FOUND}, {@code MOVED}, or the memory store's return code)
 * @throws IOException           declared by this method; propagated from the file store calls
 * @throws IllegalStateException if this store has been closed
 */
public GetMessageResult getMessages(int reqSwitch, long requestOffset,
        int partitionId, ConsumerNodeInfo consumerNodeInfo,
        String statsKeyBase, int msgSizeLimit,
        long reqRcvTime) throws IOException {
    // #lizard forgives
    if (this.closed.get()) {
        throw new IllegalStateException(new StringBuilder(512)
                .append("[Data Store] Closed MessageStore for storeKey ")
                .append(this.storeKey).toString());
    }
    boolean inMemCache = false;
    int maxIndexReadLength = memMaxIndexReadCnt.get();
    // Default outcome for the memory branch: used when the offset is held in memory
    // but the consumer's switch does not permit reading the store that holds it.
    GetCacheMsgResult memMsgRlt = new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
            requestOffset, "Can't found Message by index in cache");
    // determine position to read.
    reqSwitch = (reqSwitch <= 0)
            ? 0
            : (consumerNodeInfo.isFilterConsume() ? (reqSwitch % 100) : (reqSwitch / 100));
    if (tubeConfig.isEnableMemStore() && reqSwitch > 1) {
        // in read memory situation, read main memory or backup memory by consumer's config.
        long maxIndexOffset = TBaseConstants.META_VALUE_UNDEFINED;
        if (requestOffset >= this.msgFileStore.getIndexMaxOffset()) {
            this.writeCacheMutex.readLock().lock();
            try {
                maxIndexOffset = this.msgMemStore.getIndexLastWritePos();
                final int holdState = this.msgMemStoreBeingFlush.isOffsetInHold(requestOffset);
                if (holdState >= 0) {
                    inMemCache = true;
                    if (holdState > 0) {
                        if (reqSwitch > 2) {
                            // read from main memory.
                            memMsgRlt = msgMemStore.getMessages(
                                    consumerNodeInfo.getLastDataRdOffset(),
                                    requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
                                    maxIndexReadLength, partitionId, false,
                                    consumerNodeInfo.isFilterConsume(),
                                    consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                        }
                    } else {
                        // read from backup memory.
                        memMsgRlt = msgMemStoreBeingFlush.getMessages(
                                consumerNodeInfo.getLastDataRdOffset(),
                                requestOffset, msgStoreMgr.getMaxMsgTransferSize(),
                                maxIndexReadLength, partitionId, true,
                                consumerNodeInfo.isFilterConsume(),
                                consumerNodeInfo.getFilterCondCodeSet(), reqRcvTime);
                    }
                }
            } finally {
                this.writeCacheMutex.readLock().unlock();
            }
        }
        if (inMemCache) {
            // return not found when data is under memory sink operation.
            if (!memMsgRlt.isSuccess) {
                return new GetMessageResult(false, memMsgRlt.retCode, requestOffset,
                        memMsgRlt.dltOffset, memMsgRlt.errInfo);
            }
            HashMap<String, TrafficInfo> countMap = new HashMap<>();
            List<ClientBroker.TransferedMessage> transferedMessageList = new ArrayList<>();
            if (!memMsgRlt.cacheMsgList.isEmpty()) {
                final StringBuilder strBuffer = new StringBuilder(512);
                for (ByteBuffer dataBuffer : memMsgRlt.cacheMsgList) {
                    ClientBroker.TransferedMessage transferedMessage =
                            DataStoreUtils.getTransferMsg(dataBuffer,
                                    dataBuffer.array().length,
                                    countMap, statsKeyBase, strBuffer);
                    // Buffers for which no transfer message is produced are skipped.
                    if (transferedMessage != null) {
                        transferedMessageList.add(transferedMessage);
                    }
                }
            }
            GetMessageResult getResult =
                    new GetMessageResult(true, 0, memMsgRlt.errInfo, requestOffset,
                            memMsgRlt.dltOffset, memMsgRlt.lastRdDataOff,
                            memMsgRlt.totalMsgSize, countMap, transferedMessageList);
            getResult.setMaxOffset(maxIndexOffset);
            return getResult;
        }
    }
    // before read from file, adjust request's offset.
    long reqNewOffset = Math.max(requestOffset, this.msgFileStore.getIndexMinOffset());
    if (reqSwitch <= 1 && reqNewOffset >= getFileIndexMaxOffset()) {
        return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                reqNewOffset, 0, "current offset is exceed max file offset");
    }
    maxIndexReadLength = consumerNodeInfo.isFilterConsume()
            ? fileMaxFilterIndexReadSize.get()
            : fileMaxIndexReadSize.get();
    final ByteBuffer indexBuffer = ByteBuffer.allocate(maxIndexReadLength);
    Segment indexRecordView =
            this.msgFileStore.indexSlice(reqNewOffset, maxIndexReadLength);
    if (indexRecordView == null) {
        // The min offset is re-read here, so it may have advanced past reqNewOffset
        // since the adjustment above.
        if (reqNewOffset < this.msgFileStore.getIndexMinOffset()) {
            return new GetMessageResult(false, TErrCodeConstants.MOVED,
                    reqNewOffset, 0, "current offset is exceed min offset!");
        } else {
            return new GetMessageResult(false, TErrCodeConstants.NOT_FOUND,
                    reqNewOffset, 0, "current offset is exceed max offset!");
        }
    }
    try {
        indexRecordView.read(indexBuffer, reqNewOffset);
        indexBuffer.flip();
    } finally {
        // Always give the view back, even when the read throws; otherwise the
        // reference obtained from indexSlice() would never be released.
        // NOTE(review): assumes relViewRef() pairs with a reference taken by indexSlice().
        indexRecordView.relViewRef();
    }
    // Cap the read size when the consumer is far behind the newest data.
    if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset()
            >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
            && msgSizeLimit > this.maxAllowRdSize) {
        msgSizeLimit = this.maxAllowRdSize;
    }
    GetMessageResult retResult =
            msgFileStore.getMessages(partitionId,
                    consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
                    indexBuffer, consumerNodeInfo.isFilterConsume(),
                    consumerNodeInfo.getFilterCondCodeSet(),
                    statsKeyBase, msgSizeLimit, reqRcvTime);
    // File-only readers see the file index max offset; memory-capable readers
    // see the overall index max offset.
    if (reqSwitch <= 1) {
        retResult.setMaxOffset(getFileIndexMaxOffset());
    } else {
        retResult.setMaxOffset(getIndexMaxOffset());
    }
    // For filter consumers, flag the result as slow-frequency when the index left
    // in the file after this read is below the configured threshold.
    if (consumerNodeInfo.isFilterConsume()
            && retResult.isSuccess
            && retResult.getLastReadOffset() > 0) {
        if ((getFileIndexMaxOffset()
                - reqNewOffset - retResult.getLastReadOffset()) < fileLowReqMaxFilterIndexReadSize.get()) {
            retResult.setSlowFreq(true);
        }
    }
    return retResult;
}