CommitResult tryCommitOnce()

in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [822:1044]


    /**
     * Makes a single attempt to commit the given files as the snapshot following {@code
     * latestSnapshot}.
     *
     * <p>When {@code conflictCheck} requires it, the delta files are first checked for conflicts
     * against the data files of the changed partitions. After that, the previous data manifests
     * are merged into a new base manifest list. The delta, changelog and index manifests are
     * written, or reused from {@code retryResult}. Finally the prepared snapshot is handed to
     * {@code commitSnapshotImpl}.
     *
     * @param retryResult reusable artifacts of a previous failed attempt, or {@code null} on the
     *     first attempt
     * @param deltaFiles table file changes of this commit
     * @param changelogFiles changelog files of this commit; no changelog manifest list is written
     *     when empty
     * @param indexFiles index file changes of this commit
     * @param identifier commit identifier recorded in the new snapshot
     * @param watermark watermark of this commit; combined with the latest snapshot's watermark by
     *     taking the maximum of the non-null values
     * @param logOffsets per-bucket log offsets of this commit; buckets absent here are filled from
     *     the latest snapshot's offsets. This map is not modified.
     * @param commitKind kind recorded in the new snapshot
     * @param latestSnapshot the snapshot this commit builds on, or {@code null} if none exists
     * @param conflictCheck decides whether conflicts must be checked against {@code latestSnapshot}
     * @param newStatsFileName statistics file to record; when {@code null}, the latest snapshot's
     *     statistics are inherited only if their schema id equals the latest schema id
     * @return a {@code SuccessResult} if the snapshot was committed; otherwise a {@code
     *     RetryResult} carrying the reusable manifests and the base data files read for the
     *     conflict check
     * @throws RuntimeException wrapping the cause if preparing the snapshot fails; exceptions from
     *     the conflict check are rethrown unchanged after {@code retryResult} is cleaned up
     */
    CommitResult tryCommitOnce(
            @Nullable RetryResult retryResult,
            List<ManifestEntry> deltaFiles,
            List<ManifestEntry> changelogFiles,
            List<IndexManifestEntry> indexFiles,
            long identifier,
            @Nullable Long watermark,
            Map<Integer, Long> logOffsets,
            Snapshot.CommitKind commitKind,
            @Nullable Snapshot latestSnapshot,
            ConflictCheck conflictCheck,
            @Nullable String newStatsFileName) {
        long startMillis = System.currentTimeMillis();
        long newSnapshotId =
                latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
            for (ManifestEntry entry : deltaFiles) {
                LOG.debug("  * {}", entry);
            }
            LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
            for (ManifestEntry entry : changelogFiles) {
                LOG.debug("  * {}", entry);
            }
        }

        List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
        if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
            // latestSnapshotId is different from the snapshot id we've checked for conflicts,
            // so we have to check again
            try {
                List<BinaryRow> changedPartitions =
                        deltaFiles.stream()
                                .map(ManifestEntry::partition)
                                .distinct()
                                .collect(Collectors.toList());
                if (retryResult != null && retryResult.latestSnapshot != null) {
                    // Reuse the base files read by the previous attempt and only fold in the
                    // changes between that attempt's latest snapshot and the current one.
                    baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
                    List<SimpleFileEntry> incremental =
                            readIncrementalChanges(
                                    retryResult.latestSnapshot, latestSnapshot, changedPartitions);
                    if (!incremental.isEmpty()) {
                        baseDataFiles.addAll(incremental);
                        baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
                    }
                } else {
                    baseDataFiles =
                            readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
                }
                noConflictsOrFail(
                        latestSnapshot.commitUser(),
                        baseDataFiles,
                        SimpleFileEntry.from(deltaFiles));
            } catch (Exception e) {
                if (retryResult != null) {
                    retryResult.cleanAll();
                }
                throw e;
            }
        }

        Snapshot newSnapshot;
        Pair<String, Long> baseManifestList = null;
        Pair<String, Long> deltaManifestList = null;
        List<PartitionEntry> deltaStatistics;
        Pair<String, Long> changelogManifestList = null;
        String oldIndexManifest = null;
        String indexManifest = null;
        List<ManifestFileMeta> mergeBeforeManifests = new ArrayList<>();
        List<ManifestFileMeta> mergeAfterManifests = new ArrayList<>();
        try {
            long previousTotalRecordCount = 0L;
            Long currentWatermark = watermark;
            // Merge into a copy instead of mutating the caller's map. If the same map is passed
            // again on a retry, offsets copied from an older latest snapshot would otherwise
            // shadow the newer snapshot's offsets, because putIfAbsent never overwrites them.
            Map<Integer, Long> mergedLogOffsets = new java.util.HashMap<>(logOffsets);
            if (latestSnapshot != null) {
                previousTotalRecordCount = scan.totalRecordCount(latestSnapshot);
                // read all previous manifest files
                mergeBeforeManifests = manifestList.readDataManifests(latestSnapshot);
                // read the last snapshot to complete the bucket's offsets when logOffsets does not
                // contain all buckets
                Map<Integer, Long> latestLogOffsets = latestSnapshot.logOffsets();
                if (latestLogOffsets != null) {
                    latestLogOffsets.forEach(mergedLogOffsets::putIfAbsent);
                }
                Long latestWatermark = latestSnapshot.watermark();
                if (latestWatermark != null) {
                    currentWatermark =
                            currentWatermark == null
                                    ? latestWatermark
                                    : Math.max(currentWatermark, latestWatermark);
                }
                oldIndexManifest = latestSnapshot.indexManifest();
            }

            // try to merge old manifest files to create base manifest list
            mergeAfterManifests =
                    ManifestFileMerger.merge(
                            mergeBeforeManifests,
                            manifestFile,
                            manifestTargetSize.getBytes(),
                            manifestMergeMinCount,
                            manifestFullCompactionSize.getBytes(),
                            partitionType,
                            manifestReadParallelism);
            baseManifestList = manifestList.write(mergeAfterManifests);

            // the added records subtract the deleted records from
            long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
            long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

            boolean rewriteIndexManifest = true;
            if (retryResult != null) {
                // Delta and changelog manifests do not depend on the latest snapshot, so they are
                // reused from the previous attempt.
                deltaStatistics = retryResult.deltaStatistics;
                deltaManifestList = retryResult.deltaManifestList;
                changelogManifestList = retryResult.changelogManifestList;
                // The index manifest was built on top of the previous attempt's old index
                // manifest; it is only reusable if that base has not changed.
                if (Objects.equals(oldIndexManifest, retryResult.oldIndexManifest)) {
                    rewriteIndexManifest = false;
                    indexManifest = retryResult.newIndexManifest;
                    LOG.info("Reusing index manifest {} for retry.", indexManifest);
                } else {
                    cleanIndexManifest(retryResult.oldIndexManifest, retryResult.newIndexManifest);
                }
            } else {
                // write new delta files into manifest files
                deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
                deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));

                // write changelog into manifest files
                if (!changelogFiles.isEmpty()) {
                    changelogManifestList = manifestList.write(manifestFile.write(changelogFiles));
                }
            }

            if (rewriteIndexManifest) {
                indexManifest =
                        indexManifestFile.writeIndexFiles(oldIndexManifest, indexFiles, bucketMode);
            }

            long latestSchemaId = schemaManager.latest().get().id();

            // write new stats or inherit from the previous snapshot
            String statsFileName = null;
            if (newStatsFileName != null) {
                statsFileName = newStatsFileName;
            } else if (latestSnapshot != null) {
                Optional<Statistics> previousStatistic = statsFileHandler.readStats(latestSnapshot);
                if (previousStatistic.isPresent()) {
                    if (previousStatistic.get().schemaId() != latestSchemaId) {
                        LOG.warn("Schema changed, stats will not be inherited");
                    } else {
                        statsFileName = latestSnapshot.statistics();
                    }
                }
            }

            // prepare snapshot file
            newSnapshot =
                    new Snapshot(
                            newSnapshotId,
                            latestSchemaId,
                            baseManifestList.getLeft(),
                            baseManifestList.getRight(),
                            deltaManifestList.getKey(),
                            deltaManifestList.getRight(),
                            changelogManifestList == null ? null : changelogManifestList.getKey(),
                            changelogManifestList == null ? null : changelogManifestList.getRight(),
                            indexManifest,
                            commitUser,
                            identifier,
                            commitKind,
                            System.currentTimeMillis(),
                            mergedLogOffsets,
                            totalRecordCount,
                            deltaRecordCount,
                            recordCount(changelogFiles),
                            currentWatermark,
                            statsFileName);
        } catch (Throwable e) {
            // fails when preparing for commit, we should clean up
            if (retryResult != null) {
                retryResult.cleanAll();
            }
            cleanUpReuseTmpManifests(
                    deltaManifestList, changelogManifestList, oldIndexManifest, indexManifest);
            cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
            throw new RuntimeException(
                    String.format(
                            "Exception occurs when preparing snapshot #%d by user %s "
                                    + "with hash %s and kind %s. Clean up.",
                            newSnapshotId, commitUser, identifier, commitKind.name()),
                    e);
        }

        if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
            LOG.debug(
                    "Successfully commit snapshot #{} by user {} with identifier {} and kind {}.",
                    newSnapshotId,
                    commitUser,
                    identifier,
                    commitKind.name());
            commitCallbacks.forEach(callback -> callback.call(deltaFiles, newSnapshot));
            return new SuccessResult();
        }

        // atomic rename fails, clean up and try again
        long commitTime = (System.currentTimeMillis() - startMillis) / 1000;
        LOG.warn(
                "Atomic commit failed for snapshot #{} by user {} with identifier {} and kind {} "
                        + "after {} seconds. Clean up and try again.",
                newSnapshotId,
                commitUser,
                identifier,
                commitKind.name(),
                commitTime);
        // The base manifest list depends on the latest snapshot and must be rebuilt; the delta,
        // changelog and index manifests are handed to the next attempt via RetryResult.
        cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
        return new RetryResult(
                deltaStatistics,
                deltaManifestList,
                changelogManifestList,
                oldIndexManifest,
                indexManifest,
                latestSnapshot,
                baseDataFiles);
    }