in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java [505:711]
/**
 * Recovers this data region's TsFiles from the local data folders.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Recover interrupted compactions via {@code recoverCompaction()}.
 *   <li>Collect sequence and unsequence TsFiles grouped by time partition. Register file
 *       metrics (and mods files) for files that have a resource file, and submit the trailing
 *       files of each partition that lack a resource file (unsealed files) to WAL-based recovery.
 *   <li>Count down the WAL recover manager's "all data regions scanned" latch for this region.
 *   <li>Recover the remaining files per partition, reusing the {@code FileTimeIndexCache_0} file
 *       in the region system directory when it exists. Tasks returned by
 *       {@code recoverFilesInPartition} are queued in {@code asyncTsFileResourceRecoverTaskList}.
 *       When separate data is enabled, register the latest time partition.
 *   <li>Wait for every unsealed-file recovery, apply the results sorted by file name, and update
 *       the per-partition file versions from the TsFile manager.
 * </ol>
 *
 * <p>The scanned latch is counted down exactly once for this region. If a failure happens before
 * the scan completes, it is counted down with the failure message. If the scan completed, it is
 * counted down normally and is not touched again.
 *
 * @throws DataRegionException if compaction recovery fails or an {@link IOException} occurs while
 *     scanning or recovering files; unchecked exceptions are rethrown unchanged
 */
