private void recover()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java [505:711]


  private void recover() throws DataRegionException {
    try {
      recoverCompaction();
    } catch (Exception e) {
      // signal wal recover manager to recover this region's files
      WALRecoverManager.getInstance()
          .getAllDataRegionScannedLatch()
          .countDownWithException(e.getMessage());
      throw new DataRegionException(e);
    }

    try {
      // collect candidate TsFiles from sequential and unsequential data directory
      // split by partition so that we can find the last file of each partition and decide to
      // close it or not
      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
      DataRegionRecoveryContext dataRegionRecoveryContext =
          new DataRegionRecoveryContext(
              partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
                  + partitionTmpUnseqTsFiles.values().stream().mapToLong(List::size).sum());
      // submit unsealed TsFiles to recover
      List<WALRecoverListener> recoverListeners = new ArrayList<>();
      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
        // tsFiles without resource file are unsealed
        for (TsFileResource resource : value) {
          if (resource.resourceFileExists()) {
            FileMetrics.getInstance()
                .addTsFile(
                    resource.getDatabaseName(),
                    resource.getDataRegionId(),
                    resource.getTsFile().length(),
                    true,
                    resource.getTsFile().getName());
            if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
              // update mods file metrics
              resource.getExclusiveModFile();
            } else {
              resource.upgradeModFile(upgradeModFileThreadPool);
            }
          }
        }
        while (!value.isEmpty()) {
          TsFileResource tsFileResource = value.get(value.size() - 1);
          if (tsFileResource.resourceFileExists()) {
            break;
          } else {
            value.remove(value.size() - 1);
            WALRecoverListener recoverListener =
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true);
            if (recoverListener != null) {
              recoverListeners.add(recoverListener);
            }
          }
        }
      }
      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
        // tsFiles without resource file are unsealed
        for (TsFileResource resource : value) {
          if (resource.resourceFileExists()) {
            FileMetrics.getInstance()
                .addTsFile(
                    resource.getDatabaseName(),
                    resource.getDataRegionId(),
                    resource.getTsFile().length(),
                    false,
                    resource.getTsFile().getName());
          }
          if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
            // update mods file metrics
            resource.getExclusiveModFile();
          } else {
            resource.upgradeModFile(upgradeModFileThreadPool);
          }
        }
        while (!value.isEmpty()) {
          TsFileResource tsFileResource = value.get(value.size() - 1);
          if (tsFileResource.resourceFileExists()) {
            break;
          } else {
            value.remove(value.size() - 1);
            WALRecoverListener recoverListener =
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
            if (recoverListener != null) {
              recoverListeners.add(recoverListener);
            }
          }
        }
      }
      // signal wal recover manager to recover this region's files
      WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
      // recover sealed TsFiles
      if (!partitionTmpSeqTsFiles.isEmpty() || !partitionTmpUnseqTsFiles.isEmpty()) {
        long latestPartitionId = Long.MIN_VALUE;
        if (!partitionTmpSeqTsFiles.isEmpty()) {
          latestPartitionId =
              ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
        }
        if (!partitionTmpUnseqTsFiles.isEmpty()) {
          latestPartitionId =
              Math.max(
                  latestPartitionId,
                  ((TreeMap<Long, List<TsFileResource>>) partitionTmpUnseqTsFiles).lastKey());
        }
        File logFile = SystemFileFactory.INSTANCE.getFile(dataRegionSysDir, "FileTimeIndexCache_0");
        Map<TsFileID, FileTimeIndex> fileTimeIndexMap = new HashMap<>();
        if (logFile.exists()) {
          try {
            FileTimeIndexCacheReader logReader =
                new FileTimeIndexCacheReader(logFile, dataRegionId);
            logReader.read(fileTimeIndexMap);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
        for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
          Callable<Void> asyncRecoverTask =
              recoverFilesInPartition(
                  partitionFiles.getKey(),
                  dataRegionRecoveryContext,
                  partitionFiles.getValue(),
                  fileTimeIndexMap,
                  true);
          if (asyncRecoverTask != null) {
            asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
          }
        }
        for (Entry<Long, List<TsFileResource>> partitionFiles :
            partitionTmpUnseqTsFiles.entrySet()) {
          Callable<Void> asyncRecoverTask =
              recoverFilesInPartition(
                  partitionFiles.getKey(),
                  dataRegionRecoveryContext,
                  partitionFiles.getValue(),
                  fileTimeIndexMap,
                  false);
          if (asyncRecoverTask != null) {
            asyncTsFileResourceRecoverTaskList.add(asyncRecoverTask);
          }
        }
        if (config.isEnableSeparateData()) {
          TimePartitionManager.getInstance()
              .registerTimePartitionInfo(
                  new TimePartitionInfo(
                      new DataRegionId(Integer.parseInt(dataRegionId)),
                      latestPartitionId,
                      false,
                      Long.MAX_VALUE,
                      lastFlushTimeMap.getMemSize(latestPartitionId)));
        }
      }
      // wait until all unsealed TsFiles have been recovered
      for (WALRecoverListener recoverListener : recoverListeners) {
        if (recoverListener.waitForResult() == WALRecoverListener.Status.FAILURE) {
          logger.error(
              "Fail to recover unsealed TsFile {}, skip it.",
              recoverListener.getFilePath(),
              recoverListener.getCause());
        }
        // update VSGRecoveryContext
        dataRegionRecoveryContext.incrementRecoveredFilesNum();
      }
      // recover unsealed TsFiles, sort make sure last flush time not be replaced by early files
      dataRegionRecoveryContext.recoverPerformers.sort(
          (p1, p2) ->
              compareFileName(
                  p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile()));
      for (UnsealedTsFileRecoverPerformer recoverPerformer :
          dataRegionRecoveryContext.recoverPerformers) {
        recoverUnsealedTsFileCallBack(recoverPerformer);
      }
      for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
      for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
    } catch (IOException e) {
      // signal wal recover manager to recover this region's files
      WALRecoverManager.getInstance()
          .getAllDataRegionScannedLatch()
          .countDownWithException(e.getMessage());
      throw new DataRegionException(e);
    }

    if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
      initCompactionSchedule();
    }

    if (StorageEngine.getInstance().isReadyForReadAndWrite()) {
      if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
        IWALNode walNode =
            WALManager.getInstance()
                .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
        if (walNode instanceof WALNode) {
          walNode.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
        }
      }
      logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
    } else {
      logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
    }
  }