in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java [629:749]
/**
 * Computes which source files must be copied to the target and which existing target files must be deleted.
 *
 * <p>Three path listings are compared: the files at the source, the files used by the current target
 * table / partition (if one exists), and the files already present at the desired target location.
 * A source file is skipped (not copied) only when a file already exists at its target path, the source
 * version is not greater than the target version, and, if {@code helper.isEnforceFileSizeMatch()} is set,
 * both files have the same length. Every other source file is added to the copy set.</p>
 *
 * <p>If a version strategy is missing on either side, or the two sides use different strategy classes,
 * a warning is logged and an empty {@link DiffPathSet} is returned.</p>
 *
 * @param sourceLocation descriptor of the source table / partition location
 * @param desiredTargetLocation descriptor of the location the new target table / partition will use
 * @param currentTargetLocation descriptor of the location the existing target table / partition uses, if any
 * @param partition the partition being processed, if any; passed to the target path helper and used to name
 *        the entity in the error message
 * @param multiTimer timer advanced to a new stage before each phase of the computation
 * @param helper supplies the target path helper, the file-size enforcement flag, the unmanaged data policy
 *        and the dataset
 * @return the set of files to copy and paths to delete
 * @throws IOException if files exist at the desired target location that are neither kept nor managed by the
 *         current target table / partition and the policy is not
 *         {@link UnmanagedDataPolicy#DELETE_UNMANAGED_DATA}; listing the source or current target paths and
 *         resolving versions may presumably also throw
 */
