in hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java [809:1035]
void generateSnapshotDiffReport(final String jobKey,
final String jobId,
final String volumeName,
final String bucketName,
final String fromSnapshotName,
final String toSnapshotName,
final boolean forceFullDiff,
final boolean disableNativeDiff) {
LOG.info("Started snap diff report generation for volume: '{}', " +
"bucket: '{}', fromSnapshot: '{}', toSnapshot: '{}'",
volumeName, bucketName, fromSnapshotName, toSnapshotName);
ozoneManager.getMetrics().incNumSnapshotDiffJobs();
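// Handles for the temporary RocksDB column families that back this job's
// intermediate maps; they are dropped and closed in the finally block.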
ColumnFamilyHandle fromSnapshotColumnFamily = null;
ColumnFamilyHandle toSnapshotColumnFamily = null;
ColumnFamilyHandle objectIDsColumnFamily = null;
// Create a unique temporary dir for the snapDiff job to keep hard links
// of the SST files. JobId is used as the dir name for uniqueness.
// This prevents the SST files of an in-progress job from being deleted
// by RocksDBCheckpointDiffer#pruneOlderSnapshotsWithCompactionHistory.
Path path = Paths.get(sstBackupDirForSnapDiffJobs + "/" + jobId);
ReferenceCounted<OmSnapshot> rcFromSnapshot = null;
ReferenceCounted<OmSnapshot> rcToSnapshot = null;
try {
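// Bail out early if the diff job has been cancelled or either snapshot
// is no longer active.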
if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
return;
}
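// Hold reference-counted handles to both snapshots so they are not
// released while the diff runs; the references are closed in the
// finally block.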
rcFromSnapshot =
ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, fromSnapshotName);
rcToSnapshot =
ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, toSnapshotName);
OmSnapshot fromSnapshot = rcFromSnapshot.get();
OmSnapshot toSnapshot = rcToSnapshot.get();
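// SnapshotInfo for both snapshots, passed to the delta-file computation
// steps below.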
SnapshotInfo fsInfo = getSnapshotInfo(ozoneManager,
volumeName, bucketName, fromSnapshotName);
SnapshotInfo tsInfo = getSnapshotInfo(ozoneManager,
volumeName, bucketName, toSnapshotName);
Files.createDirectories(path);
// JobId is prepended to the column family names to make them unique
// per request.
fromSnapshotColumnFamily =
createColumnFamily(jobId + FROM_SNAP_TABLE_SUFFIX);
toSnapshotColumnFamily =
createColumnFamily(jobId + TO_SNAP_TABLE_SUFFIX);
objectIDsColumnFamily =
createColumnFamily(jobId + UNIQUE_IDS_TABLE_SUFFIX);
// ObjectId to keyName map that keeps key info for fromSnapshot.
// objectIdToKeyNameMap is used to identify which keys were touched
// in which snapshot and whether the operation was a creation,
// deletion, modification or rename.
// Stores only the keyName instead of OmKeyInfo to reduce the memory
// footprint.
// Note: objectId and keyName are stored as byte arrays to avoid
// unnecessary serialization and deserialization.
final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForFromSnapshot =
new RocksDbPersistentMap<>(db, fromSnapshotColumnFamily,
codecRegistry, byte[].class, byte[].class);
// ObjectId to keyName map to keep key info for toSnapshot.
final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForToSnapshot =
new RocksDbPersistentMap<>(db, toSnapshotColumnFamily, codecRegistry,
byte[].class, byte[].class);
// Map of the unique objectIds across fromSnapshot and toSnapshot to a
// flag indicating whether the object is a directory.
final PersistentMap<byte[], Boolean> objectIdToIsDirMap =
new RocksDbPersistentMap<>(db, objectIDsColumnFamily, codecRegistry,
byte[].class, Boolean.class);
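// The bucket layout decides which tables take part in the diff: FSO
// buckets also need their directory tables, other layouts only the
// key tables.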
final BucketLayout bucketLayout = getBucketLayout(volumeName, bucketName,
fromSnapshot.getMetadataManager());
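// Column family name to key prefix map for this volume/bucket, used to
// restrict the diff to the bucket under comparison.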
Map<String, String> tablePrefixes =
getColumnFamilyToKeyPrefixMap(toSnapshot.getMetadataManager(),
volumeName, bucketName);
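// Combine the per-request flags with the manager-level settings for
// forcing a full diff and for disabling the native diff.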
boolean useFullDiff = snapshotForceFullDiff || forceFullDiff;
boolean performNonNativeDiff = diffDisableNativeLibs || disableNativeDiff;
if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
return;
}
Table<String, OmKeyInfo> fsKeyTable = fromSnapshot.getMetadataManager()
.getKeyTable(bucketLayout);
Table<String, OmKeyInfo> tsKeyTable = toSnapshot.getMetadataManager()
.getKeyTable(bucketLayout);
Table<String, OmDirectoryInfo> fsDirTable;
Table<String, OmDirectoryInfo> tsDirTable;
final Optional<Set<Long>> oldParentIds;
final Optional<Set<Long>> newParentIds;
if (bucketLayout.isFileSystemOptimized()) {
oldParentIds = Optional.of(new HashSet<>());
newParentIds = Optional.of(new HashSet<>());
fsDirTable = fromSnapshot.getMetadataManager().getDirectoryTable();
tsDirTable = toSnapshot.getMetadataManager().getDirectoryTable();
} else {
oldParentIds = Optional.empty();
newParentIds = Optional.empty();
fsDirTable = null;
tsDirTable = null;
}
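// Parent objectId to absolute path maps; populated for FSO buckets in
// the path-resolution step below and passed to generateDiffReport.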
final Optional<Map<Long, Path>> oldParentIdPathMap;
final Optional<Map<Long, Path>> newParentIdPathMap;
if (bucketLayout.isFileSystemOptimized()) {
oldParentIdPathMap = Optional.of(Maps.newHashMap());
newParentIdPathMap = Optional.of(Maps.newHashMap());
} else {
oldParentIdPathMap = Optional.empty();
newParentIdPathMap = Optional.empty();
}
// These are the most time- and resource-consuming method calls.
// Split the calls into steps and store them in an array so the
// cancellation check can run once before each step without being
// repeated inline.
Callable<Void>[] methodCalls = new Callable[]{
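// Step 1: compute the delta files between the two snapshots' key tables
// and load the touched keys into the objectId maps.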
() -> {
recordActivity(jobKey, OBJECT_ID_MAP_GEN_OBS);
getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsKeyTable, tsKeyTable,
fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
performNonNativeDiff, tablePrefixes,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap,
oldParentIds, newParentIds, path.toString(), jobKey);
return null;
},
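// Step 2: for FSO buckets, do the same for the directory tables.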
() -> {
if (bucketLayout.isFileSystemOptimized()) {
recordActivity(jobKey, OBJECT_ID_MAP_GEN_FSO);
getDeltaFilesAndDiffKeysToObjectIdToKeyMap(fsDirTable, tsDirTable,
fromSnapshot, toSnapshot, fsInfo, tsInfo, useFullDiff,
performNonNativeDiff, tablePrefixes,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot, objectIdToIsDirMap,
oldParentIds, newParentIds, path.toString(), jobKey);
}
return null;
},
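// Step 3: for FSO buckets, resolve the collected parent objectIds to
// absolute directory paths in both snapshots.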
() -> {
if (bucketLayout.isFileSystemOptimized()) {
long bucketId = toSnapshot.getMetadataManager()
.getBucketId(volumeName, bucketName);
String tablePrefix = getTablePrefix(tablePrefixes,
fromSnapshot.getMetadataManager()
.getDirectoryTable().getName());
oldParentIdPathMap.get().putAll(new FSODirectoryPathResolver(
tablePrefix, bucketId,
fromSnapshot.getMetadataManager().getDirectoryTable())
.getAbsolutePathForObjectIDs(oldParentIds));
newParentIdPathMap.get().putAll(new FSODirectoryPathResolver(
tablePrefix, bucketId,
toSnapshot.getMetadataManager().getDirectoryTable())
.getAbsolutePathForObjectIDs(newParentIds, true));
}
return null;
},
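// Step 4: generate the diff report and, if the job is still active,
// mark it as done.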
() -> {
recordActivity(jobKey, DIFF_REPORT_GEN);
long totalDiffEntries = generateDiffReport(jobId,
fsKeyTable,
tsKeyTable,
fsDirTable,
tsDirTable,
objectIdToIsDirMap,
objectIdToKeyNameMapForFromSnapshot,
objectIdToKeyNameMapForToSnapshot,
volumeName, bucketName,
fromSnapshotName, toSnapshotName,
bucketLayout.isFileSystemOptimized(), oldParentIdPathMap,
newParentIdPathMap, tablePrefixes);
// If the job is cancelled, totalDiffEntries will be -1.
if (totalDiffEntries >= 0 &&
areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
updateJobStatusToDone(jobKey, totalDiffEntries);
}
return null;
}
};
// Check whether the job has been cancelled before every method call.
for (Callable<Void> methodCall : methodCalls) {
if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
fromSnapshotName, toSnapshotName)) {
return;
}
methodCall.call();
}
} catch (IOException | RocksDBException exception) {
updateJobStatusToFailed(jobKey, exception.getMessage());
LOG.error("Caught checked exception during diff report generation for " +
"volume: {} bucket: {}, fromSnapshot: {} and toSnapshot: {}",
volumeName, bucketName, fromSnapshotName, toSnapshotName, exception);
// TODO: [SNAPSHOT] Fail gracefully. Also check if it is even needed to
// throw this exception.
throw new RuntimeException(exception);
} catch (Exception exception) {
updateJobStatusToFailed(jobKey, exception.getMessage());
LOG.error("Caught unchecked exception during diff report generation " +
"for volume: {} bucket: {}, fromSnapshot: {} and toSnapshot: {}",
volumeName, bucketName, fromSnapshotName, toSnapshotName, exception);
// TODO: [SNAPSHOT] Fail gracefully. Also check if it is even needed to
// throw this exception.
throw new RuntimeException(exception);
} finally {
// Clean up: drop the intermediate column families and close them.
dropAndCloseColumnFamilyHandle(fromSnapshotColumnFamily);
dropAndCloseColumnFamilyHandle(toSnapshotColumnFamily);
dropAndCloseColumnFamilyHandle(objectIDsColumnFamily);
// Delete the SST files backup directory for this job.
deleteDir(path);
// Decrement the snapshot reference counts.
if (rcFromSnapshot != null) {
rcFromSnapshot.close();
}
if (rcToSnapshot != null) {
rcToSnapshot.close();
}
}
}