in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [822:1044]
/**
 * Makes a single attempt to commit a new snapshot on top of {@code latestSnapshot}.
 *
 * <p>The attempt proceeds in three phases:
 *
 * <ol>
 *   <li>If {@code conflictCheck} asks for it, the data files of the partitions touched by
 *       {@code deltaFiles} are loaded (incrementally on a retry, fully otherwise) and checked
 *       for conflicts against {@code deltaFiles}.
 *   <li>The base / delta / changelog manifest lists and the index manifest are prepared and a
 *       {@link Snapshot} object is built. On a retry, the delta and changelog manifest lists
 *       of the previous attempt are reused, and the index manifest is reused as well when the
 *       previous index manifest has not changed.
 *   <li>The snapshot is handed to {@code commitSnapshotImpl}. When that returns {@code true}
 *       the commit callbacks are invoked and a {@code SuccessResult} is returned; otherwise the
 *       non-reusable temporary manifests are cleaned up and a {@code RetryResult} carrying the
 *       reusable artifacts is returned.
 * </ol>
 *
 * <p>The {@code logOffsets} argument is never modified: offsets inherited from
 * {@code latestSnapshot} are merged into a private copy. This keeps the method usable with
 * unmodifiable maps and keeps offsets back-filled from one snapshot out of the caller's map,
 * so a later attempt against a newer snapshot cannot be pinned to them.
 *
 * @param retryResult result of the previous failed attempt, or {@code null} on the first
 *     attempt
 * @param deltaFiles data file changes to commit
 * @param changelogFiles changelog file entries to commit; may be empty
 * @param indexFiles index file changes to commit
 * @param identifier commit identifier recorded in the snapshot
 * @param watermark watermark of this commit, or {@code null}; the snapshot records the maximum
 *     of this value and the watermark of {@code latestSnapshot}
 * @param logOffsets log offsets of this commit, keyed by bucket; entries missing here are
 *     completed from {@code latestSnapshot}
 * @param commitKind kind recorded in the snapshot
 * @param latestSnapshot snapshot to commit on top of, or {@code null} if there is none yet
 * @param conflictCheck decides whether the conflict check must run against
 *     {@code latestSnapshot}
 * @param newStatsFileName statistics file to record, or {@code null} to inherit the statistics
 *     of {@code latestSnapshot} when its schema id matches the latest schema
 * @return a {@code SuccessResult} when the snapshot was committed, otherwise a
 *     {@code RetryResult} to be passed to the next attempt
 * @throws RuntimeException if preparing the snapshot fails; temporary files are cleaned up
 *     before the exception is thrown
 */
