in samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerRestoreUtil.java [264:335]
/**
 * Recovers a task whose blob store snapshot was reported as deleted, by re-running the restore with the
 * getDeleted flag enabled and then re-publishing the recovered state under a fresh checkpoint.
 *
 * <p>The asynchronous pipeline performs, in order:
 * <ol>
 *   <li>restore the task state with getDeleted set to true;</li>
 *   <li>back up the recovered stores under a newly created {@link CheckpointId};</li>
 *   <li>remove the TTL from every newly created snapshot index;</li>
 *   <li>clean up the snapshot indexes referenced by the previous checkpoint;</li>
 *   <li>write a new checkpoint carrying the new state checkpoint markers.</li>
 * </ol>
 *
 * <p>The {@link BlobStoreManager} created here is closed once the blob store steps (1-4) have finished,
 * whether they succeeded or failed, and before the new checkpoint is written.
 *
 * @param taskName task being recovered
 * @param taskCheckpoints last known checkpoints per task; the entry for {@code taskName} is cast to
 *                        {@link CheckpointV2}
 * @param checkpointManager passed to {@code writeNewCheckpoint} to persist the new checkpoint
 * @param taskRestoreManager restore manager for the task; cast to {@link BlobStoreRestoreManager}
 * @param config job configuration
 * @param taskInstanceMetrics per-task metrics; if there is no entry for {@code taskName} a blank
 *                            {@link MetricsRegistryMap} is used instead
 * @param executor executor handed to the blob store manager, the blob store util and the backup step
 * @param storesToRestore names of the stores handed to the backup step
 * @param loggedStoreBaseDirectory logged store base directory handed to the backup step
 * @param jobContext job context handed to the backup step
 * @param containerModel container model handed to the backup step
 * @return a future of the newly written checkpoint; if any step fails, the failure is rethrown as a
 *         {@link SamzaException} carrying the original exception as its cause
 */
private static CompletableFuture<Checkpoint> restoreDeletedSnapshot(TaskName taskName,
    Map<TaskName, Checkpoint> taskCheckpoints, CheckpointManager checkpointManager,
    TaskRestoreManager taskRestoreManager, Config config, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
    ExecutorService executor, Set<String> storesToRestore, File loggedStoreBaseDirectory, JobContext jobContext,
    ContainerModel containerModel) {
  LOG.warn("Received DeletedException during restore for task {}. Attempting to get blobs with getDeleted set to true",
      taskName.getTaskName());
  // if taskInstanceMetrics are specified use those for store metrics,
  // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
  TaskInstanceMetrics metricsForTask = taskInstanceMetrics.get(taskName);
  MetricsRegistry metricsRegistry = metricsForTask != null ? metricsForTask.registry() : new MetricsRegistryMap();

  // Do the casts and lookups that can fail synchronously BEFORE the blob store manager is initialized,
  // so that such a failure cannot leave an initialized manager behind without anyone closing it.
  BlobStoreRestoreManager blobStoreRestoreManager = (BlobStoreRestoreManager) taskRestoreManager;
  // NOTE(review): oldSCMs is null if the previous checkpoint has no markers for the blob store backend;
  // step 4 would then fail inside the future chain - confirm callers guarantee the entry exists.
  Map<String, String> oldSCMs = ((CheckpointV2) taskCheckpoints.get(taskName)).getStateCheckpointMarkers()
      .get(BlobStoreStateBackendFactory.class.getName());
  JobConfig jobConfig = new JobConfig(config);
  CheckpointId checkpointId = CheckpointId.create();

  BlobStoreManager blobStoreManager = getBlobStoreManager(config, executor);
  blobStoreManager.init();
  BlobStoreUtil blobStoreUtil =
      new BlobStoreUtil(blobStoreManager, executor, new BlobStoreConfig(config), null,
          new BlobStoreRestoreManagerMetrics(metricsRegistry));

  // 1. Restore state with getDeleted flag set to true
  CompletableFuture<Void> restoreFuture = blobStoreRestoreManager.restore(true);

  // 2. Create a new checkpoint and back it up on the blob store
  CompletableFuture<Map<String, String>> backupStoresFuture = restoreFuture.thenCompose(
      r -> backupRecoveredStore(jobContext, containerModel, config, taskName, storesToRestore, checkpointId,
          loggedStoreBaseDirectory, blobStoreManager, metricsRegistry, executor));

  // 3. Mark new Snapshots to never expire
  CompletableFuture<Void> removeNewSnapshotsTTLFuture = backupStoresFuture.thenCompose(
      storeSCMs -> {
        List<CompletableFuture<Void>> removeTTLForSnapshotIndexFutures = new ArrayList<>();
        storeSCMs.forEach((store, scm) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          removeTTLForSnapshotIndexFutures.add(blobStoreUtil.removeTTLForSnapshotIndex(scm, requestMetadata));
        });
        return CompletableFuture.allOf(removeTTLForSnapshotIndexFutures.toArray(new CompletableFuture[0]));
      });

  // 4. Delete prev SnapshotIndex including files/subdirs
  CompletableFuture<Void> deleteOldSnapshotsFuture = removeNewSnapshotsTTLFuture.thenCompose(
      ignore -> {
        List<CompletableFuture<Void>> deletePrevSnapshotFutures = new ArrayList<>();
        oldSCMs.forEach((store, oldSCM) -> {
          Metadata requestMetadata = new Metadata(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH, Optional.empty(),
              jobConfig.getName().get(), jobConfig.getJobId(), taskName.getTaskName(), store);
          deletePrevSnapshotFutures.add(
              blobStoreUtil.cleanSnapshotIndex(oldSCM, requestMetadata, true).toCompletableFuture());
        });
        return CompletableFuture.allOf(deletePrevSnapshotFutures.toArray(new CompletableFuture[0]));
      });

  // Carry the new SCMs past the cleanup step. deleteOldSnapshotsFuture already depends on
  // backupStoresFuture, so the combine only serves to hand the backup result to the next stage.
  CompletableFuture<Map<String, String>> newSCMsFuture =
      deleteOldSnapshotsFuture.thenCombine(backupStoresFuture, (aVoid, scms) -> scms);

  // cleanup resources: runs on success AND on failure of any blob store step above, so the manager is
  // not leaked when the recovery fails part-way. On success it still runs before the checkpoint write.
  CompletableFuture<Map<String, String>> resourcesReleasedFuture =
      newSCMsFuture.whenComplete((scms, error) -> blobStoreManager.close());

  // 5. create new checkpoint
  CompletableFuture<Checkpoint> newTaskCheckpointsFuture = resourcesReleasedFuture.thenApply(
      scms -> writeNewCheckpoint(taskName, checkpointId, scms, checkpointManager));

  return newTaskCheckpointsFuture.exceptionally(ex -> {
    String msg = String.format("Could not restore task: %s after attempting to restore deleted blobs.", taskName);
    throw new SamzaException(msg, ex);
  });
}