in paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java [405:527]
// Overwrites the partitions selected by a partition filter with the newly appended files
// carried by the committable, then commits any compaction results as a separate COMPACT
// commit.
//
// partition   - static partition spec (may cover only some of the partition fields); it is
//               only consulted when dynamic partition overwrite is disabled
// committable - source of the file changes, commit identifier, watermark and log offsets
// properties  - only written to the debug log by this method
//
// Throws IllegalArgumentException in static mode when an appended file lies outside the
// partition being overwritten.
public void overwrite(
        Map<String, String> partition,
        ManifestCommittable committable,
        Map<String, String> properties) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}",
                partition,
                committable,
                properties);
    }

    final long startNanos = System.nanoTime();
    int snapshotCount = 0;
    int attemptCount = 0;

    // Buckets that collectChanges fills from the committable's file changes.
    List<ManifestEntry> newDataFiles = new ArrayList<>();
    List<ManifestEntry> newChangelog = new ArrayList<>();
    List<ManifestEntry> compactedDataFiles = new ArrayList<>();
    List<ManifestEntry> compactedChangelog = new ArrayList<>();
    List<IndexManifestEntry> newHashIndexFiles = new ArrayList<>();
    List<IndexManifestEntry> compactedDvIndexFiles = new ArrayList<>();
    collectChanges(
            committable.fileCommittables(),
            newDataFiles,
            newChangelog,
            compactedDataFiles,
            compactedChangelog,
            newHashIndexFiles,
            compactedDvIndexFiles);

    // Neither changelog list is handed to the commits below, so list the ignored
    // entries in a warning for the user.
    boolean noChangelog = newChangelog.isEmpty() && compactedChangelog.isEmpty();
    if (!noChangelog) {
        List<ManifestEntry> ignoredChangelog = new ArrayList<>(newChangelog);
        ignoredChangelog.addAll(compactedChangelog);

        StringBuilder warning = new StringBuilder();
        warning.append("Overwrite mode currently does not commit any changelog.\n")
                .append("Please make sure that the partition you're overwriting ")
                .append("is not being consumed by a streaming reader.\n")
                .append("Ignored changelog files are:\n");
        for (ManifestEntry ignored : ignoredChangelog) {
            warning.append(" * ").append(ignored.toString()).append("\n");
        }
        LOG.warn(warning.toString());
    }

    try {
        // The filter comes from the static partition spec or, in dynamic mode, from the
        // partitions of the appended files.
        PartitionPredicate overwriteFilter = null;
        boolean overwriteNeeded = true;

        if (!dynamicPartitionOverwrite) {
            // The spec may name only a subset of the partition fields, hence a predicate
            // rather than an exact partition match.
            Predicate staticPredicate =
                    createPartitionPredicate(partition, partitionType, partitionDefaultName);
            overwriteFilter = PartitionPredicate.fromPredicate(partitionType, staticPredicate);

            // Sanity check: every appended file must fall inside the overwritten partition.
            if (overwriteFilter != null) {
                for (ManifestEntry added : newDataFiles) {
                    BinaryRow addedPartition = added.partition();
                    if (overwriteFilter.test(addedPartition)) {
                        continue;
                    }
                    throw new IllegalArgumentException(
                            "Trying to overwrite partition "
                                    + partition
                                    + ", but the changes in "
                                    + pathFactory.getPartitionString(addedPartition)
                                    + " does not belong to this partition");
                }
            }
        } else if (newDataFiles.isEmpty()) {
            // Dynamic mode without appended files: nothing is overwritten or deleted.
            overwriteNeeded = false;
        } else {
            Set<BinaryRow> touchedPartitions =
                    newDataFiles.stream()
                            .map(added -> added.partition())
                            .collect(Collectors.toSet());
            overwriteFilter = PartitionPredicate.fromMultiple(partitionType, touchedPartitions);
        }

        // Step 1: replace the selected partitions with the appended files.
        if (overwriteNeeded) {
            attemptCount +=
                    tryOverwrite(
                            overwriteFilter,
                            newDataFiles,
                            newHashIndexFiles,
                            committable.identifier(),
                            committable.watermark(),
                            committable.logOffsets());
            snapshotCount++;
        }

        // Step 2: commit compaction output, if any, as its own COMPACT commit.
        boolean nothingCompacted =
                compactedDataFiles.isEmpty() && compactedDvIndexFiles.isEmpty();
        if (!nothingCompacted) {
            attemptCount +=
                    tryCommit(
                            compactedDataFiles,
                            // presumably the changelog slot; always empty here
                            emptyList(),
                            compactedDvIndexFiles,
                            committable.identifier(),
                            committable.watermark(),
                            committable.logOffsets(),
                            Snapshot.CommitKind.COMPACT,
                            mustConflictCheck(),
                            null);
            snapshotCount++;
        }
    } finally {
        // Runs on success and on failure; elapsed time is converted from nanos to millis.
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        if (commitMetrics != null) {
            reportCommit(
                    newDataFiles,
                    emptyList(),
                    compactedDataFiles,
                    emptyList(),
                    elapsedMillis,
                    snapshotCount,
                    attemptCount);
        }
    }
}