in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergMetadata.java [800:962]
/**
 * Commits the files produced by a row-level write (position deletes and/or new data files) to the
 * table of the currently open {@code transaction}, then clears the {@code transaction} field.
 *
 * <p>The work is done in this order:
 *
 * <ol>
 *   <li>Decode every fragment into a {@link CommitTaskData}. If there are none, drop the
 *       transaction without committing anything and return.
 *   <li>Group the position-delete tasks by the data file they reference and single out the data
 *       files for which {@code fileIsFullyDeleted} reports true.
 *   <li>If any referenced data file is only partially deleted, or any new data file was written,
 *       build and commit a {@link RowDelta} containing those delete and data files.
 *   <li>For fully deleted data files, delete the position-delete files that were written for
 *       them (they are never added to the row delta) and remove the data files themselves
 *       through a {@link DeleteFiles} operation.
 *   <li>Commit the transaction.
 * </ol>
 *
 * @param session session used for commits and for creating the file system
 * @param table handle of the table being written; supplies the schema JSON, the snapshot to
 *     validate from, the enforced predicate and the retry mode
 * @param fragments serialized {@link CommitTaskData}, one JSON document per slice
 * @param runUpdateValidations when true, additionally validates deleted files and conflicting
 *     delete files on the row delta
 * @throws TrinoException with {@code ICEBERG_COMMIT_ERROR} when a commit fails with a {@link
 *     ValidationException}
 * @throws VerifyException when a task written with a partitioned spec carries no partition data
 * @throws UnsupportedOperationException when a task has a content type other than position
 *     deletes or data
 */
