in amoro-format-iceberg/src/main/java/org/apache/amoro/op/OverwriteBaseFiles.java [162:308]
/**
 * Applies the staged base-store changes on the given transaction and builds the statistics files
 * that carry the per-partition optimized sequence and base optimized time.
 *
 * <p>The work happens in three phases:
 *
 * <ol>
 *   <li>if any data files were staged, commit them through an {@code OverwriteFiles} update;
 *   <li>if any delete files were staged, commit them through a {@code RowDelta} update (only
 *       additions) or a {@code RewriteFiles} update (some delete files are being removed);
 *   <li>merge the previously recorded optimized sequence / optimized time with the values for
 *       the partitions changed here, and write them as a statistics file for the first new
 *       snapshot, then copy that file to every further new snapshot.
 * </ol>
 *
 * @param transaction transaction on which all updates are created and committed
 * @return one statistics file per snapshot created by this call, or an empty list when nothing
 *     was staged and therefore no snapshot was created
 * @throws IllegalStateException if neither {@code updateOptimizedSequence()} nor {@code
 *     updateOptimizedSequenceDynamically()} was invoked beforehand
 */
protected List<StatisticsFile> apply(Transaction transaction) {
  Preconditions.checkState(
      this.dynamic != null,
      "updateOptimizedSequence() or updateOptimizedSequenceDynamically() must be invoked");
  applyDeleteExpression();

  // In dynamic mode the affected partitions are collected from the staged files themselves;
  // otherwise the caller-provided partitionOptimizedSequence is used in phase 3.
  final boolean dynamicMode = this.dynamic;
  final StructLikeMap<Long> touchedPartitions =
      dynamicMode ? StructLikeMap.create(transaction.table().spec().partitionType()) : null;

  UnkeyedTable baseTable = keyedTable.baseTable();
  List<CreateSnapshotEvent> createdSnapshots = Lists.newArrayList();

  // Phase 1: data files are replaced with a single overwrite commit.
  if (!(this.addFiles.isEmpty() && this.deleteFiles.isEmpty())) {
    OverwriteFiles overwrite = transaction.newOverwrite();
    overwrite.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
    overwrite.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");

    // Conflict validation is only configured when a filter was supplied and the base table
    // already has a snapshot to validate from.
    if (conflictDetectionFilter != null && baseTable.currentSnapshot() != null) {
      overwrite.conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
      overwrite.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
    }

    if (dynamicMode) {
      for (DataFile added : this.addFiles) {
        touchedPartitions.put(added.partition(), this.optimizedSequence);
      }
      for (DataFile removed : this.deleteFiles) {
        touchedPartitions.put(removed.partition(), this.optimizedSequence);
      }
    }

    for (DataFile added : this.addFiles) {
      overwrite.addFile(added);
    }
    for (DataFile removed : this.deleteFiles) {
      overwrite.deleteFile(removed);
    }

    // Only a positive optimized sequence is recorded as the transaction id property.
    if (this.optimizedSequence != null && this.optimizedSequence > 0) {
      overwrite.set(PROPERTIES_TRANSACTION_ID, String.valueOf(this.optimizedSequence));
    }
    if (MapUtils.isNotEmpty(properties)) {
      properties.forEach((key, value) -> overwrite.set(key, value));
    }

    overwrite.commit();
    createdSnapshots.add((CreateSnapshotEvent) overwrite.updateEvent());
  }

  // Phase 2: delete files. A rewrite is needed as soon as any delete file is removed;
  // pure additions go through a row delta.
  if (CollectionUtils.isNotEmpty(addDeleteFiles)
      || CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
    if (CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
      RewriteFiles rewrite = transaction.newRewrite();
      rewrite.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
      rewrite.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");
      if (baseTable.currentSnapshot() != null) {
        rewrite.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
      }

      if (dynamicMode) {
        for (DeleteFile added : this.addDeleteFiles) {
          touchedPartitions.put(added.partition(), this.optimizedSequence);
        }
        for (DeleteFile removed : this.deleteDeleteFiles) {
          touchedPartitions.put(removed.partition(), this.optimizedSequence);
        }
      }

      // No data files take part in this rewrite, only delete files are swapped.
      rewrite.rewriteFiles(
          Collections.emptySet(),
          new HashSet<>(deleteDeleteFiles),
          Collections.emptySet(),
          new HashSet<>(addDeleteFiles));
      if (MapUtils.isNotEmpty(properties)) {
        properties.forEach((key, value) -> rewrite.set(key, value));
      }

      rewrite.commit();
      createdSnapshots.add((CreateSnapshotEvent) rewrite.updateEvent());
    } else {
      RowDelta rowDelta = transaction.newRowDelta();
      rowDelta.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
      rowDelta.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");
      if (baseTable.currentSnapshot() != null) {
        rowDelta.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
      }

      if (dynamicMode) {
        for (DeleteFile added : this.addDeleteFiles) {
          touchedPartitions.put(added.partition(), this.optimizedSequence);
        }
      }

      for (DeleteFile added : addDeleteFiles) {
        rowDelta.addDeletes(added);
      }
      if (MapUtils.isNotEmpty(properties)) {
        properties.forEach((key, value) -> rowDelta.set(key, value));
      }

      rowDelta.commit();
      createdSnapshots.add((CreateSnapshotEvent) rowDelta.updateEvent());
    }
  }

  // Nothing was committed, so there is no snapshot to attach statistics to.
  if (createdSnapshots.isEmpty()) {
    return Collections.emptyList();
  }

  // Phase 3: merge the previously stored per-partition values with the ones changed here.
  final long commitTime = System.currentTimeMillis();
  PartitionSpec spec = transaction.table().spec();
  StructLikeMap<Long> previousSequences = MixedTableUtil.readOptimizedSequence(keyedTable);
  StructLikeMap<Long> previousTimes = MixedTableUtil.readBaseOptimizedTime(keyedTable);
  StructLikeMap<Long> mergedSequences = StructLikeMap.create(spec.partitionType());
  StructLikeMap<Long> mergedTimes = StructLikeMap.create(spec.partitionType());
  if (previousSequences != null) {
    mergedSequences.putAll(previousSequences);
  }
  if (previousTimes != null) {
    mergedTimes.putAll(previousTimes);
  }

  StructLikeMap<Long> changedPartitionSequences =
      dynamicMode ? touchedPartitions : this.partitionOptimizedSequence;
  // Every changed partition gets its new sequence and is stamped with this commit's time.
  changedPartitionSequences.forEach(
      (partition, sequence) -> {
        mergedSequences.put(partition, sequence);
        mergedTimes.put(partition, commitTime);
      });

  // The statistics file is written once, for the first new snapshot, and then copied to
  // each subsequent new snapshot.
  List<StatisticsFile> statisticsFiles = Lists.newArrayList();
  StatisticsFile written = null;
  for (CreateSnapshotEvent snapshotEvent : createdSnapshots) {
    if (written == null) {
      Table table = transaction.table();
      StatisticsFileUtil.PartitionDataSerializer<Long> serializer =
          StatisticsFileUtil.createPartitionDataSerializer(table.spec(), Long.class);
      written =
          StatisticsFileUtil.writerBuilder(table)
              .withSnapshotId(snapshotEvent.snapshotId())
              .build()
              .add(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE, mergedSequences, serializer)
              .add(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME, mergedTimes, serializer)
              .complete();
      statisticsFiles.add(written);
    } else {
      statisticsFiles.add(
          StatisticsFileUtil.copyToSnapshot(written, snapshotEvent.snapshotId()));
    }
  }
  return statisticsFiles;
}