in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [271:372]
/**
 * Overwrites existing table data with the new files carried by {@code committable}.
 *
 * <p>The changes of the committable are first split by {@code collectChanges} into appended
 * table files, appended changelog, compacted table files, compacted changelog and appended
 * index files. Changelog files are never committed here; if any are present they are only
 * listed in a warning.
 *
 * <p>Which data gets replaced depends on the {@code dynamicPartitionOverwrite} field:
 *
 * <ul>
 *   <li>when it is set, the filter is the OR of the distinct partitions of the appended
 *       files, and nothing is overwritten at all if there are no appended files;
 *   <li>otherwise the filter is built from {@code partition}, and every appended file is
 *       checked against it (when the filter is not {@code null}) before overwriting.
 * </ul>
 *
 * <p>Compacted table files, if any, are committed afterwards in a separate
 * {@code tryCommit} call of kind {@link Snapshot.CommitKind#COMPACT}.
 *
 * @param partition static partition spec used to build the filter when dynamic partition
 *     overwrite is off; also reported in log and error messages
 * @param committable source of the file changes, commit identifier, watermark and log offsets
 * @param properties only written to the debug log by this method
 * @throws IllegalArgumentException in static mode, if an appended file's partition does not
 *     pass the filter built from {@code partition}
 * @throws RuntimeException in dynamic mode, if no filter could be derived from a non-empty
 *     list of appended files
 */
public void overwrite(
        Map<String, String> partition,
        ManifestCommittable committable,
        Map<String, String> properties) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}",
                partition,
                committable,
                properties);
    }

    // Buckets filled by collectChanges from the committable's file changes.
    List<ManifestEntry> newTableFiles = new ArrayList<>();
    List<ManifestEntry> newChangelog = new ArrayList<>();
    List<ManifestEntry> compactedTableFiles = new ArrayList<>();
    List<ManifestEntry> compactedChangelog = new ArrayList<>();
    List<IndexManifestEntry> newIndexFiles = new ArrayList<>();
    collectChanges(
            committable.fileCommittables(),
            newTableFiles,
            newChangelog,
            compactedTableFiles,
            compactedChangelog,
            newIndexFiles);

    // Changelog is not passed to either commit below, so tell the user what is being dropped.
    if (!(newChangelog.isEmpty() && compactedChangelog.isEmpty())) {
        StringBuilder ignored = new StringBuilder();
        ignored.append("Overwrite mode currently does not commit any changelog.\n");
        ignored.append("Please make sure that the partition you're overwriting ");
        ignored.append("is not being consumed by a streaming reader.\n");
        ignored.append("Ignored changelog files are:\n");
        newChangelog.forEach(
                file -> ignored.append(" * ").append(file.toString()).append("\n"));
        compactedChangelog.forEach(
                file -> ignored.append(" * ").append(file.toString()).append("\n"));
        LOG.warn(ignored.toString());
    }

    boolean dynamicMode = dynamicPartitionOverwrite;

    // In dynamic mode with no appended files there is nothing to replace, so no data is deleted.
    if (!(dynamicMode && newTableFiles.isEmpty())) {
        Predicate overwriteFilter;
        if (dynamicMode) {
            // The filter covers exactly the partitions that receive new data.
            overwriteFilter =
                    newTableFiles.stream()
                            .map(file -> file.partition())
                            .distinct()
                            .map(part -> PredicateBuilder.equalPartition(part, partitionType))
                            .reduce(PredicateBuilder::or)
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Failed to get dynamic partition filter. This is unexpected."));
        } else {
            overwriteFilter = PredicateBuilder.partition(partition, partitionType);
            // Sanity check: with a non-null filter, every appended file must fall inside it.
            if (overwriteFilter != null) {
                for (ManifestEntry file : newTableFiles) {
                    boolean insideTarget =
                            overwriteFilter.test(
                                    partitionObjectConverter.convert(file.partition()));
                    if (insideTarget) {
                        continue;
                    }
                    throw new IllegalArgumentException(
                            "Trying to overwrite partition "
                                    + partition
                                    + ", but the changes in "
                                    + pathFactory.getPartitionString(file.partition())
                                    + " does not belong to this partition");
                }
            }
        }

        tryOverwrite(
                overwriteFilter,
                newTableFiles,
                newIndexFiles,
                committable.identifier(),
                committable.watermark(),
                committable.logOffsets());
    }

    // Compaction results go into their own COMPACT commit, after the overwrite.
    if (compactedTableFiles.isEmpty()) {
        return;
    }
    tryCommit(
            compactedTableFiles,
            Collections.emptyList(),
            Collections.emptyList(),
            committable.identifier(),
            committable.watermark(),
            committable.logOffsets(),
            Snapshot.CommitKind.COMPACT,
            null);
}