in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java [201:330]
/**
 * Collect the source paths of this iceberg that are not yet present on `targetFs`, together with the source
 * {@link FileStatus} of each.
 *
 * First consults only the current snapshot overview: when both its metadata path and manifest-list path are already
 * present on target, returns an empty mapping without scanning the incremental snapshots. Otherwise iterates the
 * incremental snapshot infos lazily, short-circuiting any snapshot whose manifest list is already on target.
 *
 * @param targetFs file system on which presence of each candidate path is checked
 * @param copyConfig supplies the `CopyContext` whose `getFileStatus()` is used for the target presence checks
 * @param shouldIncludeMetadataPath whether metadata paths are to be included among the paths returned
 * @return the path-to-`FileStatus` mapping, paired with the single table metadata encountered while reading
 * @throws IOException on failure of a presence check against `targetFs` (unwrapped from its tunneling wrapper);
 *         also a `FileNotFoundException` from the source file system, unless `shouldTolerateMissingSourceFiles`
 * @throws RuntimeException when the current snapshot has no table metadata in the skip-everything case, or when the
 *         incremental scan encounters zero or multiple table metadata (exactly one is expected)
 */
protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
    throws IOException {
  IcebergTable icebergTable = this.getSrcIcebergTable();
  // answers whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward
  Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
      // omit considering timestamp (or other markers of freshness), as files should be immutable
      // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
      copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
  );
  // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
  // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false
  IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
  // `isPresentOnTarget` tunnels any `IOException` inside an unchecked wrapper, so unwrap here exactly as done for the
  // main scan below; otherwise a failed presence check would escape as the wrapper, rather than the declared type
  boolean isCurrentSnapshotOnTarget = false;
  try {
    isCurrentSnapshotOnTarget = currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false)
        && isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath());
  } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
    // NOTE(review): presumed to always throw the wrapped `IOException`; if it ever returned normally, the flag
    // remains `false` and the full scan below proceeds
    wrapper.rethrowWrapped();
  }
  if (isCurrentSnapshotOnTarget) {
    log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
        this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
        currentSnapshotOverview.getManifestListPath(),
        currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
    TableMetadata readTimeTableMetadata = currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException(
        String.format("~%s~ no table metadata for current snapshot '%s' at '%s' with metadata path '%s'",
            this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
            currentSnapshotOverview.getManifestListPath(),
            currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"))));
    return new GetFilePathsToFileStatusResult(Maps.newHashMap(), readTimeTableMetadata);
  }
  List<TableMetadata> readTimeTableMetadataHolder = Lists.newArrayList(); // expecting exactly one elem
  Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
  // lazily evaluated: the per-snapshot function below only runs as `filePathsIterator` is consumed (within the `try`)
  Iterator<String> filePathsIterator = Iterators.concat(
      Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
        snapshotInfo.getTableMetadata().ifPresent(readTimeTableMetadataHolder::add);
        // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
        String manListPath = snapshotInfo.getManifestListPath();
        log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(),
            snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
        // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
        // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
        // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
        // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
        // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
        // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
        // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
        // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
        // some of the children pointed to within could have been copied prior, when they previously appeared as a
        // child of the current file's predecessor (which this new meta file now replaces).
        if (!isPresentOnTarget.apply(manListPath)) {
          List<String> missingPaths = snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath);
          for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
            if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
              missingPaths.add(mfi.getManifestFilePath());
              // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
              // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
              // during a period where replication fell so far behind that no snapshots listed among current metadata
              // are yet at dest.  since the consequence of unnecessary copy is merely wasted data transfer and
              // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
              // file count may run into 100k+ (even beyond!)
              missingPaths.addAll(mfi.getListedFilePaths());
            }
          }
          log.info("~{}~ snapshot '{}': collected {} additional source paths",
              this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size());
          return missingPaths.iterator();
        } else {
          log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)",
              this.getFileSetId(), snapshotInfo.getSnapshotId());
          // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing manifest list
          Optional<String> metadataPath = snapshotInfo.getMetadataPath();
          Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
          metadataPath.ifPresent(ignore ->
              log.info("~{}~ metadata IS {} present on target",
                  this.getFileSetId(),
                  !nonReplicatedMetadataPath.isPresent()
                      ? "already"
                      : shouldIncludeMetadataPath ? "NOT YET" : "NOT CHOSEN TO BE")
          );
          return nonReplicatedMetadataPath
              .filter(ignore -> shouldIncludeMetadataPath)
              .map(p -> Lists.newArrayList(p).iterator())
              .orElse(Collections.emptyIterator());
        }
      })
  );
  Map<Path, FileStatus> results = Maps.newHashMap();
  long numSourceFilesNotFound = 0L;
  Iterable<String> filePathsIterable = () -> filePathsIterator;
  try {
    // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
    // to benefit from parallelism
    GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
    PathErrorConsolidator errorConsolidator = new PathErrorConsolidator();
    for (String pathString : filePathsIterable) {
      Path path = new Path(pathString);
      try {
        results.put(path, this.sourceFs.getFileStatus(path));
        if (growthTracker.isAnotherMilestone(results.size())) {
          log.info("~{}~ collected file status on '{}' source paths", this.getFileSetId(), results.size());
        }
      } catch (FileNotFoundException fnfe) {
        if (!shouldTolerateMissingSourceFiles) {
          throw fnfe;
        } else {
          // log, but otherwise swallow... to continue on
          String total = ++numSourceFilesNotFound + " total";
          String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
          errorConsolidator.prepLogMsg(path).ifPresent(msg ->
              log.warn("~{}~ source {} ({}... {})", this.getFileSetId(), msg, speculation, total)
          );
        }
      }
    }
  } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
    // unwrap an `IOException` tunneled out of `isPresentOnTarget` while the lazy iterator was being consumed
    wrapper.rethrowWrapped();
  }
  if (readTimeTableMetadataHolder.size() != 1) {
    final int firstNumToShow = 5;
    String newline = System.lineSeparator();
    String errMsg = readTimeTableMetadataHolder.isEmpty()
        ? String.format("~%s~ no table metadata ever encountered!", this.getFileSetId())
        : String.format("~%s~ multiple metadata (%d) encountered (exactly 1 expected) - first %d: [%s]",
            this.getFileSetId(), readTimeTableMetadataHolder.size(), firstNumToShow,
            readTimeTableMetadataHolder.stream().limit(firstNumToShow).map(md ->
                md.uuid() + " - " + md.metadataFileLocation()
            ).collect(Collectors.joining(newline, newline, newline)));
    throw new RuntimeException(errMsg);
  }
  return new GetFilePathsToFileStatusResult(results, readTimeTableMetadataHolder.get(0));
}