public Tuple3 appendMsg()

in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java [136:278]


    /**
     * Appends one write to the file store: the data buffer goes to the last data segment
     * first, then the index buffer goes to the last index segment.
     *
     * <p>When {@code fromMem} is true the expected offsets are read out of the supplied
     * buffers; otherwise the current tail offsets of the last segments are written into the
     * buffers before appending. After each append, a segment whose cached size has reached the
     * configured maximum is flushed, marked immutable, and followed by a newly created segment.
     * Both segments are also flushed when the unflushed message count, the unflushed data size,
     * or the time since the last flush reaches the thresholds reported by the message store.
     *
     * <p>Any {@code Throwable} raised while the write lock is held is handed to the sampled
     * exception printer and is not rethrown; the returned tuple then reports failure.
     *
     * @param fromMem     whether the offsets are already present in the buffers
     * @param currTime    timestamp used for the flush-interval checks, stored as the last flush
     *                    time, and used as the start point of the elapsed-time statistic
     * @param sb          scratch builder for log lines (presumably empty on entry); it is
     *                    emptied again after every log line built here
     * @param msgCnt      message count added to the unflushed counter and to the statistics
     * @param indexSize   index size reported in statistics and in the error log
     * @param indexBuffer index content to append; carries the data offset
     * @param dataSize    data size added to the unflushed-size counter and to the statistics
     * @param dataBuffer  data content to append; carries the index (queue logic) offset
     * @param leftTime    passed through to both segment append calls
     * @param rightTime   passed through to both segment append calls
     * @return a tuple of (success flag, index offset, data offset); an offset is {@code -1}
     *         when the corresponding append was never completed
     * @throws IllegalStateException if the store has already been closed
     */
    public Tuple3<Boolean, Long, Long> appendMsg(boolean fromMem, long currTime,
            StringBuilder sb, int msgCnt,
            int indexSize, ByteBuffer indexBuffer,
            int dataSize, ByteBuffer dataBuffer,
            long leftTime, long rightTime) {
        // Refuse writes once the store is closed.
        if (this.closed.get()) {
            final String closedMsg = new StringBuilder(512)
                    .append("Closed MessageStore for storeKey ")
                    .append(this.storeKey).toString();
            throw new IllegalStateException(closedMsg);
        }
        // Result of this call; offsets remain -1 until the matching append has returned.
        boolean appendOk = false;
        long dataOffset = -1;
        long indexOffset = -1;
        // Flush triggers, later handed to the statistics holder.
        boolean dataSegRolled = false;
        boolean indexSegRolled = false;
        boolean sizeExceeded = false;
        boolean countExceeded = false;
        boolean intervalExceeded = false;
        boolean forceMetadata = false;
        // Amounts drained from the unflushed counters when a flush happens.
        long flushedMsgCnt = 0;
        long flushedDataSize = 0;
        // Paths of segments created by this call, logged after the lock is released.
        String createdDataPath = null;
        String createdIndexPath = null;
        this.writeLock.lock();
        try {
            // Writes always target the tail segments.
            final Segment dataSeg = this.dataSegments.last();
            final Segment indexSeg = this.indexSegments.last();
            // Work out which offsets this write is expected to land on.
            final long expectedIndexOffset;
            final long expectedDataOffset;
            if (fromMem) {
                // Offsets were already stamped into the buffers; just read them back.
                expectedIndexOffset =
                        dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
                expectedDataOffset =
                        indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
            } else {
                // Take the current tails and stamp them into the buffers.
                expectedIndexOffset = indexSeg.getLast();
                expectedDataOffset = dataSeg.getLast();
                indexBuffer.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, expectedDataOffset);
                dataBuffer.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF,
                        expectedIndexOffset);
            }
            // Data goes first.
            this.curUnflushSize.addAndGet(dataSize);
            dataOffset = dataSeg.append(dataBuffer, leftTime, rightTime);
            // Roll over to a new data segment once the current one is full.
            if (dataSeg.getCachedSize() >= this.tubeConfig.getMaxSegmentSize()) {
                dataSegRolled = true;
                final long nextDataStart = dataSeg.flush(true);
                final String dataFileName = DataStoreUtils.nameFromOffset(
                        nextDataStart, DataStoreUtils.DATA_FILE_SUFFIX);
                final File nextDataFile = new File(this.dataDir, dataFileName);
                dataSeg.setMutable(false);
                createdDataPath = nextDataFile.getAbsolutePath();
                final FileSegment nextDataSeg =
                        new FileSegment(nextDataStart, nextDataFile, SegmentType.DATA);
                this.dataSegments.append(nextDataSeg);
            }
            // Index goes second.
            indexOffset = indexSeg.append(indexBuffer, leftTime, rightTime);
            // Roll over to a new index segment once the current one is full.
            if (indexSeg.getCachedSize() >= this.tubeConfig.getMaxIndexSegmentSize()) {
                indexSegRolled = true;
                final long nextIndexStart = indexSeg.flush(true);
                indexSeg.setMutable(false);
                final String indexFileName = DataStoreUtils.nameFromOffset(
                        nextIndexStart, DataStoreUtils.INDEX_FILE_SUFFIX);
                final File nextIndexFile = new File(this.indexDir, indexFileName);
                createdIndexPath = nextIndexFile.getAbsolutePath();
                final FileSegment nextIndexSeg =
                        new FileSegment(nextIndexStart, nextIndexFile, SegmentType.INDEX);
                this.indexSegments.append(nextIndexSeg);
            }
            // Evaluate the flush thresholds; the size check only applies when a hold is set.
            if (messageStore.getUnflushDataHold() > 0) {
                sizeExceeded = curUnflushSize.get() >= messageStore.getUnflushDataHold();
            }
            countExceeded =
                    this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold();
            intervalExceeded =
                    currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval();
            final boolean segmentRolled = dataSegRolled || indexSegRolled;
            if (segmentRolled || countExceeded || intervalExceeded || sizeExceeded) {
                forceMetadata = segmentRolled
                        || (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR);
                // A rolled segment was already flushed above; flush only the other one(s).
                if (!dataSegRolled) {
                    dataSeg.flush(forceMetadata);
                }
                if (!indexSegRolled) {
                    indexSeg.flush(forceMetadata);
                }
                flushedMsgCnt = this.curUnflushed.getAndSet(0);
                flushedDataSize = this.curUnflushSize.getAndSet(0);
                this.lastFlushTime.set(currTime);
                if (forceMetadata) {
                    this.lastMetaFlushTime.set(this.lastFlushTime.get());
                }
            }
            // The write only counts as successful when both offsets match the expected ones.
            if (expectedIndexOffset == indexOffset && expectedDataOffset == dataOffset) {
                appendOk = true;
            } else {
                ServiceStatusHolder.addWriteIOErrCnt();
                BrokerSrvStatsHolder.incDiskIOExcCnt();
                sb.append("[File Store]: appendMsg data Error, storekey=")
                        .append(this.storeKey);
                sb.append(",msgCnt=").append(msgCnt);
                sb.append(",indexSize=").append(indexSize);
                sb.append(",inIndexOffset=").append(expectedIndexOffset);
                sb.append(",indexOffset=").append(indexOffset);
                sb.append(",dataSize=").append(dataSize);
                sb.append(",inDataOffset=").append(expectedDataOffset);
                sb.append(",dataOffset=").append(dataOffset);
                logger.error(sb.toString());
                sb.delete(0, sb.length());
            }
        } catch (Throwable e) {
            // Failures on an already-closed store are not counted as IO errors.
            if (!closed.get()) {
                ServiceStatusHolder.addWriteIOErrCnt();
                BrokerSrvStatsHolder.incDiskIOExcCnt();
            }
            samplePrintCtrl.printExceptionCaught(e);
        } finally {
            this.writeLock.unlock();
            // Statistics and segment-creation logs are emitted outside the lock, on success only.
            if (appendOk) {
                final long elapsedMs = System.currentTimeMillis() - currTime;
                msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
                        flushedMsgCnt, flushedDataSize, dataSegRolled, indexSegRolled,
                        sizeExceeded, countExceeded, intervalExceeded,
                        forceMetadata, elapsedMs);
                if (dataSegRolled) {
                    sb.append("[File Store] Created data segment ").append(createdDataPath);
                    logger.info(sb.toString());
                    sb.delete(0, sb.length());
                }
                if (indexSegRolled) {
                    sb.append("[File Store] Created index segment ").append(createdIndexPath);
                    logger.info(sb.toString());
                    sb.delete(0, sb.length());
                }
            }
        }
        return new Tuple3<>(appendOk, indexOffset, dataOffset);
    }