in samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java [228:310]
/**
 * Restores the requested stores of a task, either by reusing the local store checkpoint directory
 * or by enqueueing a download of the remote blob store snapshot.
 *
 * <p>For every store in {@code storesToRestore} that has an entry in {@code prevStoreSnapshotIndexes}:
 * <ol>
 *   <li>the file and byte counts of the snapshot's {@link DirIndex} are added to the
 *       to-restore and remaining metrics;</li>
 *   <li>the local store directory is deleted if it exists;</li>
 *   <li>if {@code getDeleted} is set or {@code shouldRestore(...)} returns true, all store checkpoint
 *       directories are deleted and a restore is enqueued via {@code enqueueRestore(...)};
 *       otherwise the store checkpoint directory is moved to the store directory and the
 *       remaining checkpoint directories are deleted.</li>
 * </ol>
 * Stores without an entry in {@code prevStoreSnapshotIndexes} are skipped and their local state is
 * left untouched.
 *
 * <p>The directory deletions and the checkpoint directory move run synchronously on the calling
 * thread, before this method returns.
 *
 * @param jobName job name, forwarded to {@code enqueueRestore}
 * @param jobId job id, forwarded to {@code enqueueRestore}
 * @param taskName task whose stores are being restored
 * @param storesToRestore names of the stores to restore
 * @param prevStoreSnapshotIndexes store name to a pair whose right element is the {@link SnapshotIndex}
 *                                 to restore from (the left element is not read here)
 * @param loggedBaseDir base directory used to resolve the task store and checkpoint directories
 * @param storageConfig storage config, forwarded to {@code shouldRestore}
 * @param metrics restore metrics updated by this method
 * @param storageManagerUtil used to resolve the store and store checkpoint directories
 * @param blobStoreUtil forwarded to {@code enqueueRestore}
 * @param dirDiffUtil forwarded to {@code shouldRestore} and {@code enqueueRestore}
 * @param executor forwarded to {@code enqueueRestore}
 * @param getDeleted if true, always restore from the blob store and skip the local/remote diff check
 * @param compareFileOwners forwarded to {@code enqueueRestore}
 * @return a future that completes once all enqueued restores have finished; it completes
 *         exceptionally if any of them failed
 * @throws SamzaException if the local store directory could not be deleted
 */
static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
    File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
    StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
    ExecutorService executor, boolean getDeleted, boolean compareFileOwners) {
  long restoreStartTime = System.nanoTime();
  List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
  LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore);
  storesToRestore.forEach(storeName -> {
    if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
      LOG.info("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, storeName);
      // TODO HIGH shesharm what should we do with the local state already present on disk, if any?
      // E.g. this will be the case if user changes a store from changelog based backup and restore to
      // blob store based backup and restore, both at the same time.
      return;
    }
    long storeRestoreStartTime = System.nanoTime();
    SnapshotIndex snapshotIndex = prevStoreSnapshotIndexes.get(storeName).getRight();
    DirIndex dirIndex = snapshotIndex.getDirIndex();

    // Account for the full contents of the snapshot up front, before deciding how to restore.
    DirIndex.Stats stats = DirIndex.getStats(dirIndex);
    metrics.filesToRestore.getValue().addAndGet(stats.filesPresent);
    metrics.bytesToRestore.getValue().addAndGet(stats.bytesPresent);
    metrics.filesRemaining.getValue().addAndGet(stats.filesPresent);
    metrics.bytesRemaining.getValue().addAndGet(stats.bytesPresent);

    CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId();
    File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, storeName, taskName, TaskMode.Active);
    Path storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
    LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}",
        taskName, storeName, storeDir, storeCheckpointDir);

    // we always delete the store dir to preserve transactional state guarantees.
    try {
      LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
          "or remote snapshot.", storeDir);
      if (storeDir.exists() && storeDir.isDirectory()) {
        PathUtils.deleteDirectory(storeDir.toPath());
      }
    } catch (IOException e) {
      throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
    }

    // Restore from blob store if:
    //    1. shouldRestore() returns true - there is a diff between local and remote snapshot.
    //    2. getDeleted is set - Some blobs in the blob store were deleted incorrectly (SAMZA-2787). Download/restore
    //                           everything locally ignoring the diff. This will be backed up afresh by
    //                           ContainerStorageManager recovery path.
    boolean shouldRestore = getDeleted || shouldRestore(taskName.getTaskName(), storeName, dirIndex,
        storeCheckpointDir, storageConfig, dirDiffUtil);
    if (shouldRestore) { // restore the store from the remote blob store
      // delete all store checkpoint directories. if we only delete the store directory and don't
      // delete the checkpoint directories, the store size on disk will grow to 2x after restore
      // until the first commit is completed and older checkpoint dirs are deleted. This is
      // because the hard-linked checkpoint dir files will no longer be de-duped with the
      // now-deleted main store directory contents and will take up additional space of their
      // own during the restore.
      deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
      metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
      enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
          restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted, compareFileOwners);
    } else {
      LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
          "to the remote snapshot.", storeCheckpointDir, storeDir);
      // atomically rename the checkpoint dir to the store dir
      new FileUtil().move(storeCheckpointDir.toFile(), storeDir);
      // delete any other checkpoint dirs.
      deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
    }
  });

  // wait for all restores to finish
  return FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
    // whenComplete runs on both normal and exceptional completion; do not report a failed
    // restore as "completed". The exception still propagates through the returned future.
    if (ex != null) {
      LOG.error("Restore failed for task: {} stores", taskName, ex);
    } else {
      LOG.info("Restore completed for task: {} stores", taskName);
    }
    metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
  });
}