in paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java [144:230]
/**
 * Prepares one commit message per active writer and drops writers that are no longer needed.
 *
 * <p>For every (partition, bucket) writer this collects the writer's new-file increment, its
 * compaction increment and any new index files into a {@link CommitMessageImpl}. A message is
 * emitted for every writer, even when it is empty. A writer whose message is non-empty records
 * {@code commitIdentifier} as its last modified identifier.
 *
 * <p>A writer whose message is empty is closed and removed when it is safe to do so. That is the
 * case when either:
 *
 * <ul>
 *   <li>it has never produced a non-empty message, or
 *   <li>its last modified identifier is strictly older than the latest identifier committed by
 *       {@code commitUser}.
 * </ul>
 *
 * <p>Partitions left without writers are removed as well.
 *
 * @param waitCompaction passed through to each writer's {@code prepareCommit}
 * @param commitIdentifier identifier of the commit being prepared
 * @return one commit message per writer that was active when this method was called
 * @throws Exception propagated from a writer's or index maintainer's {@code prepareCommit}, or
 *     from closing a writer
 */
public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
        throws Exception {
    boolean anyWriterModified =
            writers.values().stream()
                    .map(Map::values)
                    .flatMap(Collection::stream)
                    .anyMatch(w -> w.lastModifiedCommitIdentifier != Long.MIN_VALUE);

    long latestCommittedIdentifier;
    if (anyWriterModified) {
        latestCommittedIdentifier =
                snapshotManager
                        .latestSnapshotOfUser(commitUser)
                        .map(Snapshot::commitIdentifier)
                        .orElse(Long.MIN_VALUE);
    } else {
        // Optimization for the first commit.
        //
        // No writer has ever produced a non-empty commit message, so every writer is cleanable
        // (if empty) regardless of `latestCommittedIdentifier`.
        //
        // Without this optimization, we may need to scan through all snapshots only to find
        // that there is no previous snapshot by this user, which is very inefficient.
        latestCommittedIdentifier = Long.MIN_VALUE;
    }

    List<CommitMessage> result = new ArrayList<>();
    Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter =
            writers.entrySet().iterator();
    while (partIter.hasNext()) {
        Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
        BinaryRow partition = partEntry.getKey();
        Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
                partEntry.getValue().entrySet().iterator();
        while (bucketIter.hasNext()) {
            Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
            int bucket = entry.getKey();
            WriterContainer<T> writerContainer = entry.getValue();

            // The writer is asked first, then the index maintainer (same order as before).
            CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
            List<IndexFileMeta> newIndexFiles =
                    writerContainer.indexMaintainer == null
                            ? new ArrayList<>()
                            : writerContainer.indexMaintainer.prepareCommit();
            CommitMessageImpl committable =
                    new CommitMessageImpl(
                            partition,
                            bucket,
                            increment.newFilesIncrement(),
                            increment.compactIncrement(),
                            new IndexIncrement(newIndexFiles));
            result.add(committable);

            if (!committable.isEmpty()) {
                writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
                continue;
            }

            // Clear writer if no update, and if its latest modification has committed.
            //
            // We need a mechanism to clear writers, otherwise there will be more and more such
            // as yesterday's partition that no longer needs to be written.
            //
            // A writer that never produced a non-empty message has nothing waiting to be
            // committed and can always be cleared.
            //
            // Otherwise the comparison is `<`, not `<=`. One commit identifier may be committed
            // as several snapshots, and `latestSnapshotOfUser` may observe only the first of
            // them while the rest are still being committed. Requiring a strictly newer
            // committed identifier ensures every snapshot of the writer's last identifier is
            // visible before the writer is dropped, so a writer recreated later for this bucket
            // restores from a complete file set.
            //
            // The trade-off: such a writer stays open until this user commits a newer identifier.
            boolean neverModified =
                    writerContainer.lastModifiedCommitIdentifier == Long.MIN_VALUE;
            boolean lastModificationFullyCommitted =
                    writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier;
            if (neverModified || lastModificationFullyCommitted) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Closing writer for partition {}, bucket {}. "
                                    + "Writer's last modified identifier is {}, "
                                    + "while latest committed identifier is {}, "
                                    + "current commit identifier is {}.",
                            partition,
                            bucket,
                            writerContainer.lastModifiedCommitIdentifier,
                            latestCommittedIdentifier,
                            commitIdentifier);
                }
                writerContainer.writer.close();
                bucketIter.remove();
            }
        }
        if (partEntry.getValue().isEmpty()) {
            partIter.remove();
        }
    }
    return result;
}