in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [571:772]
/**
 * Tries once to commit the given files as a new snapshot on top of {@code latestSnapshot}.
 *
 * <p>Steps:
 *
 * <ol>
 *   <li>If {@code latestSnapshot} is not the snapshot that conflicts were already checked
 *       against, the conflict check is run again.
 *   <li>The manifest files, manifest lists and index manifest of the new snapshot are written.
 *   <li>The snapshot file is written. When a lock is configured, this happens only if the file
 *       does not exist yet.
 * </ol>
 *
 * <p>If that write reports failure, all temporary manifests written here are deleted. The
 * method then returns {@code false} so that the caller can retry.
 *
 * @param tableFiles data file changes to commit
 * @param changelogFiles changelog files to commit; may be empty
 * @param indexFiles index file changes, merged into the previous snapshot's index manifest
 * @param identifier commit identifier recorded in the new snapshot
 * @param watermark watermark of this commit, or {@code null}. The larger of this value and the
 *     latest snapshot's watermark is recorded.
 * @param logOffsets log offsets of the buckets written by this commit. Buckets missing here are
 *     completed from {@code latestSnapshot}. The map is copied and never modified, so passing
 *     the same map again on a retry is safe.
 * @param commitKind kind of the new snapshot
 * @param latestSnapshot current latest snapshot, or {@code null} if no snapshot exists yet
 * @param safeLatestSnapshotId id of the snapshot that conflicts have already been checked
 *     against; compared with {@link Objects#equals}, so it may be {@code null}
 * @return {@code true} if the snapshot was committed; {@code false} if the atomic write failed
 *     and the caller should retry
 * @throws RuntimeException if preparing the snapshot fails, in which case temporary files are
 *     cleaned up. Also thrown if writing the snapshot file throws; nothing is cleaned up then,
 *     because whether the commit succeeded is unknown.
 */