private void recover() throws DataRegionException {
  try {
    recoverCompaction();
  } catch (Exception e) {
    // signal wal recover manager to recover this region's files
    WALRecoverManager.getInstance()
        .getAllDataRegionScannedLatch()
        .countDownWithException(e.getMessage());
    throw new DataRegionException(e);
  }
  // Whether this region has already counted down the "all data regions scanned" latch.
  // A second count-down in the failure path could release WAL replay before other regions
  // finish scanning (assumes the latch is shared by all regions - TODO confirm in
  // WALRecoverManager), so the failure handlers only signal when this is still false.
  boolean scannedLatchSignaled = false;
  try {
    // collect candidate TsFiles from sequential and unsequential data directory
    // split by partition so that we can find the last file of each partition and decide to
    // close it or not
    Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
        getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
    Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
        getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
    DataRegionRecoveryContext dataRegionRecoveryContext =
        new DataRegionRecoveryContext(
            partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
                + partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum());
    // submit unsealed TsFiles to recover
    List<WALRecoverListener> recoverListeners = new ArrayList<>();
    for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
      // tsFiles without resource file are unsealed
      for (TsFileResource resource : value) {
        if (resource.resourceFileExists()) {
          FileMetrics.getInstance()
              .addTsFile(
                  resource.getDatabaseName(),
                  resource.getDataRegionId(),
                  resource.getTsFile().length(),
                  true,
                  resource.getTsFile().getName());
          if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
            // update mods file metrics
            resource.getExclusiveModFile();
          } else {
            resource.upgradeModFile(upgradeModFileThreadPool);
          }
        }
      }
      // Strip the trailing files without a resource file and hand each one to WAL recovery.
      while (!value.isEmpty()) {
        TsFileResource tsFileResource = value.get(value.size() - 1);
        if (tsFileResource.resourceFileExists()) {
          break;
        } else {
          value.remove(value.size() - 1);
          WALRecoverListener recoverListener =
              recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true);
          if (recoverListener != null) {
            recoverListeners.add(recoverListener);
          }
        }
      }
    }
    for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
      // tsFiles without resource file are unsealed
      for (TsFileResource resource : value) {
        if (resource.resourceFileExists()) {
          FileMetrics.getInstance()
              .addTsFile(
                  resource.getDatabaseName(),
                  resource.getDataRegionId(),
                  resource.getTsFile().length(),
                  false,
                  resource.getTsFile().getName());
        }
        // NOTE(review): unlike the sequence loop above, mods handling here also runs for files
        // without a resource file (unsealed files). Confirm whether this asymmetry is intended.
        if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
          // update mods file metrics
          resource.getExclusiveModFile();
        } else {
          resource.upgradeModFile(upgradeModFileThreadPool);
        }
      }
      // Strip the trailing files without a resource file and hand each one to WAL recovery.
      while (!value.isEmpty()) {
        TsFileResource tsFileResource = value.get(value.size() - 1);
        if (tsFileResource.resourceFileExists()) {
          break;
        } else {
          value.remove(value.size() - 1);
          WALRecoverListener recoverListener =
              recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
          if (recoverListener != null) {
            recoverListeners.add(recoverListener);
          }
        }
      }
    }
    // signal wal recover manager to recover this region's files
    WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
    scannedLatchSignaled = true;
    // recover sealed TsFiles
    if (!partitionTmpSeqTsFiles.isEmpty() || !partitionTmpUnseqTsFiles.isEmpty()) {
      long latestPartitionId = Long.MIN_VALUE;
      // The casts rely on getAllFiles returning a TreeMap keyed by partition id.
      if (!partitionTmpSeqTsFiles.isEmpty()) {
        latestPartitionId =
            ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
      }
      if (!partitionTmpUnseqTsFiles.isEmpty()) {
        latestPartitionId =
            Math.max(
                latestPartitionId,
                ((TreeMap<Long, List<TsFileResource>>) partitionTmpUnseqTsFiles).lastKey());
      }
      File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, "FileTimeIndexCache_0");
      Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
      if (logFile.exists()) {
        try {
          FileTimeIndexCacheReader logReader =
              new FileTimeIndexCacheReader(logFile, dataRegionId);
          logReader.read(fileTimeIndexMap);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
      for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
        Callable<Void> asyncRecoverTask =
            recoverFilesInPartition(
                partitionFiles.getKey(),
                dataRegionRecoveryContext,
                partitionFiles.getValue(),
                fileTimeIndexMap,
                true);
        if (asyncRecoverTask != null) {
          asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
        }
      }
      for (Entry<Long, List<TsFileResource>> partitionFiles :
          partitionTmpUnseqTsFiles.entrySet()) {
        Callable<Void> asyncRecoverTask =
            recoverFilesInPartition(
                partitionFiles.getKey(),
                dataRegionRecoveryContext,
                partitionFiles.getValue(),
                fileTimeIndexMap,
                false);
        if (asyncRecoverTask != null) {
          asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
        }
      }
      if (config.isEnableSeparateData()) {
        TimePartitionManager.getInstance()
            .registerTimePartitionInfo(
                new TimePartitionInfo(
                    new DataRegionId(Integer.parseInt(dataRegionId)),
                    latestPartitionId,
                    false,
                    Long.MAX_VALUE,
                    lastFlushTimeMap.getMemSize(latestPartitionId)));
      }
    }
    // wait until all unsealed TsFiles have been recovered
    for (WALRecoverListener recoverListener : recoverListeners) {
      if (recoverListener.waitForResult() == WALRecoverListener.Status.FAILURE) {
        logger.error(
            "Fail to recover unsealed TsFile {}, skip it.",
            recoverListener.getFilePath(),
            recoverListener.getCause());
      }
      // update VSGRecoveryContext
      dataRegionRecoveryContext.incrementRecoveredFilesNum();
    }
    // recover unsealed TsFiles, sort make sure last flush time not be replaced by early files
    dataRegionRecoveryContext.recoverPerformers.sort(
        (p1, p2) ->
            compareFileName(
                p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile()));
    for (UnsealedTsFileRecoverPerformer recoverPerformer :
        dataRegionRecoveryContext.recoverPerformers) {
      recoverUnsealedTsFileCallBack(recoverPerformer);
    }
    for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
      long partitionNum = resource.getTimePartition();
      updatePartitionFileVersion(partitionNum, resource.getVersion());
    }
    for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
      long partitionNum = resource.getTimePartition();
      updatePartitionFileVersion(partitionNum, resource.getVersion());
    }
  } catch (IOException e) {
    // Signal the failure only if the scan had not completed; after the normal count-down the
    // latch must not be decremented again for this region.
    if (!scannedLatchSignaled) {
      WALRecoverManager.getInstance()
          .getAllDataRegionScannedLatch()
          .countDownWithException(e.getMessage());
    }
    throw new DataRegionException(e);
  } catch (RuntimeException e) {
    // An unchecked failure before the scan completes must also release the latch, otherwise WAL
    // recovery could keep waiting for this region. The exception is rethrown unchanged.
    if (!scannedLatchSignaled) {
      WALRecoverManager.getInstance()
          .getAllDataRegionScannedLatch()
          .countDownWithException(e.getMessage());
    }
    throw e;
  }
  // Presumably async sealed-file recovery schedules compaction itself once it finishes -
  // TODO confirm; here compaction is only scheduled when no async tasks were queued.
  if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
    initCompactionSchedule();
  }
  if (StorageEngine.getInstance().isReadyForReadAndWrite()) {
    if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
      IWALNode walNode =
          WALManager.getInstance()
              .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
      if (walNode instanceof WALNode) {
        walNode.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
      }
    }
    logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
  } else {
    logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
  }
}