protected static DiffPathSet fullPathDiff(HiveLocationDescriptor sourceLocation,
    HiveLocationDescriptor desiredTargetLocation, Optional<HiveLocationDescriptor> currentTargetLocation,
    Optional<Partition> partition, MultiTimingEvent multiTimer, HiveCopyEntityHelper helper) throws IOException {
  // populate version strategy before analyzing diffs
  sourceLocation.populateDataFileVersionStrategy();
  desiredTargetLocation.populateDataFileVersionStrategy();

  DiffPathSet.DiffPathSetBuilder builder = DiffPathSet.builder();

  // check the strategy is not empty
  if (!sourceLocation.versionStrategy.isPresent() || !desiredTargetLocation.versionStrategy.isPresent()) {
    log.warn("Version strategy doesn't exist ({},{}), cannot handle copy.",
        sourceLocation.versionStrategy.isPresent(),
        desiredTargetLocation.versionStrategy.isPresent());
    return builder.build();
  }

  // check if the src and dst strategy are the same
  String sourceStrategyName = sourceLocation.versionStrategy.get().getClass().getName();
  String targetStrategyName = desiredTargetLocation.versionStrategy.get().getClass().getName();
  if (!sourceStrategyName.equals(targetStrategyName)) {
    log.warn("Version strategy src: {} and dst: {} doesn't match, cannot handle copy.",
        sourceStrategyName, targetStrategyName);
    return builder.build();
  }

  // Can optimize by using the mod time that has already been fetched with the file listing.
  // The strategy does not change while diffing, so this is decided once rather than per file.
  boolean useDirectGetModTime = sourceStrategyName.equals(ModTimeDataFileVersionStrategy.class.getName());

  multiTimer.nextStage(Stages.SOURCE_PATH_LISTING);
  // These are the paths at the source
  Map<Path, FileStatus> sourcePaths = sourceLocation.getPaths();

  multiTimer.nextStage(Stages.TARGET_EXISTING_PATH_LISTING);
  // These are the paths that the existing target table / partition uses now
  Map<Path, FileStatus> targetExistingPaths = currentTargetLocation.isPresent()
      ? currentTargetLocation.get().getPaths() : Maps.<Path, FileStatus> newHashMap();

  multiTimer.nextStage(Stages.DESIRED_PATHS_LISTING);
  // These are the paths that exist at the destination and the new table / partition would pick up
  Map<Path, FileStatus> desiredTargetExistingPaths;
  try {
    desiredTargetExistingPaths = desiredTargetLocation.getPaths();
  } catch (IOException ioe) {
    // Thrown if inputFormat cannot find location in target. Since location doesn't exist, this set is empty.
    // Logged so that an unexpected listing failure is not lost entirely.
    log.debug("Could not list paths at the desired target location, treating it as empty.", ioe);
    desiredTargetExistingPaths = Maps.newHashMap();
  }

  multiTimer.nextStage(Stages.PATH_DIFF);
  for (FileStatus sourcePath : sourcePaths.values()) {
    // For each source path, resolve where it would land in the target
    Path newPath = helper.getTargetPathHelper().getTargetPath(sourcePath.getPath(),
        desiredTargetLocation.getFileSystem(), partition, true);
    boolean shouldCopy = true;

    FileStatus existingTargetStatus = desiredTargetExistingPaths.get(newPath);
    if (existingTargetStatus != null) {
      // If the file exists at the destination, check whether it should be replaced, if not, no need to copy.
      // Raw Comparable: both sides use the same strategy class (checked above), so the two versions are
      // presumably mutually comparable.
      Comparable srcVer = useDirectGetModTime ? sourcePath.getModificationTime() :
          sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
      Comparable dstVer = useDirectGetModTime ? existingTargetStatus.getModificationTime() :
          desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());

      if (srcVer.compareTo(dstVer) <= 0) {
        // destination has an equal or higher version: skip the copy unless a size match is enforced and fails
        if (!helper.isEnforceFileSizeMatch() || existingTargetStatus.getLen() == sourcePath.getLen()) {
          log.debug("Copy from src {} (version:{}) to dst {} (version:{}) can be skipped since file size ({} bytes) is matching",
              sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer, sourcePath.getLen());
          shouldCopy = false;
        } else {
          log.debug("Copy from src {} (version:{}) to dst {} (version:{}) can not be skipped because the file size is not matching or it is enforced by this config: {}",
              sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer, CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH);
        }
      } else {
        log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to a higher version.",
            sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), dstVer);
      }
    }

    if (shouldCopy) {
      builder.copyFile(sourcePath);
    } else {
      // If not copying, we want to keep the file in the target.
      // After this loop, all files left in targetExistingPaths are marked for deletion, so remove this file.
      targetExistingPaths.remove(newPath);
      desiredTargetExistingPaths.remove(newPath);
    }
  }

  multiTimer.nextStage(Stages.COMPUTE_DELETE_PATHS);
  // At this point, targetExistingPaths contains paths managed by the target partition / table that we
  // do not want to keep, so mark them for deletion.
  for (Path delete : targetExistingPaths.keySet()) {
    builder.deleteFile(delete);
    desiredTargetExistingPaths.remove(delete);
  }

  // Now desiredTargetExistingPaths contains paths that we don't want, but which are not managed by the existing
  // table / partition.
  // Ideally, we shouldn't delete them (they're not managed by Hive), and we don't want to pick
  // them up in the new table / partition, so if there are any leftover files, we should abort copying
  // this table / partition.
  if (!desiredTargetExistingPaths.isEmpty()) {
    if (helper.getUnmanagedDataPolicy() != UnmanagedDataPolicy.DELETE_UNMANAGED_DATA) {
      throw new IOException(String.format(
          "New table / partition would pick up existing, undesired files in target file system. %s, files %s.",
          partition.isPresent() ? partition.get().getCompleteName() : helper.getDataset().getTable().getCompleteName(),
          Arrays.toString(desiredTargetExistingPaths.keySet().toArray())));
    }
    // Unless, the policy requires us to delete such un-managed files - in which case: we will add the leftover
    // files to the deletion list.
    for (Path delete : desiredTargetExistingPaths.keySet()) {
      builder.deleteFile(delete);
    }
    log.warn("Un-managed files detected in target file system, however deleting them "
        + "because of the policy: {} Files to be deleted are: {}", UnmanagedDataPolicy.DELETE_UNMANAGED_DATA,
        StringUtils.join(desiredTargetExistingPaths.keySet(), ","));
  }

  return builder.build();
}