private void finishWrite(
    ConnectorSession session,
    IcebergTableHandle table,
    Collection<Slice> fragments,
    boolean runUpdateValidations) {
  Table icebergTable = transaction.table();
  List<CommitTaskData> commitTasks =
      fragments.stream()
          .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
          .collect(toImmutableList());
  if (commitTasks.isEmpty()) {
    // Avoid recording "empty" write operation
    transaction = null;
    return;
  }
  Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());

  // Position-delete tasks keyed by the path of the data file they apply to.
  Map<String, List<CommitTaskData>> deletesByFilePath =
      commitTasks.stream()
          .filter(task -> task.getContent() == POSITION_DELETES)
          .collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow()));
  // Subset of the above whose data file can be dropped as a whole instead of being masked by
  // position deletes (as decided by fileIsFullyDeleted).
  Map<String, List<CommitTaskData>> fullyDeletedFiles =
      deletesByFilePath.entrySet().stream()
          .filter(entry -> fileIsFullyDeleted(entry.getValue()))
          .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

  // A row delta is only needed when something other than whole-file removals has to be recorded:
  // at least one partially deleted data file, or at least one newly written data file.
  if (!deletesByFilePath.keySet().equals(fullyDeletedFiles.keySet())
      || commitTasks.stream().anyMatch(task -> task.getContent() == FileContent.DATA)) {
    RowDelta rowDelta = transaction.newRowDelta();
    table
        .getSnapshotId()
        .map(icebergTable::snapshot)
        .ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
    // Metadata columns are excluded from the conflict detection filter.
    TupleDomain<IcebergColumnHandle> dataColumnPredicate =
        table
            .getEnforcedPredicate()
            .filter((column, domain) -> !isMetadataColumnId(column.getId()));
    if (!dataColumnPredicate.isAll()) {
      rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
    }
    IsolationLevel isolationLevel =
        IsolationLevel.fromName(
            icebergTable
                .properties()
                .getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
    if (isolationLevel == IsolationLevel.SERIALIZABLE) {
      rowDelta.validateNoConflictingDataFiles();
    }
    if (runUpdateValidations) {
      // Ensure a row that is updated by this commit was not deleted by a separate commit
      rowDelta.validateDeletedFiles();
      rowDelta.validateNoConflictingDeleteFiles();
    }
    ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
    ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
    for (CommitTaskData task : commitTasks) {
      // Each task carries the spec its file was written with; partition values are decoded
      // against the result types of that spec's transforms.
      PartitionSpec partitionSpec =
          PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
      Type[] partitionColumnTypes =
          partitionSpec.fields().stream()
              .map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
              .toArray(Type[]::new);
      switch (task.getContent()) {
        case POSITION_DELETES:
          // Delete files for fully deleted data files are not committed; the data file itself
          // is removed further below.
          if (fullyDeletedFiles.containsKey(task.getReferencedDataFile().orElseThrow())) {
            continue;
          }
          FileMetadata.Builder deleteBuilder =
              FileMetadata.deleteFileBuilder(partitionSpec)
                  .withPath(task.getPath())
                  .withFormat(task.getFileFormat().toIceberg())
                  .ofPositionDeletes()
                  .withFileSizeInBytes(task.getFileSizeInBytes())
                  .withMetrics(task.getMetrics().metrics());
          if (!partitionSpec.fields().isEmpty()) {
            String partitionDataJson =
                task.getPartitionDataJson()
                    .orElseThrow(
                        () -> new VerifyException("No partition data for partitioned table"));
            deleteBuilder.withPartition(
                PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
          }
          rowDelta.addDeletes(deleteBuilder.build());
          writtenFiles.add(task.getPath());
          task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
          break;
        case DATA:
          DataFiles.Builder builder =
              DataFiles.builder(partitionSpec)
                  .withPath(task.getPath())
                  .withFormat(task.getFileFormat().toIceberg())
                  .withFileSizeInBytes(task.getFileSizeInBytes())
                  .withMetrics(task.getMetrics().metrics());
          // Decide on the task's own spec, i.e. the same spec the builder and
          // partitionColumnTypes are derived from, so the partition tuple always matches the
          // spec the file is registered under (mirrors the POSITION_DELETES branch).
          if (!partitionSpec.fields().isEmpty()) {
            String partitionDataJson =
                task.getPartitionDataJson()
                    .orElseThrow(
                        () -> new VerifyException("No partition data for partitioned table"));
            builder.withPartition(
                PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
          }
          rowDelta.addRows(builder.build());
          writtenFiles.add(task.getPath());
          break;
        default:
          throw new UnsupportedOperationException(
              "Unsupported task content: " + task.getContent());
      }
    }
    // try to leave as little garbage as possible behind
    if (table.getRetryMode() != NO_RETRIES) {
      cleanExtraOutputFiles(session, writtenFiles.build());
    }
    // Every data file targeted by a committed position delete must still be present.
    rowDelta.validateDataFilesExist(referencedDataFiles.build());
    try {
      commit(rowDelta, session);
    } catch (ValidationException e) {
      throw new TrinoException(
          ICEBERG_COMMIT_ERROR,
          "Failed to commit Iceberg update to table: " + table.getSchemaTableName(),
          e);
    }
  }

  // Best-effort removal of the position-delete files that were written for fully deleted data
  // files; they were skipped above and are never committed. A failure here is only logged.
  if (!fullyDeletedFiles.isEmpty()) {
    try {
      TrinoFileSystem fileSystem = fileSystemFactory.create(session);
      fileSystem.deleteFiles(
          fullyDeletedFiles.values().stream()
              .flatMap(Collection::stream)
              .map(CommitTaskData::getPath)
              .collect(toImmutableSet()));
    } catch (IOException e) {
      log.warn(e, "Failed to clean up uncommitted position delete files");
    }
  }

  try {
    // Fully deleted data files are removed from the table by path rather than masked.
    if (!fullyDeletedFiles.isEmpty()) {
      DeleteFiles deleteFiles = transaction.newDelete();
      fullyDeletedFiles.keySet().forEach(deleteFiles::deleteFile);
      commit(deleteFiles, session);
    }
    transaction.commitTransaction();
  } catch (ValidationException e) {
    throw new TrinoException(
        ICEBERG_COMMIT_ERROR,
        "Failed to commit Iceberg update to table: " + table.getSchemaTableName(),
        e);
  }
  transaction = null;
}