in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java [1515:1704]
public void flushOneMemTable() {
  // Overview:
  //   1. Take the first memtable of flushingMemTables and, unless it is a signal memtable or is
  //      empty, flush it synchronously through a MemTableFlushTask.
  //   2. Write every pending modification in modsToMemtable that is bound to this memtable into
  //      the modification file, under the write lock of flushQueryLock.
  //   3. Release the memtable, force the TsFile output, and notify the flush listeners.
  //   4. If shouldClose is set and no flushing memtable remains, end the file, retrying up to
  //      three times before handing over to the unrecoverable-error handler.
  IMemTable memTableToFlush = flushingMemTables.getFirst();
  // Signal memtable only may appear when calling asyncClose()
  if (!memTableToFlush.isSignalMemTable()) {
    if (memTableToFlush.isEmpty()) {
      logger.info(
          "This normal memtable is empty, skip flush. {}: {}",
          dataRegionName,
          tsFileResource.getTsFile().getName());
    } else {
      try {
        // mark() is taken before flushing; the failure path below calls reset()
        // (presumably rolling the writer back to this mark — confirm against the writer class).
        writer.mark();
        MemTableFlushTask flushTask =
            new MemTableFlushTask(
                memTableToFlush,
                writer,
                dataRegionName,
                dataRegionInfo.getDataRegion().getDataRegionId());
        flushTask.syncFlushMemTable();
        memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
      } catch (Throwable e) {
        if (writer == null) {
          // The writer has already been cleared elsewhere: only wake up waiters.
          // NOTE(review): this branch does not return, so execution continues below and reaches
          // writer.getTsFileOutput() without a null check — confirm whether that is intended.
          logger.info(
              "{}: {} is closed during flush, abandon flush task",
              dataRegionName,
              tsFileResource.getTsFile().getAbsolutePath());
          synchronized (flushingMemTables) {
            flushingMemTables.notifyAll();
          }
        } else {
          logger.error(
              "{}: {} meet error when flushing a memtable, change system mode to error",
              dataRegionName,
              tsFileResource.getTsFile().getAbsolutePath(),
              e);
          CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
          try {
            logger.error(
                "{}: {} IOTask meets error, truncate the corrupted data",
                dataRegionName,
                tsFileResource.getTsFile().getAbsolutePath(),
                e);
            writer.reset();
          } catch (IOException e1) {
            logger.error(
                "{}: {} Truncate corrupted data meets error",
                dataRegionName,
                tsFileResource.getTsFile().getAbsolutePath(),
                e1);
          }
          // Release resource
          try {
            syncReleaseFlushedMemTable(memTableToFlush);
            // Make sure no read will search this file
            tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
            // This callback method will register this empty tsfile into TsFileManager
            for (CloseFileListener closeFileListener : closeFileListeners) {
              closeFileListener.onClosed(this);
            }
            // Close writer
            writer.close();
            writer = null;
            synchronized (flushingMemTables) {
              flushingMemTables.notifyAll();
            }
          } catch (Exception e1) {
            logger.error(
                "{}: {} Release resource meets error",
                dataRegionName,
                tsFileResource.getTsFile().getAbsolutePath(),
                e1);
          }
          // The flush failed and the cleanup above has run: skip the rest of the method.
          return;
        }
      }
    }
  }

  // Acquire the lock BEFORE entering the try block. If lock() were inside the try and failed,
  // the finally clause would call unlock() on a lock this thread does not hold and the resulting
  // IllegalMonitorStateException would hide the original failure.
  flushQueryLock.writeLock().lock();
  try {
    // Persist the modifications that were recorded against the memtable being flushed and
    // drop them from the pending list.
    Iterator<Pair<ModEntry, IMemTable>> iterator = modsToMemtable.iterator();
    while (iterator.hasNext()) {
      Pair<ModEntry, IMemTable> entry = iterator.next();
      if (entry.right.equals(memTableToFlush)) {
        this.tsFileResource.getModFileForWrite().write(entry.left);
        tsFileResource.getModFileForWrite().close();
        iterator.remove();
        logger.info("[Deletion] Deletion : {} written when flush memtable", entry.left);
      }
    }
  } catch (IOException e) {
    logger.error(
        "Meet error when writing into ModificationFile file of {} ",
        tsFileResource.getTsFile().getAbsolutePath(),
        e);
  } finally {
    flushQueryLock.writeLock().unlock();
  }

  if (logger.isDebugEnabled()) {
    logger.debug(
        "{}: {} try get lock to release a memtable (signal={})",
        dataRegionName,
        tsFileResource.getTsFile().getAbsolutePath(),
        memTableToFlush.isSignalMemTable());
  }

  // For sync flush
  syncReleaseFlushedMemTable(memTableToFlush);

  try {
    writer.getTsFileOutput().force();
  } catch (IOException e) {
    // A failed force() is logged only; the method carries on.
    logger.error("fsync memTable data to disk error,", e);
  }

  // Call flushed listener after memtable is released safely
  for (FlushListener flushListener : flushListeners) {
    flushListener.onMemTableFlushed(memTableToFlush);
  }

  // Retry to avoid unnecessary read-only mode
  int retryCnt = 0;
  // The condition is re-evaluated after every pass, so the loop ends as soon as shouldClose is
  // cleared, a new flushing memtable shows up, or writer becomes null.
  while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
    try {
      if (isEmpty()) {
        endEmptyFile();
      } else {
        writer.mark();
        updateCompressionRatio();
        if (logger.isDebugEnabled()) {
          logger.debug(
              "{}: {} flushingMemtables is empty and will close the file",
              dataRegionName,
              tsFileResource.getTsFile().getAbsolutePath());
        }
        endFile();
      }
      if (logger.isDebugEnabled()) {
        logger.debug("{} flushingMemtables is clear", dataRegionName);
      }
    } catch (Exception e) {
      logger.error(
          "{}: {} marking or ending file meet error",
          dataRegionName,
          tsFileResource.getTsFile().getAbsolutePath(),
          e);
      // Truncate broken metadata
      try {
        writer.reset();
      } catch (ClosedChannelException e1) {
        // the file is closed
        break;
      } catch (IOException e1) {
        logger.error(
            "{}: {} truncate corrupted data meets error",
            dataRegionName,
            tsFileResource.getTsFile().getAbsolutePath(),
            e1);
      }
      // Retry or set read-only
      if (retryCnt < 3) {
        logger.warn(
            "{} meet error when flush FileMetadata to {}, retry it again",
            dataRegionName,
            tsFileResource.getTsFile().getAbsolutePath(),
            e);
        retryCnt++;
        // Go straight to the next attempt; waiters are not notified on a failed attempt.
        continue;
      } else {
        logger.error(
            "{} meet error when flush FileMetadata to {}, change system mode to error",
            dataRegionName,
            tsFileResource.getTsFile().getAbsolutePath(),
            e);
        CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
        break;
      }
    }
    // For sync close
    if (logger.isDebugEnabled()) {
      logger.debug(
          "{}: {} try to get flushingMemtables lock.",
          dataRegionName,
          tsFileResource.getTsFile().getAbsolutePath());
    }
    synchronized (flushingMemTables) {
      flushingMemTables.notifyAll();
    }
  }
}