static CompletableFuture&lt;Void&gt; restoreStores()

in samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java [228:310]


  /**
   * Restores the given stores for a task from their previously checkpointed remote snapshots in the
   * blob store, enqueuing one async restore per store and returning a future that completes when all
   * of them finish.
   *
   * For each store:
   * <ul>
   *   <li>Stores with no entry in {@code prevStoreSnapshotIndexes} are skipped (nothing to restore from).</li>
   *   <li>The local store directory is always deleted first to preserve transactional state guarantees.</li>
   *   <li>If {@code getDeleted} is set, or {@code shouldRestore(...)} reports a diff between the local
   *       checkpoint directory and the remote snapshot, all local checkpoint directories are deleted and
   *       a full restore from the blob store is enqueued on {@code executor}.</li>
   *   <li>Otherwise the identical local checkpoint directory is renamed into place as the store directory
   *       and remaining checkpoint directories are deleted.</li>
   * </ul>
   *
   * @param jobName job name, used to locate blobs in the blob store
   * @param jobId job id, used to locate blobs in the blob store
   * @param taskName task whose stores are being restored
   * @param storesToRestore names of the stores to restore for this task
   * @param prevStoreSnapshotIndexes map of store name to (SCM, snapshot index) from the last checkpoint
   * @param loggedBaseDir base directory for logged (durable) store directories
   * @param storageConfig storage configuration, consulted by {@code shouldRestore}
   * @param metrics restore metrics; files/bytes counters are incremented up front, timers set per phase
   * @param storageManagerUtil helper for resolving store and checkpoint directory paths
   * @param blobStoreUtil helper for downloading blobs from the blob store
   * @param dirDiffUtil helper for diffing local directories against remote snapshot indexes
   * @param executor executor on which the async restore work is scheduled
   * @param getDeleted if true, restore everything from the blob store ignoring the local diff
   *                   (recovery path for incorrectly deleted blobs, see SAMZA-2787)
   * @param compareFileOwners whether file ownership should be compared during directory diffs
   * @return future that completes when all store restores for the task have finished
   * @throws SamzaException if the local store directory cannot be deleted
   */
  static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
      Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
      File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
      StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
      ExecutorService executor, boolean getDeleted, boolean compareFileOwners) {
    long restoreStartTime = System.nanoTime();
    List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
    LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore);
    storesToRestore.forEach(storeName -> {
      if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
        LOG.info("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, storeName);
        // TODO HIGH shesharm what should we do with the local state already present on disk, if any?
        // E.g. this will be the case if user changes a store from changelog based backup and restore to
        // blob store based backup and restore, both at the same time.
        return;
      }

      Pair<String, SnapshotIndex> scmAndSnapshotIndex = prevStoreSnapshotIndexes.get(storeName);

      long storeRestoreStartTime = System.nanoTime();
      SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight();
      DirIndex dirIndex = snapshotIndex.getDirIndex();

      // record the total restore workload up front so remaining-counters can be decremented as work completes
      DirIndex.Stats stats = DirIndex.getStats(dirIndex);
      metrics.filesToRestore.getValue().addAndGet(stats.filesPresent);
      metrics.bytesToRestore.getValue().addAndGet(stats.bytesPresent);
      metrics.filesRemaining.getValue().addAndGet(stats.filesPresent);
      metrics.bytesRemaining.getValue().addAndGet(stats.bytesPresent);

      CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId();
      File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, storeName, taskName, TaskMode.Active);
      Path storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
      LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}",
          taskName, storeName, storeDir, storeCheckpointDir);

      // we always delete the store dir to preserve transactional state guarantees.
      try {
        LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
            "or remote snapshot.", storeDir);
        if (storeDir.exists() && storeDir.isDirectory()) {
          PathUtils.deleteDirectory(storeDir.toPath());
        }
      } catch (IOException e) {
        throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
      }

      // Restore from blob store if:
      // 1. shouldRestore() returns true - there is a diff between local and remote snapshot.
      // 2. getDeleted is set - Some blobs in the blob store were deleted incorrectly (SAMZA-2787). Download/restore
      //                             everything locally ignoring the diff. This will be backed up afresh by
      //                             ContainerStorageManager recovery path.
      boolean shouldRestore = getDeleted || shouldRestore(taskName.getTaskName(), storeName, dirIndex,
          storeCheckpointDir, storageConfig, dirDiffUtil);

      if (shouldRestore) { // restore the store from the remote blob store
        // delete all store checkpoint directories. if we only delete the store directory and don't
        // delete the checkpoint directories, the store size on disk will grow to 2x after restore
        // until the first commit is completed and older checkpoint dirs are deleted. This is
        // because the hard-linked checkpoint dir files will no longer be de-duped with the
        // now-deleted main store directory contents and will take up additional space of their
        // own during the restore.
        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);

        metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
        enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
            restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted, compareFileOwners);
      } else {
        LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
            "to the remote snapshot.", storeCheckpointDir, storeDir);
        // atomically rename the checkpoint dir to the store dir
        new FileUtil().move(storeCheckpointDir.toFile(), storeDir);

        // delete any other checkpoint dirs.
        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
      }
    });

    // wait for all restores to finish. whenComplete runs on both success and failure: distinguish
    // the two so a failed restore is not logged as "completed" (exception still propagates to callers).
    return FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
      if (ex != null) {
        LOG.error("Restore failed for task: {} stores", taskName, ex);
      } else {
        LOG.info("Restore completed for task: {} stores", taskName);
      }
      // record total restore duration regardless of outcome
      metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
    });
  }