protected GetFilePathsToFileStatusResult getFilePathsToFileStatus()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java [201:330]


  /**
   * Collect the source paths (with their source-side `FileStatus`) that still need considering for copy, skipping
   * whatever is found to be already present on `targetFs`.
   *
   * First short-circuits when both the current snapshot's metadata path and manifest list are present on `targetFs`
   * (returning an empty mapping).  Otherwise walks the table's incremental snapshot infos, gathering paths not yet on
   * target, and looks up each one's `FileStatus` on `this.sourceFs`.
   *
   * @param targetFs file system on which path presence is checked
   * @param copyConfig supplies the `CopyContext` whose `getFileStatus()` performs the presence checks
   * @param shouldIncludeMetadataPath whether a snapshot's metadata path should be among the paths returned
   * @return the collected `Path` to `FileStatus` mapping, together with the single table metadata encountered
   * @throws IOException upon file system failure, including any tunneled out of a presence check on `targetFs`; also
   *         a `FileNotFoundException` for a missing source file, unless `shouldTolerateMissingSourceFiles`
   * @throws RuntimeException when the current snapshot has no table metadata (on the short-circuit path) or when the
   *         number of table metadata encountered while iterating is not exactly one
   */
  protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
      throws IOException {
    IcebergTable icebergTable = this.getSrcIcebergTable();
    // returns whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward;
    // every call site must therefore sit within a `try` that unwraps `CheckedExceptionFunction.WrappedIOException`
    Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
      // omit considering timestamp (or other markers of freshness), as files should be immutable
      // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
      copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
    );

    // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
    // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false
    IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
    boolean isCurrentSnapshotOnTarget = false;
    try {
      isCurrentSnapshotOnTarget = currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
          isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath());
    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
      // unwrap here as well, so a target FS failure surfaces as the declared `IOException`, not a runtime wrapper
      wrapper.rethrowWrapped();
    }
    if (isCurrentSnapshotOnTarget) {
      log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
          this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
          currentSnapshotOverview.getManifestListPath(),
          currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
      TableMetadata readTimeTableMetadata = currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException(
          String.format("~%s~ no table metadata for current snapshot '%s' at '%s' with metadata path '%s'",
              this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
              currentSnapshotOverview.getManifestListPath(),
              currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"))));
      return new GetFilePathsToFileStatusResult(Maps.newHashMap(), readTimeTableMetadata);
    }

    List<TableMetadata> readTimeTableMetadataHolder = Lists.newArrayList(); // expecting exactly one elem
    Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
    // NOTE: the transform below is evaluated lazily, i.e. only as `filePathsIterator` gets advanced (in the loop below)
    Iterator<String> filePathsIterator = Iterators.concat(
        Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
          snapshotInfo.getTableMetadata().ifPresent(readTimeTableMetadataHolder::add);
          // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
          String manListPath = snapshotInfo.getManifestListPath();
          log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(),
              snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
          // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
          // most critically, all are presumed immutable and uniquely named, although any may be replaced.  we depend
          // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
          // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
          // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
          // hence, its entire subtree may be short-circuited.  nevertheless, absence of a file at dest cannot imply
          // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
          // metadata files would have been replaced (e.g. during snapshot compaction).  in such instances, at least
          // some of the children pointed to within could have been copied prior, when they previously appeared as a
          // child of the current file's predecessor (which this new meta file now replaces).
          if (!isPresentOnTarget.apply(manListPath)) {
            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath);
            for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
              if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
                missingPaths.add(mfi.getManifestFilePath());
                // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
                // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
                // during a period where replication fell so far behind that no snapshots listed among current metadata
                // are yet at dest.  since the consequence of unnecessary copy is merely wasted data transfer and
                // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
                // file count may run into 100k+ (even beyond!)
                missingPaths.addAll(mfi.getListedFilePaths());
              }
            }
            log.info("~{}~ snapshot '{}': collected {} additional source paths",
                this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size());
            return missingPaths.iterator();
          } else {
            log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)",
                this.getFileSetId(), snapshotInfo.getSnapshotId());
            // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing manifest list
            Optional<String> metadataPath = snapshotInfo.getMetadataPath();
            Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
            metadataPath.ifPresent(ignore ->
                log.info("~{}~ metadata IS {} present on target",
                    this.getFileSetId(),
                    !nonReplicatedMetadataPath.isPresent()
                        ? "already"
                        : shouldIncludeMetadataPath ? "NOT YET" : "NOT CHOSEN TO BE")
            );
            return nonReplicatedMetadataPath
                .filter(ignore -> shouldIncludeMetadataPath)
                .map(p -> Lists.newArrayList(p).iterator())
                .orElse(Collections.emptyIterator());
          }
        })
    );

    Map<Path, FileStatus> results = Maps.newHashMap();
    long numSourceFilesNotFound = 0L;
    Iterable<String> filePathsIterable = () -> filePathsIterator;
    // the `try` must span the entire iteration, since advancing the iterator is what invokes `isPresentOnTarget`
    try {
      // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
      // to benefit from parallelism
      GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
      PathErrorConsolidator errorConsolidator = new PathErrorConsolidator();
      for (String pathString : filePathsIterable) {
        Path path = new Path(pathString);
        try {
          results.put(path, this.sourceFs.getFileStatus(path));
          if (growthTracker.isAnotherMilestone(results.size())) {
            log.info("~{}~ collected file status on '{}' source paths", this.getFileSetId(), results.size());
          }
        } catch (FileNotFoundException fnfe) {
          if (!shouldTolerateMissingSourceFiles) {
            throw fnfe;
          } else {
            // log, but otherwise swallow... to continue on
            String total = ++numSourceFilesNotFound + " total";
            String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
            errorConsolidator.prepLogMsg(path).ifPresent(msg ->
                log.warn("~{}~ source {} ({}... {})", this.getFileSetId(), msg, speculation, total)
            );
          }
        }
      }
    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
      wrapper.rethrowWrapped();
    }

    // exactly one table metadata is expected to have been gathered while iterating the snapshot infos above
    if (readTimeTableMetadataHolder.size() != 1) {
      final int firstNumToShow = 5;
      String newline = System.lineSeparator();
      String errMsg = readTimeTableMetadataHolder.size() == 0
          ? String.format("~%s~ no table metadata ever encountered!", this.getFileSetId())
          : String.format("~%s~ multiple metadata (%d) encountered (exactly 1 expected) - first %d: [%s]",
              this.getFileSetId(), readTimeTableMetadataHolder.size(), firstNumToShow,
              readTimeTableMetadataHolder.stream().limit(firstNumToShow).map(md ->
                  md.uuid() + " - " + md.metadataFileLocation()
              ).collect(Collectors.joining(newline, newline, newline)));
      throw new RuntimeException(errMsg);
    }
    return new GetFilePathsToFileStatusResult(results, readTimeTableMetadataHolder.get(0));
  }