public void overwrite()

in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [271:372]


    /**
     * Replaces the data matched by a partition filter with the table files appended in {@code
     * committable}, then commits any compaction results as a separate step.
     *
     * <p>How the partition filter is chosen depends on the {@code dynamicPartitionOverwrite} field:
     *
     * <ul>
     *   <li>dynamic: the filter is the OR of one equality predicate per distinct partition found
     *       in the appended table files; when nothing was appended the overwrite step is skipped;
     *   <li>static: the filter is built from {@code partition}, and every appended table file is
     *       verified against it before anything is overwritten.
     * </ul>
     *
     * <p>Changelog files collected from the committable are not committed here; they are only
     * listed in a warning.
     *
     * @param partition partition spec used to build the filter in static mode
     * @param committable source of the file changes, identifier, watermark and log offsets
     * @param properties only written to the debug log by this method
     * @throws IllegalArgumentException in static mode, if an appended table file does not match
     *     the filter built from {@code partition}
     */
    public void overwrite(
            Map<String, String> partition,
            ManifestCommittable committable,
            Map<String, String> properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}",
                    partition,
                    committable,
                    properties);
        }

        // Split the committable into its five kinds of changes.
        List<ManifestEntry> addedFiles = new ArrayList<>();
        List<ManifestEntry> addedChangelog = new ArrayList<>();
        List<ManifestEntry> compactedFiles = new ArrayList<>();
        List<ManifestEntry> compactedChangelog = new ArrayList<>();
        List<IndexManifestEntry> addedIndexFiles = new ArrayList<>();
        collectChanges(
                committable.fileCommittables(),
                addedFiles,
                addedChangelog,
                compactedFiles,
                compactedChangelog,
                addedIndexFiles);

        // Changelog is dropped in overwrite mode; tell the user exactly which files are ignored.
        boolean noChangelog = addedChangelog.isEmpty() && compactedChangelog.isEmpty();
        if (!noChangelog) {
            StringBuilder ignoredReport = new StringBuilder();
            ignoredReport
                    .append("Overwrite mode currently does not commit any changelog.\n")
                    .append("Please make sure that the partition you're overwriting ")
                    .append("is not being consumed by a streaming reader.\n")
                    .append("Ignored changelog files are:\n");
            for (ManifestEntry ignored : addedChangelog) {
                ignoredReport.append("  * " + ignored.toString() + "\n");
            }
            for (ManifestEntry ignored : compactedChangelog) {
                ignoredReport.append("  * " + ignored.toString() + "\n");
            }
            LOG.warn(ignoredReport.toString());
        }

        boolean dynamicMode = dynamicPartitionOverwrite;
        // A dynamic overwrite with no appended files has nothing to replace, so no existing data
        // may be deleted and the overwrite step is skipped altogether.
        boolean overwriteNeeded = !(dynamicMode && addedFiles.isEmpty());

        Predicate overwriteFilter = null;
        if (!dynamicMode) {
            // Static mode: the filter comes from the partition spec given by the caller.
            overwriteFilter = PredicateBuilder.partition(partition, partitionType);
            if (overwriteFilter != null) {
                // Sanity check: every appended file must live inside the targeted partition.
                for (ManifestEntry added : addedFiles) {
                    if (overwriteFilter.test(partitionObjectConverter.convert(added.partition()))) {
                        continue;
                    }
                    throw new IllegalArgumentException(
                            "Trying to overwrite partition "
                                    + partition
                                    + ", but the changes in "
                                    + pathFactory.getPartitionString(added.partition())
                                    + " does not belong to this partition");
                }
            }
        } else if (overwriteNeeded) {
            // Dynamic mode: only the partitions that actually receive new data are overwritten.
            overwriteFilter =
                    addedFiles.stream()
                            .map(ManifestEntry::partition)
                            .distinct()
                            .map(
                                    touchedPartition ->
                                            PredicateBuilder.equalPartition(
                                                    touchedPartition, partitionType))
                            .reduce(PredicateBuilder::or)
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Failed to get dynamic partition filter. This is unexpected."));
        }

        if (overwriteNeeded) {
            tryOverwrite(
                    overwriteFilter,
                    addedFiles,
                    addedIndexFiles,
                    committable.identifier(),
                    committable.watermark(),
                    committable.logOffsets());
        }

        // Compaction results go into their own COMPACT commit, after the overwrite.
        if (compactedFiles.isEmpty()) {
            return;
        }
        tryCommit(
                compactedFiles,
                Collections.emptyList(),
                Collections.emptyList(),
                committable.identifier(),
                committable.watermark(),
                committable.logOffsets(),
                Snapshot.CommitKind.COMPACT,
                null);
    }