in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/mem/MsgMemStore.java [195:316]
/**
 * Reads messages of one partition from the in-memory cache, scanning index entries
 * from the requested index offset onwards.
 *
 * <p>The method first rejects offsets outside the cached index range, then (while holding
 * {@code writeLock}) checks {@code keysMap} / {@code queuesMap} for a last-write position at
 * or after the requested one and snapshots the current index/data cache offsets. If a
 * candidate exists it walks the index entries one by one, copying each matching message
 * body into its own {@link ByteBuffer}.
 *
 * <p>Every index entry that is examined, whether returned, invalid, belonging to another
 * partition, rejected by the key filter, or rejected by {@code reqRcvTime}, adds
 * {@code DataStoreUtils.STORE_INDEX_HEAD_LEN} to the consumed-index size handed to the
 * result, so the reported progress always matches what was actually scanned.
 *
 * @param lstRdDataOffset  data offset supplied by the caller; this method does not use it,
 *                         the data offset passed to the result is derived from cache state
 * @param lstRdIndexOffset absolute index offset to start reading from
 * @param maxReadSize      reading stops once the accumulated message bytes reach this value
 * @param maxReadCount     maximum number of index entries to examine (skipped ones included)
 * @param partitionId      partition whose messages are wanted
 * @param isSecond         when {@code true} and not filter-consuming, a "no candidate" outcome
 *                         is reported as a successful empty result ("Ok2") instead of NOT_FOUND
 * @param isFilterConsume  whether only messages whose key code is in {@code filterKeySet}
 *                         are wanted
 * @param filterKeySet     key codes to accept; iterated and queried only when
 *                         {@code isFilterConsume} is {@code true} (must be non-null then)
 * @param reqRcvTime       when non-zero, entries whose stored receive time is lower than this
 *                         value are skipped
 * @return the read result: MOVED if the offset is below the cached range, NOT_FOUND if it is
 *         at/after the cached range or no candidate exists, otherwise a successful result
 *         carrying the collected message buffers
 */
public GetCacheMsgResult getMessages(long lstRdDataOffset, long lstRdIndexOffset,
        int maxReadSize, int maxReadCount,
        int partitionId, boolean isSecond,
        boolean isFilterConsume, Set<Integer> filterKeySet,
        long reqRcvTime) {
    // #lizard forgives
    // judge memory contains the given offset or not.
    if (lstRdIndexOffset < this.writeIndexStartPos) {
        return new GetCacheMsgResult(false, TErrCodeConstants.MOVED,
                lstRdIndexOffset, "Request offset lower than cache minOffset");
    }
    if (lstRdIndexOffset >= this.writeIndexStartPos + this.cacheIndexOffset.get()) {
        return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                lstRdIndexOffset, "Request offset reached cache maxOffset");
    }
    List<ByteBuffer> cacheMsgList = new ArrayList<>();
    int totalReadSize = 0;
    boolean hasMsg = false;
    int currIndexOffset;
    int currDataOffset;
    long lastDataRdOff;
    // position of the requested entry relative to the start of the cached index segment
    int startReadOff = (int) (lstRdIndexOffset - this.writeIndexStartPos);
    // The candidate lookup and the offset snapshot are taken under the same lock.
    this.writeLock.lock();
    try {
        if (isFilterConsume) {
            // filter conduct. accelerate by keysMap.
            for (Integer keyCode : filterKeySet) {
                if (keyCode == null) {
                    continue;
                }
                Integer lastWritePos = this.keysMap.get(keyCode);
                if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
                    hasMsg = true;
                    break;
                }
            }
        } else {
            // orderly consume by partition id.
            Integer lastWritePos = this.queuesMap.get(partitionId);
            if ((lastWritePos != null) && (lastWritePos >= startReadOff)) {
                hasMsg = true;
            }
        }
        currDataOffset = this.cacheDataOffset.get();
        currIndexOffset = this.cacheIndexOffset.get();
        // default data offset reported to the caller: end of the currently cached data
        lastDataRdOff = this.writeDataStartPos + currDataOffset;
    } finally {
        this.writeLock.unlock();
    }
    // cannot find message, return not found
    if (!hasMsg) {
        // remaining cached index bytes from the requested position
        int limitReadSize = currIndexOffset - startReadOff;
        if (isSecond && !isFilterConsume) {
            return new GetCacheMsgResult(true, 0, "Ok2",
                    lstRdIndexOffset, limitReadSize, lastDataRdOff, totalReadSize, cacheMsgList);
        } else {
            return new GetCacheMsgResult(false, TErrCodeConstants.NOT_FOUND,
                    "Can't found Message by index!", lstRdIndexOffset,
                    limitReadSize, lastDataRdOff, totalReadSize, cacheMsgList);
        }
    }
    // fetch data by index.
    int readedSize = 0;
    // independent read-only views, so positioning them does not touch the writers' buffers
    ByteBuffer tmpIndexRdBuf = this.cachedIndexSegment.asReadOnlyBuffer();
    ByteBuffer tmpDataRdBuf = this.cacheDataSegment.asReadOnlyBuffer();
    // loop read by index
    for (int count = 0; count < maxReadCount;
            count++, startReadOff += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
        // stop when no complete index entry is left inside the snapshot
        if ((startReadOff >= currIndexOffset)
                || (startReadOff + DataStoreUtils.STORE_INDEX_HEAD_LEN > currIndexOffset)) {
            break;
        }
        // read index content: partition id, data position, data size, key code, receive time
        tmpIndexRdBuf.position(startReadOff);
        int cPartitionId = tmpIndexRdBuf.getInt();
        long cDataPos = tmpIndexRdBuf.getLong();
        int cDataSize = tmpIndexRdBuf.getInt();
        int cKeyCode = tmpIndexRdBuf.getInt();
        long cTimeRecv = tmpIndexRdBuf.getLong();
        int cDataOffset = (int) (cDataPos - this.writeDataStartPos);
        // skip entries whose data range is invalid or lies outside the data snapshot
        if ((cDataOffset < 0)
                || (cDataSize <= 0)
                || (cDataOffset >= currDataOffset)
                || (cDataSize > ClusterConfigHolder.getMaxMsgSize())
                || (cDataOffset + cDataSize > currDataOffset)) {
            readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
            continue;
        }
        // skip entries of other partitions or with a key code outside the filter set
        if ((cPartitionId != partitionId)
                || (isFilterConsume && (!filterKeySet.contains(cKeyCode)))) {
            readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
            continue;
        }
        // skip entries received before the requested time; the entry is still counted as
        // consumed, otherwise the reported index progress would lag behind the scan and
        // later entries would be handed out again on the next request
        if (reqRcvTime != 0 && cTimeRecv < reqRcvTime) {
            readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
            continue;
        }
        // read data file.
        byte[] tmpArray = new byte[cDataSize];
        final ByteBuffer buffer = ByteBuffer.wrap(tmpArray);
        tmpDataRdBuf.position(cDataOffset);
        tmpDataRdBuf.get(tmpArray);
        buffer.rewind();
        cacheMsgList.add(buffer);
        lastDataRdOff = cDataPos + cDataSize;
        readedSize += DataStoreUtils.STORE_INDEX_HEAD_LEN;
        totalReadSize += cDataSize;
        // break when exceed the max transfer size.
        if (totalReadSize >= maxReadSize) {
            break;
        }
    }
    // return result
    return new GetCacheMsgResult(true, 0, "Ok1",
            lstRdIndexOffset, readedSize, lastDataRdOff, totalReadSize, cacheMsgList);
}