in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java [334:445]
/**
 * Rebuilds the index files by scanning every data segment in the current view.
 *
 * <p>Each segment is read in buffer-sized chunks. Inside a chunk the method looks for
 * message headers; a position is accepted as a message only if its token equals
 * {@code STORE_DATA_TOKER_BEGIN_VALUE}, its payload length is within
 * {@code (0, STORE_MAX_MESSAGE_STORE_LEN]} and the CRC32 of the payload matches the
 * checksum stored in the header. Positions that fail these checks are skipped one byte
 * at a time. For every accepted message one index item is appended to the current
 * index segment; the index segment is flushed, closed and replaced once its cached
 * size reaches {@code maxIndexSegmentSize}.
 *
 * <p>Errors raised while processing a segment are logged and the scan continues with
 * the next segment; the view reference of each processed segment is always released.
 */
private void createIndexFiles() {
    final Segment[] segments = this.segments.getView();
    if (segments.length == 0) {
        return;
    }
    // index segment currently being written; null until the first valid message
    // is found and again after a full index segment has been closed
    Segment curPartSeg = null;
    final ByteBuffer dataBuffer =
            ByteBuffer.allocate(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
    final ByteBuffer indexBuffer =
            ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
    for (Segment curSegment : segments) {
        if (curSegment == null) {
            continue;
        }
        try {
            // offset of the next read, relative to the start of the segment
            long curOffset = 0L;
            while (curOffset < curSegment.getCachedSize()) {
                dataBuffer.clear();
                curSegment.relRead(dataBuffer, curOffset);
                dataBuffer.flip();
                int dataRealLimit = dataBuffer.limit();
                if (dataRealLimit < DataStoreUtils.STORE_DATA_HEADER_LEN) {
                    // not even one header could be read, nothing more to scan here
                    break;
                }
                // scan position inside the buffer
                int dataStart = 0;
                while (dataStart < dataRealLimit) {
                    if (dataRealLimit - dataStart <= DataStoreUtils.STORE_DATA_HEADER_LEN) {
                        // the rest cannot hold a header plus payload; read again from here
                        break;
                    }
                    // read message fields
                    final int msgLen =
                            dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_LENGTH);
                    final int msgToken =
                            dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_DATATYPE);
                    final int checkSum =
                            dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_CHECKSUM);
                    final int partitionId =
                            dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUEID);
                    final long queueOffset =
                            dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
                    final long timeRecv =
                            dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
                    final int keyCode =
                            dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
                    final int msgSize = msgLen + 4;
                    // queue offset aligned down to a multiple of the index item length;
                    // used as the start offset (and file name) of a new index segment
                    final long msgOffset =
                            queueOffset - queueOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
                    int payLoadLen = msgLen - DataStoreUtils.STORE_DATA_PREFX_LEN;
                    int payLoadOffset = dataStart + DataStoreUtils.STORE_DATA_HEADER_LEN;
                    if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE
                            || payLoadLen <= 0
                            || payLoadLen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
                        // not a message header at this position, try the next byte
                        dataStart += 1;
                        continue;
                    }
                    if (payLoadLen > (dataRealLimit
                            - dataStart - DataStoreUtils.STORE_DATA_HEADER_LEN)) {
                        // payload is not completely inside the buffer; read again from here
                        break;
                    }
                    // check message crc
                    final byte[] payLoadData = new byte[payLoadLen];
                    System.arraycopy(dataBuffer.array(), payLoadOffset,
                            payLoadData, 0, payLoadLen);
                    if (checkSum != CheckSum.crc32(payLoadData)) {
                        dataStart += 1;
                        continue;
                    }
                    // build index item
                    indexBuffer.clear();
                    indexBuffer.putInt(partitionId);
                    indexBuffer.putLong(curSegment.getStart()
                            + curOffset + dataStart);
                    indexBuffer.putInt(msgSize);
                    indexBuffer.putInt(keyCode);
                    indexBuffer.putLong(timeRecv);
                    indexBuffer.flip();
                    dataStart += msgSize;
                    if (curPartSeg == null) {
                        File newFile = new File(this.indexDir,
                                DataStoreUtils.nameFromOffset(msgOffset,
                                        DataStoreUtils.INDEX_FILE_SUFFIX));
                        curPartSeg =
                                new FileSegment(msgOffset, newFile, SegmentType.INDEX);
                    }
                    // append index message
                    curPartSeg.append(indexBuffer, timeRecv, timeRecv);
                    if (curPartSeg.getCachedSize() >= maxIndexSegmentSize) {
                        curPartSeg.flush(true);
                        curPartSeg.close();
                        curPartSeg = null;
                    }
                }
                if (dataStart == 0) {
                    // Nothing was consumed from this read: either only a header-sized
                    // remainder was returned, or the header at the buffer start declares
                    // a payload longer than the bytes read (e.g. a truncated message).
                    // Without advancing, curOffset would stay unchanged and the same
                    // position would be read again on every iteration, so skip one byte,
                    // exactly as is done for any other position that cannot be parsed.
                    dataStart = 1;
                }
                curOffset += dataStart;
            }
            if (curPartSeg != null) {
                curPartSeg.flush(true);
            }
        } catch (Throwable ee) {
            // best effort: log and continue with the next data segment
            logger.error("Create Index file error ", ee);
        } finally {
            curSegment.relViewRef();
        }
    }
    try {
        if (curPartSeg != null) {
            curPartSeg.flush(true);
            curPartSeg.close();
        }
    } catch (Throwable e2) {
        logger.error("Close Index file error ", e2);
    }
}