private static CompletableFuture restoreDeletedSnapshot()

in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java [264:335]


  private static CompletableFuture<Checkpoint> restoreDeletedSnapshot(TaskName taskName,
      Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager checkpointManager,
      TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
      ExecutorService executor, Set<String> storesToRestore, File loggedStoreBaseDirectory, JobContext jobContext,
      ContainerModel containerModel) {

    LOG.warn("Received DeletedException during restore for task {}. Attempting to get blobs with getDeleted set to true",
        taskName.getTaskName());
    // if taskInstanceMetrics are specified use those for store metrics,
    // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
    MetricsRegistry metricsRegistry =
        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry()
            : new MetricsRegistryMap();

    BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
    blobStoreManager.init();
    JobConfig jobConfig = new JobConfig(config);
    BlobStoreUtil blobStoreUtil =
        new BlobStoreUtil(blobStoreManager, executor, new BlobStoreConfig(config), null,
            new BlobStoreRestoreManagerMetrics(metricsRegistry));

    BlobStoreRestoreManager blobStoreRestoreManager = (BlobStoreRestoreManager) taskRestoreManager;

    CheckpointId checkpointId = CheckpointId.create();
    Map<String, String> oldSCMs = ((CheckpointV2) taskCheckpoints.get(taskName)).getStateCheckpointMarkers()
        .get(BlobStoreStateBackendFactory.class.getName());

    // 1. Restore state with getDeleted flag set to true
    CompletableFuture<Void> restoreFuture = blobStoreRestoreManager.restore(true);

    // 2. Create a new checkpoint and back it up on the blob store
    CompletableFuture<Map<String, String>> backupStoresFuture = restoreFuture.thenCompose(
      r -> backupRecoveredStore(jobContext, containerModel, config, taskName, storesToRestore, checkpointId,
          loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, executor));

    // 3. Mark new Snapshots to never expire
    CompletableFuture<Void> removeNewSnapshotsTTLFuture = backupStoresFuture.thenCompose(
      storeSCMs -> {
        List<CompletableFuture<Void>> removeTTLForSnapshotIndexFutures = new ArrayList<>();
        storeSCMs.forEach((store, scm) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          removeTTLForSnapshotIndexFutures.add(blobStoreUtil.removeTTLForSnapshotIndex(scm, requestMetadata));
        });
        return CompletableFuture.allOf(removeTTLForSnapshotIndexFutures.toArray(new CompletableFuture[0]));
      });

    // 4. Delete prev SnapshotIndex including files/subdirs
    CompletableFuture<Void> deleteOldSnapshotsFuture = removeNewSnapshotsTTLFuture.thenCompose(
      ignore -> {
        List<CompletableFuture<Void>> deletePrevSnapshotFutures = new ArrayList<>();
        oldSCMs.forEach((store, oldSCM) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          deletePrevSnapshotFutures.add(blobStoreUtil.cleanSnapshotIndex(oldSCM, requestMetadata, true).toCompletableFuture());
        });
        return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new CompletableFuture[0]));
      });

    CompletableFuture<Checkpoint> newTaskCheckpointsFuture =
        deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) -> {
          // cleanup resources
          blobStoreManager.close();
          // 5. create new checkpoint
          return writeNewCheckpoint(taskName, checkpointId, scms, checkpointManager);
        });

    return newTaskCheckpointsFuture.exceptionally(ex -> {
      String msg = String.format("Could not restore task: %s after attempting to restore deleted blobs.", taskName);
      throw new SamzaException(msg, ex);
    });
  }