in xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java [226:275]
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
  // Purpose: find the Delta commit whose XTable sync metadata (stored as a CommitInfo tag under
  // TableSyncMetadata.XTABLE_METADATA) records the given source identifier, and return that
  // commit's version rendered as a string.
  //
  // sourceIdentifier: identifier of the source-table commit to look up.
  // Returns: the matching Delta version as a string, or Optional.empty() when no inspected commit
  // carries metadata for that identifier.
  //
  // Only the changes returned by deltaLog.getChanges(...) starting at the current snapshot
  // version are inspected.
  Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();
  Iterator<Tuple2<Object, Seq<Action>>> versionIterator =
      JavaConverters.asJavaIteratorConverter(
              deltaLog.getChanges(currentSnapshot.version(), false))
          .asJava();
  while (versionIterator.hasNext()) {
    Tuple2<Object, Seq<Action>> currentChange = versionIterator.next();
    // Report the version this change actually belongs to. The iterator can yield versions newer
    // than the snapshot if a commit landed after the snapshot was read, so the snapshot version
    // is not a reliable label for every element.
    long targetVersion = ((Number) currentChange._1()).longValue();
    List<Action> actions = JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava();
    // Locate the CommitInfo action of this version; commits without one cannot carry tags.
    Optional<CommitInfo> commitInfo =
        actions.stream()
            .filter(CommitInfo.class::isInstance)
            .map(CommitInfo.class::cast)
            .findFirst();
    if (!commitInfo.isPresent()) {
      continue;
    }
    Option<scala.collection.immutable.Map<String, String>> tags = commitInfo.get().tags();
    if (tags.isEmpty()) {
      continue;
    }
    Option<String> sourceMetadataJson = tags.get().get(TableSyncMetadata.XTABLE_METADATA);
    if (sourceMetadataJson.isEmpty()) {
      continue;
    }
    try {
      Optional<TableSyncMetadata> optionalMetadata =
          TableSyncMetadata.fromJson(sourceMetadataJson.get());
      if (!optionalMetadata.isPresent()) {
        continue;
      }
      TableSyncMetadata metadata = optionalMetadata.get();
      if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
        return Optional.of(String.valueOf(targetVersion));
      }
    } catch (Exception e) {
      // Best effort: an unreadable metadata tag on one commit must not abort the lookup.
      log.warn("Failed to parse commit metadata for commit: {}", targetVersion, e);
    }
  }
  return Optional.empty();
}