private void finishWrite()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergMetadata.java [800:962]


  private void finishWrite(
      ConnectorSession session,
      IcebergTableHandle table,
      Collection<Slice> fragments,
      boolean runUpdateValidations) {
    Table icebergTable = transaction.table();

    List<CommitTaskData> commitTasks =
        fragments.stream()
            .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
            .collect(toImmutableList());

    if (commitTasks.isEmpty()) {
      // Avoid recording "empty" write operation
      transaction = null;
      return;
    }

    Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());

    Map<String, List<CommitTaskData>> deletesByFilePath =
        commitTasks.stream()
            .filter(task -> task.getContent() == POSITION_DELETES)
            .collect(groupingBy(task -> task.getReferencedDataFile().orElseThrow()));
    Map<String, List<CommitTaskData>> fullyDeletedFiles =
        deletesByFilePath.entrySet().stream()
            .filter(entry -> fileIsFullyDeleted(entry.getValue()))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

    if (!deletesByFilePath.keySet().equals(fullyDeletedFiles.keySet())
        || commitTasks.stream().anyMatch(task -> task.getContent() == FileContent.DATA)) {
      RowDelta rowDelta = transaction.newRowDelta();
      table
          .getSnapshotId()
          .map(icebergTable::snapshot)
          .ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
      TupleDomain<IcebergColumnHandle> dataColumnPredicate =
          table
              .getEnforcedPredicate()
              .filter((column, domain) -> !isMetadataColumnId(column.getId()));
      if (!dataColumnPredicate.isAll()) {
        rowDelta.conflictDetectionFilter(toIcebergExpression(dataColumnPredicate));
      }
      IsolationLevel isolationLevel =
          IsolationLevel.fromName(
              icebergTable
                  .properties()
                  .getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT));
      if (isolationLevel == IsolationLevel.SERIALIZABLE) {
        rowDelta.validateNoConflictingDataFiles();
      }

      if (runUpdateValidations) {
        // Ensure a row that is updated by this commit was not deleted by a separate commit
        rowDelta.validateDeletedFiles();
        rowDelta.validateNoConflictingDeleteFiles();
      }

      ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
      ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();
      for (CommitTaskData task : commitTasks) {
        PartitionSpec partitionSpec =
            PartitionSpecParser.fromJson(schema, task.getPartitionSpecJson());
        Type[] partitionColumnTypes =
            partitionSpec.fields().stream()
                .map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
                .toArray(Type[]::new);
        switch (task.getContent()) {
          case POSITION_DELETES:
            if (fullyDeletedFiles.containsKey(task.getReferencedDataFile().orElseThrow())) {
              continue;
            }

            FileMetadata.Builder deleteBuilder =
                FileMetadata.deleteFileBuilder(partitionSpec)
                    .withPath(task.getPath())
                    .withFormat(task.getFileFormat().toIceberg())
                    .ofPositionDeletes()
                    .withFileSizeInBytes(task.getFileSizeInBytes())
                    .withMetrics(task.getMetrics().metrics());

            if (!partitionSpec.fields().isEmpty()) {
              String partitionDataJson =
                  task.getPartitionDataJson()
                      .orElseThrow(
                          () -> new VerifyException("No partition data for partitioned table"));
              deleteBuilder.withPartition(
                  PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
            }

            rowDelta.addDeletes(deleteBuilder.build());
            writtenFiles.add(task.getPath());
            task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
            break;
          case DATA:
            DataFiles.Builder builder =
                DataFiles.builder(partitionSpec)
                    .withPath(task.getPath())
                    .withFormat(task.getFileFormat().toIceberg())
                    .withFileSizeInBytes(task.getFileSizeInBytes())
                    .withMetrics(task.getMetrics().metrics());

            if (!icebergTable.spec().fields().isEmpty()) {
              String partitionDataJson =
                  task.getPartitionDataJson()
                      .orElseThrow(
                          () -> new VerifyException("No partition data for partitioned table"));
              builder.withPartition(
                  PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
            }
            rowDelta.addRows(builder.build());
            writtenFiles.add(task.getPath());
            break;
          default:
            throw new UnsupportedOperationException(
                "Unsupported task content: " + task.getContent());
        }
      }

      // try to leave as little garbage as possible behind
      if (table.getRetryMode() != NO_RETRIES) {
        cleanExtraOutputFiles(session, writtenFiles.build());
      }

      rowDelta.validateDataFilesExist(referencedDataFiles.build());
      try {
        commit(rowDelta, session);
      } catch (ValidationException e) {
        throw new TrinoException(
            ICEBERG_COMMIT_ERROR,
            "Failed to commit Iceberg update to table: " + table.getSchemaTableName(),
            e);
      }
    }

    if (!fullyDeletedFiles.isEmpty()) {
      try {
        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
        fileSystem.deleteFiles(
            fullyDeletedFiles.values().stream()
                .flatMap(Collection::stream)
                .map(CommitTaskData::getPath)
                .collect(toImmutableSet()));
      } catch (IOException e) {
        log.warn(e, "Failed to clean up uncommitted position delete files");
      }
    }

    try {
      if (!fullyDeletedFiles.isEmpty()) {
        DeleteFiles deleteFiles = transaction.newDelete();
        fullyDeletedFiles.keySet().forEach(deleteFiles::deleteFile);
        commit(deleteFiles, session);
      }
      transaction.commitTransaction();
    } catch (ValidationException e) {
      throw new TrinoException(
          ICEBERG_COMMIT_ERROR,
          "Failed to commit Iceberg update to table: " + table.getSchemaTableName(),
          e);
    }
    transaction = null;
  }