in inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/disk/MsgFileStore.java [136:278]
public Tuple3<Boolean, Long, Long> appendMsg(boolean fromMem, long currTime,
StringBuilder sb, int msgCnt,
int indexSize, ByteBuffer indexBuffer,
int dataSize, ByteBuffer dataBuffer,
long leftTime, long rightTime) {
// append message, put in data file first, then index file.
if (this.closed.get()) {
throw new IllegalStateException(new StringBuilder(512)
.append("Closed MessageStore for storeKey ")
.append(this.storeKey).toString());
}
// Various parameters that trigger data refresh
boolean isDataSegFlushed = false;
boolean isIndexSegFlushed = false;
boolean pendingMsgCntExceed = false;
boolean pendingMsgSizeExceed = false;
boolean pendingMsgTimeExceed = false;
boolean isForceMetadata = false;
// flushed message count and data size info
long flushedMsgCnt = 0;
long flushedDataSize = 0;
// Temporary variables in calculations
long inIndexOffset;
Segment curDataSeg;
long dataOffset = -1;
long inDataOffset;
Segment curIndexSeg;
long indexOffset = -1;
// new file paths of creating
String newDataFilePath = null;
String newIndexFilePath = null;
boolean fileStoreOK = false;
this.writeLock.lock();
try {
// position last segments
curDataSeg = this.dataSegments.last();
curIndexSeg = this.indexSegments.last();
// get inputted offsets
if (fromMem) {
inIndexOffset = dataBuffer.getLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF);
inDataOffset = indexBuffer.getLong(DataStoreUtils.INDEX_POS_DATAOFFSET);
} else {
inIndexOffset = curIndexSeg.getLast();
inDataOffset = curDataSeg.getLast();
indexBuffer.putLong(DataStoreUtils.INDEX_POS_DATAOFFSET, inDataOffset);
dataBuffer.putLong(DataStoreUtils.STORE_HEADER_POS_QUEUE_LOGICOFF, inIndexOffset);
}
// filling data segment.
this.curUnflushSize.addAndGet(dataSize);
dataOffset = curDataSeg.append(dataBuffer, leftTime, rightTime);
// judge whether we need to create a new data segment.
if (curDataSeg.getCachedSize() >= this.tubeConfig.getMaxSegmentSize()) {
isDataSegFlushed = true;
long newDataOffset = curDataSeg.flush(true);
File newDataFile =
new File(this.dataDir,
DataStoreUtils.nameFromOffset(newDataOffset, DataStoreUtils.DATA_FILE_SUFFIX));
curDataSeg.setMutable(false);
newDataFilePath = newDataFile.getAbsolutePath();
this.dataSegments.append(new FileSegment(newDataOffset, newDataFile, SegmentType.DATA));
}
// filling index data.
indexOffset = curIndexSeg.append(indexBuffer, leftTime, rightTime);
// judge whether we need to create a new index segment.
if (curIndexSeg.getCachedSize() >= this.tubeConfig.getMaxIndexSegmentSize()) {
isIndexSegFlushed = true;
long newIndexOffset = curIndexSeg.flush(true);
curIndexSeg.setMutable(false);
File newIndexFile =
new File(this.indexDir,
DataStoreUtils.nameFromOffset(newIndexOffset, DataStoreUtils.INDEX_FILE_SUFFIX));
newIndexFilePath = newIndexFile.getAbsolutePath();
this.indexSegments.append(new FileSegment(newIndexOffset,
newIndexFile, SegmentType.INDEX));
}
// check whether we need to flush to disk.
pendingMsgSizeExceed = (messageStore.getUnflushDataHold() > 0)
&& (curUnflushSize.get() >= messageStore.getUnflushDataHold());
pendingMsgCntExceed =
(this.curUnflushed.addAndGet(msgCnt) >= messageStore.getUnflushThreshold());
pendingMsgTimeExceed =
(currTime - this.lastFlushTime.get() >= messageStore.getUnflushInterval());
if (pendingMsgCntExceed || pendingMsgTimeExceed
|| pendingMsgSizeExceed || isDataSegFlushed || isIndexSegFlushed) {
isForceMetadata = (isDataSegFlushed || isIndexSegFlushed
|| (currTime - this.lastMetaFlushTime.get() > MAX_META_REFRESH_DUR));
if (!isDataSegFlushed) {
curDataSeg.flush(isForceMetadata);
}
if (!isIndexSegFlushed) {
curIndexSeg.flush(isForceMetadata);
}
flushedMsgCnt = this.curUnflushed.getAndSet(0);
flushedDataSize = this.curUnflushSize.getAndSet(0);
this.lastFlushTime.set(currTime);
if (isForceMetadata) {
this.lastMetaFlushTime.set(this.lastFlushTime.get());
}
}
// print abnormal information
if (inIndexOffset != indexOffset || inDataOffset != dataOffset) {
ServiceStatusHolder.addWriteIOErrCnt();
BrokerSrvStatsHolder.incDiskIOExcCnt();
logger.error(sb.append("[File Store]: appendMsg data Error, storekey=")
.append(this.storeKey).append(",msgCnt=").append(msgCnt)
.append(",indexSize=").append(indexSize)
.append(",inIndexOffset=").append(inIndexOffset)
.append(",indexOffset=").append(indexOffset)
.append(",dataSize=").append(dataSize)
.append(",inDataOffset=").append(inDataOffset)
.append(",dataOffset=").append(dataOffset).toString());
sb.delete(0, sb.length());
} else {
fileStoreOK = true;
}
} catch (Throwable e) {
if (!closed.get()) {
ServiceStatusHolder.addWriteIOErrCnt();
BrokerSrvStatsHolder.incDiskIOExcCnt();
}
samplePrintCtrl.printExceptionCaught(e);
} finally {
this.writeLock.unlock();
// add statistics.
if (fileStoreOK) {
msgStoreStatsHolder.addFileFlushStatsInfo(msgCnt, indexSize, dataSize,
flushedMsgCnt, flushedDataSize, isDataSegFlushed, isIndexSegFlushed,
pendingMsgSizeExceed, pendingMsgCntExceed, pendingMsgTimeExceed,
isForceMetadata, System.currentTimeMillis() - currTime);
if (isDataSegFlushed) {
logger.info(sb.append("[File Store] Created data segment ")
.append(newDataFilePath).toString());
sb.delete(0, sb.length());
}
if (isIndexSegFlushed) {
logger.info(sb.append("[File Store] Created index segment ")
.append(newIndexFilePath).toString());
sb.delete(0, sb.length());
}
}
}
return new Tuple3<>(fileStoreOK, indexOffset, dataOffset);
}