public boolean tryCommitOnce(
        List<ManifestEntry> tableFiles,
        List<ManifestEntry> changelogFiles,
        List<IndexManifestEntry> indexFiles,
        long identifier,
        @Nullable Long watermark,
        Map<Integer, Long> logOffsets,
        Snapshot.CommitKind commitKind,
        Snapshot latestSnapshot,
        Long safeLatestSnapshotId) {
    long newSnapshotId =
            latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
    Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
        for (ManifestEntry entry : tableFiles) {
            LOG.debug(" * " + entry.toString());
        }
        LOG.debug("Ready to commit changelog to snapshot #" + newSnapshotId);
        for (ManifestEntry entry : changelogFiles) {
            LOG.debug(" * " + entry.toString());
        }
    }

    if (latestSnapshot != null && !Objects.equals(latestSnapshot.id(), safeLatestSnapshotId)) {
        // latestSnapshotId is different from the snapshot id we've checked for conflicts,
        // so we have to check again
        noConflictsOrFail(latestSnapshot.commitUser(), latestSnapshot, tableFiles);
    }

    Snapshot newSnapshot;
    // Names and metas of every temporary file written below, so they can be cleaned up if
    // preparation fails or the atomic commit is rejected.
    String previousChangesListName = null;
    String newChangesListName = null;
    String changelogListName = null;
    String newIndexManifest = null;
    List<ManifestFileMeta> oldMetas = new ArrayList<>();
    List<ManifestFileMeta> newMetas = new ArrayList<>();
    List<ManifestFileMeta> changelogMetas = new ArrayList<>();
    try {
        long previousTotalRecordCount = 0L;
        Long currentWatermark = watermark;
        String previousIndexManifest = null;
        // Complete a private copy rather than the caller's map. A caller retrying after a
        // failed attempt may pass the same map again; filling it in place would pin offsets
        // of an outdated latest snapshot (putIfAbsent never overwrites them). It would also
        // fail on unmodifiable maps.
        Map<Integer, Long> mergedLogOffsets = new java.util.HashMap<>(logOffsets);
        if (latestSnapshot != null) {
            previousTotalRecordCount = latestSnapshot.totalRecordCount(scan);
            // read all previous manifest files
            oldMetas.addAll(latestSnapshot.dataManifests(manifestList));
            // read the last snapshot to complete the bucket's offsets when logOffsets does not
            // contain all buckets; guard against snapshots without recorded log offsets
            Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
            if (latestLogOffsets != null) {
                latestLogOffsets.forEach(mergedLogOffsets::putIfAbsent);
            }
            Long latestWatermark = latestSnapshot.watermark();
            if (latestWatermark != null) {
                currentWatermark =
                        currentWatermark == null
                                ? latestWatermark
                                : Math.max(currentWatermark, latestWatermark);
            }
            previousIndexManifest = latestSnapshot.indexManifest();
        }

        // merge manifest files with changes
        newMetas.addAll(
                ManifestFileMeta.merge(
                        oldMetas,
                        manifestFile,
                        manifestTargetSize.getBytes(),
                        manifestMergeMinCount,
                        manifestFullCompactionSize.getBytes(),
                        partitionType));
        previousChangesListName = manifestList.write(newMetas);

        // write new changes into manifest files
        long deltaRecordCount = Snapshot.recordCount(tableFiles);
        List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
        newMetas.addAll(newChangesManifests);
        newChangesListName = manifestList.write(newChangesManifests);

        // write changelog into manifest files
        if (!changelogFiles.isEmpty()) {
            changelogMetas.addAll(manifestFile.write(changelogFiles));
            changelogListName = manifestList.write(changelogMetas);
        }

        // write new index manifest; only remember it for cleanup if a new file was produced
        String indexManifest = indexManifestFile.merge(previousIndexManifest, indexFiles);
        if (!Objects.equals(indexManifest, previousIndexManifest)) {
            newIndexManifest = indexManifest;
        }

        // prepare snapshot file
        newSnapshot =
                new Snapshot(
                        newSnapshotId,
                        schemaManager
                                .latest()
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "No schema found when preparing snapshot #"
                                                                + newSnapshotId))
                                .id(),
                        previousChangesListName,
                        newChangesListName,
                        changelogListName,
                        indexManifest,
                        commitUser,
                        identifier,
                        commitKind,
                        System.currentTimeMillis(),
                        mergedLogOffsets,
                        previousTotalRecordCount + deltaRecordCount,
                        deltaRecordCount,
                        Snapshot.recordCount(changelogFiles),
                        currentWatermark);
    } catch (Throwable e) {
        // fails when preparing for commit, we should clean up
        cleanUpTmpManifests(
                previousChangesListName,
                newChangesListName,
                changelogListName,
                newIndexManifest,
                oldMetas,
                newMetas,
                changelogMetas);
        throw new RuntimeException(
                String.format(
                        "Exception occurs when preparing snapshot #%d (path %s) by user %s "
                                + "with identifier %s and kind %s. Clean up.",
                        newSnapshotId,
                        newSnapshotPath,
                        commitUser,
                        identifier,
                        commitKind.name()),
                e);
    }

    boolean success;
    try {
        Callable<Boolean> callable =
                () -> {
                    boolean committed =
                            fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
                    if (committed) {
                        snapshotManager.commitLatestHint(newSnapshotId);
                    }
                    return committed;
                };
        if (lock != null) {
            success =
                    lock.runWithLock(
                            () ->
                                    // fs.rename may not return false if the target file
                                    // already exists, and may not even be atomic. Because we
                                    // rely on external locking here, we first check whether
                                    // the file exists and only then write it, to work around
                                    // this case.
                                    !fileIO.exists(newSnapshotPath) && callable.call());
        } else {
            success = callable.call();
        }
    } catch (Throwable e) {
        // exception when performing the atomic rename,
        // we cannot clean up because we can't determine the success
        throw new RuntimeException(
                String.format(
                        "Exception occurs when committing snapshot #%d (path %s) by user %s "
                                + "with identifier %s and kind %s. "
                                + "Cannot clean up because we can't determine the success.",
                        newSnapshotId,
                        newSnapshotPath,
                        commitUser,
                        identifier,
                        commitKind.name()),
                e);
    }

    if (success) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    String.format(
                            "Successfully commit snapshot #%d (path %s) by user %s "
                                    + "with identifier %s and kind %s.",
                            newSnapshotId,
                            newSnapshotPath,
                            commitUser,
                            identifier,
                            commitKind.name()));
        }
        return true;
    }

    // atomic rename fails, clean up and try again
    LOG.warn(
            String.format(
                    "Atomic commit failed for snapshot #%d (path %s) by user %s "
                            + "with identifier %s and kind %s. "
                            + "Clean up and try again.",
                    newSnapshotId, newSnapshotPath, commitUser, identifier, commitKind.name()));
    cleanUpTmpManifests(
            previousChangesListName,
            newChangesListName,
            changelogListName,
            newIndexManifest,
            oldMetas,
            newMetas,
            changelogMetas);
    return false;
}