CommitResult tryCommitOnce(
        @Nullable RetryResult retryResult,
        List<ManifestEntry> deltaFiles,
        List<ManifestEntry> changelogFiles,
        List<IndexManifestEntry> indexFiles,
        long identifier,
        @Nullable Long watermark,
        Map<Integer, Long> logOffsets,
        Snapshot.CommitKind commitKind,
        @Nullable Snapshot latestSnapshot,
        ConflictCheck conflictCheck,
        @Nullable String newStatsFileName) {
    long startMillis = System.currentTimeMillis();
    long newSnapshotId =
            latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;

    if (LOG.isDebugEnabled()) {
        LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
        for (ManifestEntry entry : deltaFiles) {
            LOG.debug("  * {}", entry);
        }
        LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
        for (ManifestEntry entry : changelogFiles) {
            LOG.debug("  * {}", entry);
        }
    }

    List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
    if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
        // latestSnapshotId is different from the snapshot id we've checked for conflicts,
        // so we have to check again
        try {
            List<BinaryRow> changedPartitions =
                    deltaFiles.stream()
                            .map(ManifestEntry::partition)
                            .distinct()
                            .collect(Collectors.toList());
            if (retryResult != null && retryResult.latestSnapshot != null) {
                // retry: start from the entries collected by the previous attempt and only
                // read what changed between the previously seen snapshot and the latest one
                baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
                List<SimpleFileEntry> incremental =
                        readIncrementalChanges(
                                retryResult.latestSnapshot, latestSnapshot, changedPartitions);
                if (!incremental.isEmpty()) {
                    baseDataFiles.addAll(incremental);
                    baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
                }
            } else {
                baseDataFiles =
                        readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
            }
            noConflictsOrFail(
                    latestSnapshot.commitUser(),
                    baseDataFiles,
                    SimpleFileEntry.from(deltaFiles));
        } catch (Exception e) {
            // this commit will not be retried, so drop what the previous attempt kept around
            if (retryResult != null) {
                retryResult.cleanAll();
            }
            throw e;
        }
    }

    Snapshot newSnapshot;
    Pair<String, Long> baseManifestList = null;
    Pair<String, Long> deltaManifestList = null;
    List<PartitionEntry> deltaStatistics;
    Pair<String, Long> changelogManifestList = null;
    String oldIndexManifest = null;
    String indexManifest = null;
    List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
    List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
    try {
        long previousTotalRecordCount = 0L;
        Long currentWatermark = watermark;
        // offsets recorded in the new snapshot; stays the caller's map unless offsets have to
        // be inherited from the latest snapshot, in which case a private copy is used
        Map<Integer, Long> snapshotLogOffsets = logOffsets;
        if (latestSnapshot != null) {
            previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
            // read all previous manifest files
            mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
            // read the last snapshot to complete the bucket's offsets when logOffsets does not
            // contain all buckets
            Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
            if (latestLogOffsets != null && !latestLogOffsets.isEmpty()) {
                // merge into a copy instead of the argument: the argument may be unmodifiable
                // and the caller's map must not accumulate values inherited from a snapshot
                Map<Integer, Long> mergedLogOffsets = new java.util.HashMap<>(logOffsets);
                latestLogOffsets.forEach(mergedLogOffsets::putIfAbsent);
                snapshotLogOffsets = mergedLogOffsets;
            }
            Long latestWatermark = latestSnapshot.watermark();
            if (latestWatermark != null) {
                currentWatermark =
                        currentWatermark == null
                                ? latestWatermark
                                : Math.max(currentWatermark, latestWatermark);
            }
            oldIndexManifest = latestSnapshot.indexManifest();
        }

        // try to merge old manifest files to create base manifest list
        mergeAfterManifests =
                ManifestFileMerger.merge(
                        mergeBeforeManifests,
                        manifestFile,
                        manifestTargetSize.getBytes(),
                        manifestMergeMinCount,
                        manifestFullCompactionSize.getBytes(),
                        partitionType,
                        manifestReadParallelism);
        baseManifestList = manifestList.write(mergeAfterManifests);

        // delta record count is the number of added records minus the number of deleted
        // records in this commit
        long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
        long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

        boolean rewriteIndexManifest = true;
        if (retryResult != null) {
            // reuse the manifests written by the previous attempt
            deltaStatistics = retryResult.deltaStatistics;
            deltaManifestList = retryResult.deltaManifestList;
            changelogManifestList = retryResult.changelogManifestList;
            if (Objects.equals(oldIndexManifest, retryResult.oldIndexManifest)) {
                rewriteIndexManifest = false;
                indexManifest = retryResult.newIndexManifest;
                LOG.info("Reusing index manifest {} for retry.", indexManifest);
            } else {
                // the index manifest of the previous attempt was built on a different base,
                // discard it and write a new one below
                cleanIndexManifest(retryResult.oldIndexManifest, retryResult.newIndexManifest);
            }
        } else {
            // write new delta files into manifest files
            deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
            deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));

            // write changelog into manifest files
            if (!changelogFiles.isEmpty()) {
                changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
            }
        }

        if (rewriteIndexManifest) {
            indexManifest =
                    indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode);
        }

        long latestSchemaId = schemaManager.latest().get().id();

        // write new stats or inherit from the previous snapshot
        String statsFileName = null;
        if (newStatsFileName != null) {
            statsFileName = newStatsFileName;
        } else if (latestSnapshot != null) {
            Optional<Statistics> previousStatistic = statsFileHandler.readStats(latestSnapshot);
            if (previousStatistic.isPresent()) {
                if (previousStatistic.get().schemaId() != latestSchemaId) {
                    LOG.warn("Schema changed, stats will not be inherited");
                } else {
                    statsFileName = latestSnapshot.statistics();
                }
            }
        }

        // prepare snapshot file
        newSnapshot =
                new Snapshot(
                        newSnapshotId,
                        latestSchemaId,
                        baseManifestList.getLeft(),
                        baseManifestList.getRight(),
                        deltaManifestList.getKey(),
                        deltaManifestList.getRight(),
                        changelogManifestList == null ? null : changelogManifestList.getKey(),
                        changelogManifestList == null ? null : changelogManifestList.getRight(),
                        indexManifest,
                        commitUser,
                        identifier,
                        commitKind,
                        System.currentTimeMillis(),
                        snapshotLogOffsets,
                        totalRecordCount,
                        deltaRecordCount,
                        recordCount(changelogFiles),
                        currentWatermark,
                        statsFileName);
    } catch (Throwable e) {
        // fails when preparing for commit, we should clean up
        if (retryResult != null) {
            retryResult.cleanAll();
        }
        cleanUpReuseTmpManifests(
                deltaManifestList, changelogManifestList, oldIndexManifest, indexManifest);
        cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
        throw new RuntimeException(
                String.format(
                        "Exception occurs when preparing snapshot #%d by user %s "
                                + "with hash %s and kind %s. Clean up.",
                        newSnapshotId, commitUser, identifier, commitKind.name()),
                e);
    }

    if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    String.format(
                            "Successfully commit snapshot #%d by user %s "
                                    + "with identifier %s and kind %s.",
                            newSnapshotId, commitUser, identifier, commitKind.name()));
        }
        commitCallbacks.forEach(callback -> callback.call(deltaFiles, newSnapshot));
        return new SuccessResult();
    }

    // atomic rename fails, clean up and try again
    long commitTime = (System.currentTimeMillis() - startMillis) / 1000;
    LOG.warn(
            String.format(
                    "Atomic commit failed for snapshot #%d by user %s "
                            + "with identifier %s and kind %s after %s seconds. "
                            + "Clean up and try again.",
                    newSnapshotId, commitUser, identifier, commitKind.name(), commitTime));
    // only the base manifest list depends on the snapshot we lost the race against; the
    // delta / changelog / index manifests are handed to the next attempt for reuse
    cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
    return new RetryResult(
            deltaStatistics,
            deltaManifestList,
            changelogManifestList,
            oldIndexManifest,
            indexManifest,
            latestSnapshot,
            baseDataFiles);
}