in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [268:382]
public void commit(
        ManifestCommittable committable,
        Map<String, String> properties,
        boolean checkAppendFiles) {
    // Commits the changes carried by `committable` in up to two steps: an APPEND commit
    // followed by a COMPACT commit. Every step that runs goes through tryCommit and is
    // counted as one generated snapshot; the attempt counts returned by tryCommit are summed.
    // Metrics are reported from the finally block (so also when a step throws), but only
    // if commitMetrics is set.
    //
    // checkAppendFiles: when true and a latest snapshot exists, the entries of the changed
    // partitions are read once up front and conflict-checked here, before committing.
    //
    // NOTE(review): `properties` is not read anywhere in this method - confirm whether it
    // is intentionally ignored.
    if (LOG.isDebugEnabled()) {
        LOG.debug("Ready to commit\n{}", committable.toString());
    }

    final long startNanos = System.nanoTime();
    int snapshotCount = 0;
    int attemptCount = 0;

    // Snapshot observed before the append step, and the id up to which conflicts have
    // already been checked by this method (null means no pre-check was done).
    Snapshot latest = null;
    Long checkedSnapshotId = null;
    List<SimpleFileEntry> knownEntries = new ArrayList<>();

    // Buckets filled by collectChanges from the committable's file committables.
    List<ManifestEntry> newFiles = new ArrayList<>();
    List<ManifestEntry> newChangelog = new ArrayList<>();
    List<ManifestEntry> compactedFiles = new ArrayList<>();
    List<ManifestEntry> compactedChangelog = new ArrayList<>();
    List<IndexManifestEntry> newHashIndexFiles = new ArrayList<>();
    List<IndexManifestEntry> compactedDvIndexFiles = new ArrayList<>();
    collectChanges(
            committable.fileCommittables(),
            newFiles,
            newChangelog,
            compactedFiles,
            compactedChangelog,
            newHashIndexFiles,
            compactedDvIndexFiles);

    try {
        List<SimpleFileEntry> newSimpleEntries = SimpleFileEntry.from(newFiles);

        // The append step is skipped only when empty commits are ignored AND there is
        // nothing to append.
        boolean skipAppend =
                ignoreEmptyCommit
                        && newFiles.isEmpty()
                        && newChangelog.isEmpty()
                        && newHashIndexFiles.isEmpty();
        if (!skipAppend) {
            // Optimization for the common path, step 1:
            // read the manifest entries of the changed partitions here and check them for
            // conflicts. If no other job is committing at the same time, the conflict check
            // inside tryCommit can then be skipped, which reduces the number of file reads.
            latest = snapshotManager.latestSnapshot();
            if (latest != null && checkAppendFiles) {
                // Some partitions may only have compact changes, so both the append and
                // the compact files are passed in to cover all changed partitions.
                knownEntries.addAll(
                        readAllEntriesFromChangedPartitions(latest, newFiles, compactedFiles));
                noConflictsOrFail(latest.commitUser(), knownEntries, newSimpleEntries);
                checkedSnapshotId = latest.id();
            }

            int appendAttempts =
                    tryCommit(
                            newFiles,
                            newChangelog,
                            newHashIndexFiles,
                            committable.identifier(),
                            committable.watermark(),
                            committable.logOffsets(),
                            Snapshot.CommitKind.APPEND,
                            noConflictCheck(),
                            null);
            attemptCount += appendAttempts;
            snapshotCount += 1;
        }

        boolean nothingToCompact =
                compactedFiles.isEmpty()
                        && compactedChangelog.isEmpty()
                        && compactedDvIndexFiles.isEmpty();
        if (!nothingToCompact) {
            // Optimization for the common path, step 2:
            // add the appended entries to the entries read in step 1 and check the compact
            // files against them. If no other job is committing at the same time, the
            // conflict check inside tryCommit can be skipped, again saving file reads.
            if (checkedSnapshotId != null) {
                knownEntries.addAll(newSimpleEntries);
                noConflictsOrFail(
                        latest.commitUser(),
                        knownEntries,
                        SimpleFileEntry.from(compactedFiles));
                // Assume this compact commit lands directly after the append commit
                // created above, i.e. one snapshot id later.
                checkedSnapshotId = checkedSnapshotId + 1;
            }

            int compactAttempts =
                    tryCommit(
                            compactedFiles,
                            compactedChangelog,
                            compactedDvIndexFiles,
                            committable.identifier(),
                            committable.watermark(),
                            committable.logOffsets(),
                            Snapshot.CommitKind.COMPACT,
                            hasConflictChecked(checkedSnapshotId),
                            null);
            attemptCount += compactAttempts;
            snapshotCount += 1;
        }
    } finally {
        // Elapsed wall time converted from nanoseconds to milliseconds.
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        if (this.commitMetrics != null) {
            reportCommit(
                    newFiles,
                    newChangelog,
                    compactedFiles,
                    compactedChangelog,
                    elapsedMillis,
                    snapshotCount,
                    attemptCount);
        }
    }
}