private void createIndexFiles()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java [334:445]


        /**
         * Rebuilds the index files by scanning every data segment currently in view.
         *
         * <p>Each data segment is read chunk by chunk into a reusable buffer. Within a
         * chunk the method looks for stored messages: a candidate is accepted when its
         * token equals {@code STORE_DATA_TOKER_BEGIN_VALUE}, its payload length is within
         * {@code (0, STORE_MAX_MESSAGE_STORE_LEN]} and the CRC32 of the payload matches
         * the checksum stored in the header. Rejected candidates are skipped one byte at
         * a time so that scanning can resynchronize after damaged data.
         *
         * <p>For every accepted message one index item (partition id, absolute data
         * offset, message size, key code, receive time) is appended to the current index
         * segment under {@code indexDir}. The index segment is flushed and closed once its
         * cached size reaches {@code maxIndexSegmentSize}; a new one is then created lazily
         * for the next message.
         *
         * <p>Errors are logged and not propagated: a failure while processing one data
         * segment abandons the rest of that segment and continues with the next one.
         */
        private void createIndexFiles() {
            final Segment[] segments = this.segments.getView();
            if (segments.length == 0) {
                return;
            }
            // Index segment currently being written; null until the first valid message
            // is found and again after a full index segment has been closed.
            Segment curPartSeg = null;
            final ByteBuffer dataBuffer =
                    ByteBuffer.allocate(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT);
            final ByteBuffer indexBuffer =
                    ByteBuffer.allocate(DataStoreUtils.STORE_INDEX_HEAD_LEN);
            for (Segment curSegment : segments) {
                if (curSegment == null) {
                    continue;
                }
                try {
                    // Offset of the next chunk to read, relative to the segment start.
                    long curOffset = 0L;
                    while (curOffset < curSegment.getCachedSize()) {
                        dataBuffer.clear();
                        curSegment.relRead(dataBuffer, curOffset);
                        dataBuffer.flip();
                        int dataRealLimit = dataBuffer.limit();
                        // The scan below needs strictly more than a header to examine a
                        // message, so a chunk of at most header size can never make
                        // progress; stop here instead of re-reading it forever.
                        if (dataRealLimit <= DataStoreUtils.STORE_DATA_HEADER_LEN) {
                            break;
                        }
                        int dataStart = 0;
                        while (dataStart < dataRealLimit) {
                            if (dataRealLimit - dataStart <= DataStoreUtils.STORE_DATA_HEADER_LEN) {
                                break;
                            }
                            // read message fields
                            final int msgLen =
                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_LENGTH);
                            final int msgToken =
                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_DATATYPE);
                            final int checkSum =
                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_CHECKSUM);
                            final int partitionId =
                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUEID);
                            final long queueOffset =
                                    dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
                            final long timeRecv =
                                    dataBuffer.getLong(dataStart + DataStoreUtils.STORE_HEADER_POS_RECEIVEDTIME);
                            final int keyCode =
                                    dataBuffer.getInt(dataStart + DataStoreUtils.STORE_HEADER_POS_KEYCODE);
                            // Bytes the message occupies in the data file; presumably the
                            // stored length does not count its own 4-byte field - TODO confirm.
                            final int msgSize = msgLen + 4;
                            // Queue offset rounded down to a multiple of the index item size;
                            // used as start offset and file name of a new index segment.
                            final long msgOffset = queueOffset - queueOffset % DataStoreUtils.STORE_INDEX_HEAD_LEN;
                            int payLoadLen = msgLen - DataStoreUtils.STORE_DATA_PREFX_LEN;
                            int payLoadOffset = dataStart + DataStoreUtils.STORE_DATA_HEADER_LEN;
                            if (msgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE
                                    || payLoadLen <= 0
                                    || payLoadLen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
                                // Not a message start: slide forward one byte and retry.
                                dataStart += 1;
                                continue;
                            }
                            if (payLoadLen > (dataRealLimit
                                    - dataStart - DataStoreUtils.STORE_DATA_HEADER_LEN)) {
                                // Payload is cut off by the end of this chunk; re-read
                                // starting at this message in the next outer iteration.
                                break;
                            }
                            // check message crc
                            final byte[] payLoadData = new byte[payLoadLen];
                            System.arraycopy(dataBuffer.array(), payLoadOffset,
                                    payLoadData, 0, payLoadLen);
                            if (checkSum != CheckSum.crc32(payLoadData)) {
                                dataStart += 1;
                                continue;
                            }
                            // build index item
                            indexBuffer.clear();
                            indexBuffer.putInt(partitionId);
                            indexBuffer.putLong(curSegment.getStart()
                                    + curOffset + dataStart);
                            indexBuffer.putInt(msgSize);
                            indexBuffer.putInt(keyCode);
                            indexBuffer.putLong(timeRecv);
                            indexBuffer.flip();
                            dataStart += msgSize;
                            if (curPartSeg == null) {
                                File newFile = new File(this.indexDir,
                                        DataStoreUtils.nameFromOffset(msgOffset, DataStoreUtils.INDEX_FILE_SUFFIX));
                                curPartSeg =
                                        new FileSegment(msgOffset, newFile, SegmentType.INDEX);
                            }
                            // append index message
                            curPartSeg.append(indexBuffer, timeRecv, timeRecv);
                            if (curPartSeg.getCachedSize() >= maxIndexSegmentSize) {
                                curPartSeg.flush(true);
                                curPartSeg.close();
                                curPartSeg = null;
                            }
                        }
                        // Always move forward. dataStart stays 0 when the message at the
                        // very start of the chunk has a plausible header but its payload
                        // does not fit in what was read (e.g. a truncated last message);
                        // adding 0 would re-read the same bytes endlessly, so skip one
                        // byte and let the scan resynchronize.
                        curOffset += Math.max(dataStart, 1);
                    }
                    if (curPartSeg != null) {
                        curPartSeg.flush(true);
                    }
                } catch (Throwable ee) {
                    logger.error("Create Index file error ", ee);
                } finally {
                    // curSegment is never null here: null entries are skipped above.
                    curSegment.relViewRef();
                }
            }
            try {
                if (curPartSeg != null) {
                    curPartSeg.flush(true);
                    curPartSeg.close();
                }
            } catch (Throwable e2) {
                logger.error("Close Index file error ", e2);
            }
        }