in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java [295:439]
/**
 * Reads messages from the data file by walking the entries of an index buffer.
 *
 * <p>Each index entry is decoded in order as: partition id (int), data offset (long),
 * data size (int), key code (int) and receive time (long). Entries are skipped,
 * filtered or resolved to a data segment, and the matching message bytes are
 * converted into transfer messages until the buffer is exhausted, the end of the
 * stored data is reached, an error occurs, or {@code maxMsgTransferSize} is reached.
 *
 * @param partitionId        partition whose messages are wanted; entries of other partitions are skipped
 * @param lastRdOffset       data offset reported back when this call did not advance the data offset
 * @param reqOffset          requested index offset, passed through unchanged to the result
 * @param indexBuffer        buffer holding the index entries, read from its current position
 * @param isFilterConsume    whether entries must additionally match {@code filterKeySet}
 * @param filterKeySet       accepted key codes; only consulted when {@code isFilterConsume} is true
 * @param statsKeyBase       statistics key prefix handed to {@code DataStoreUtils.getTransferMsg}
 * @param maxMsgTransferSize reading stops once the accumulated data size reaches this value
 * @param reqRcvTime         when non-zero, entries with an earlier receive time are skipped
 * @return the read result carrying status, consumed index length, last data offset,
 *         total size, traffic statistics and the collected messages
 */
public GetMessageResult getMessages(int partitionId, long lastRdOffset,
        long reqOffset, ByteBuffer indexBuffer,
        boolean isFilterConsume,
        Set<Integer> filterKeySet,
        String statsKeyBase,
        int maxMsgTransferSize,
        long reqRcvTime) {
    // #lizard forgives
    // Orderly read from index file, then random read from data file.
    int retCode = 0;
    int totalSize = 0;
    String errInfo = "Ok";
    boolean result = true;
    int readedOffset = 0;
    long lastRdDataOffset = 0L;
    Segment recordSeg = null;
    final StringBuilder sBuilder = new StringBuilder(512);
    final long curDataMaxOffset = getDataMaxOffset();
    final long curDataMinOffset = getDataMinOffset();
    HashMap<String, TrafficInfo> countMap = new HashMap<>();
    ByteBuffer dataBuffer =
            ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
    List<ClientBroker.TransferedMessage> transferedMessageList =
            new ArrayList<>();
    // Snapshot the readable size once: the relative gets below move the buffer
    // position, so calling remaining() in the loop condition would shrink the bound
    // on every pass and end the loop after about half of the entries.
    final int indexBytes = indexBuffer.remaining();
    try {
        // read data file by index; only whole index entries are decoded.
        for (int curIndexOffset = 0;
                curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN <= indexBytes;
                curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
            final int nextIndexOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
            final int curIndexPartitionId = indexBuffer.getInt();
            final long curIndexDataOffset = indexBuffer.getLong();
            final int curIndexDataSize = indexBuffer.getInt();
            final int curIndexKeyCode = indexBuffer.getInt();
            final long recvTimeInMillsec = indexBuffer.getLong();
            final long maxDataLimitOffset = curIndexDataOffset + curIndexDataSize;
            // skip when mismatch condition
            if (curIndexDataOffset < 0
                    || curIndexDataSize <= 0
                    || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN
                    || curIndexDataOffset < curDataMinOffset) {
                readedOffset = nextIndexOffset;
                continue;
            }
            // read finish, then return.
            if (curIndexDataOffset >= curDataMaxOffset
                    || maxDataLimitOffset > curDataMaxOffset) {
                lastRdDataOffset = curIndexDataOffset;
                break;
            }
            // conduct filter operation.
            if (curIndexPartitionId != partitionId
                    || (isFilterConsume
                            && !filterKeySet.contains(curIndexKeyCode))) {
                lastRdDataOffset = maxDataLimitOffset;
                readedOffset = nextIndexOffset;
                continue;
            }
            // NOTE(review): unlike the other skip branches, this one does not advance
            // readedOffset / lastRdDataOffset - confirm that this is intended.
            if (reqRcvTime != 0 && recvTimeInMillsec < reqRcvTime) {
                continue;
            }
            try {
                // get data from data file by index one by one; the current segment
                // is reused while the record lies fully inside its committed range.
                if (recordSeg == null
                        || !((curIndexDataOffset >= recordSeg.getStart())
                                && (maxDataLimitOffset <= recordSeg.getStart() + recordSeg.getCommitSize()))) {
                    if (recordSeg != null) {
                        recordSeg.relViewRef();
                        // cleared so a failing lookup below cannot cause a second release
                        recordSeg = null;
                    }
                    recordSeg = dataSegments.getRecordSeg(curIndexDataOffset);
                    if (recordSeg == null) {
                        continue;
                    }
                    if (this.closed.get()) {
                        throw new Exception("Read Service has closed!");
                    }
                }
                if (dataBuffer.capacity() < curIndexDataSize) {
                    dataBuffer = ByteBuffer.allocate(curIndexDataSize);
                }
                dataBuffer.clear();
                dataBuffer.limit(curIndexDataSize);
                recordSeg.read(dataBuffer, curIndexDataOffset);
                dataBuffer.flip();
                // a short read means the record is not fully available; skip it
                if (dataBuffer.limit() < curIndexDataSize) {
                    lastRdDataOffset = curIndexDataOffset;
                    readedOffset = nextIndexOffset;
                    continue;
                }
            } catch (Throwable e2) {
                if (e2 instanceof IOException) {
                    ServiceStatusHolder.addReadIOErrCnt();
                    BrokerSrvStatsHolder.incDiskIOExcCnt();
                }
                samplePrintCtrl.printExceptionCaught(e2,
                        messageStore.getStoreKey(), String.valueOf(partitionId));
                retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
                // Report the cause when present, otherwise the exception itself, so
                // the message never degrades to "... failure : null".
                final Throwable cause = e2.getCause();
                sBuilder.setLength(0);
                errInfo = sBuilder.append("Get message from file failure : ")
                        .append(cause != null ? cause : e2).toString();
                sBuilder.setLength(0);
                result = false;
                break;
            }
            // build query result.
            readedOffset = nextIndexOffset;
            lastRdDataOffset = maxDataLimitOffset;
            ClientBroker.TransferedMessage transferedMessage =
                    DataStoreUtils.getTransferMsg(dataBuffer,
                            curIndexDataSize, countMap, statsKeyBase, sBuilder);
            if (transferedMessage == null) {
                continue;
            }
            transferedMessageList.add(transferedMessage);
            totalSize += curIndexDataSize;
            // break when exceed the max transfer size.
            if (totalSize >= maxMsgTransferSize) {
                break;
            }
        }
    } finally {
        // release resource, also when decoding or message building throws
        if (recordSeg != null) {
            recordSeg.relViewRef();
        }
    }
    // A failure after at least one message was collected is reported with the
    // success code so the collected messages are still returned.
    // NOTE(review): 'result' stays false in that case - confirm callers expect this.
    if (retCode != 0 && !transferedMessageList.isEmpty()) {
        retCode = 0;
        errInfo = "Ok";
    }
    if (lastRdDataOffset <= 0L) {
        lastRdDataOffset = lastRdOffset;
    }
    // return result.
    return new GetMessageResult(result, retCode, errInfo,
            reqOffset, readedOffset, lastRdDataOffset,
            totalSize, countMap, transferedMessageList);
}