public Optional getTargetCommitIdentifier()

in xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java [226:275]


  /**
   * Looks up the Delta table version whose commit was written for the given source commit.
   *
   * <p>Iterates over the changes returned by {@code deltaLog.getChanges}, starting at the version
   * of the current snapshot. For each version, the first {@code CommitInfo} action is inspected;
   * if its tags contain a {@code TableSyncMetadata.XTABLE_METADATA} entry, that entry is parsed
   * and its source identifier is compared with {@code sourceIdentifier}. Versions without a
   * {@code CommitInfo}, without tags, without the metadata tag, or whose metadata cannot be
   * parsed are skipped (parse failures are logged at WARN level).
   *
   * @param sourceIdentifier identifier of the source commit to search for
   * @return the matching Delta version rendered as a string, or {@link Optional#empty()} when
   *     none of the inspected versions carries metadata for {@code sourceIdentifier}
   */
  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
    Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();

    Iterator<Tuple2<Object, Seq<Action>>> versionIterator =
        JavaConverters.asJavaIteratorConverter(
                deltaLog.getChanges(currentSnapshot.version(), false))
            .asJava();
    while (versionIterator.hasNext()) {
      Tuple2<Object, Seq<Action>> currentChange = versionIterator.next();
      // Take the version from the change itself, not from the snapshot: the iterator starts at
      // the snapshot version but may also yield later versions, and the value returned (and
      // logged) must identify the commit that actually matched.
      Long targetVersion = (Long) currentChange._1();
      List<Action> actions = JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava();

      // Find the CommitInfo action that belongs to this version, if there is one.
      Optional<CommitInfo> commitInfo =
          actions.stream()
              .filter(action -> action instanceof CommitInfo)
              .map(action -> (CommitInfo) action)
              .findFirst();
      if (!commitInfo.isPresent()) {
        continue;
      }

      Option<scala.collection.immutable.Map<String, String>> tags = commitInfo.get().tags();
      if (tags.isEmpty()) {
        continue;
      }

      Option<String> sourceMetadataJson = tags.get().get(TableSyncMetadata.XTABLE_METADATA);
      if (sourceMetadataJson.isEmpty()) {
        continue;
      }

      try {
        Optional<TableSyncMetadata> optionalMetadata =
            TableSyncMetadata.fromJson(sourceMetadataJson.get());
        if (!optionalMetadata.isPresent()) {
          continue;
        }

        TableSyncMetadata metadata = optionalMetadata.get();
        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
          return Optional.of(String.valueOf(targetVersion));
        }
      } catch (Exception e) {
        // Best-effort lookup: a malformed metadata tag on one commit must not abort the scan.
        log.warn("Failed to parse commit metadata for commit: {}", targetVersion, e);
      }
    }

    return Optional.empty();
  }