amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/writer/UnkeyedSparkBatchWrite.java [68:322]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
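/**
 * Spark batch write implementation for unkeyed mixed-format tables. Provides append, dynamic
 * overwrite, overwrite-by-filter, and upsert batch writes, each holding a table blocker so the
 * write cannot run concurrently with optimizing on the same table.
 */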
public class UnkeyedSparkBatchWrite
    implements MixedFormatSparkWriteBuilder.MixedFormatWrite, Write {

  private final UnkeyedTable table;
  private final StructType dsSchema;
  private final String hiveSubdirectory = HiveTableUtil.newHiveSubdirectory();

  private final boolean orderedWriter;

  private final MixedFormatCatalog catalog;

  public UnkeyedSparkBatchWrite(
      UnkeyedTable table, LogicalWriteInfo info, MixedFormatCatalog catalog) {
    this.table = table;
    this.dsSchema = info.schema();
    this.orderedWriter =
        Boolean.parseBoolean(
            info.options().getOrDefault("writer.distributed-and-ordered", "false"));
    this.catalog = catalog;
  }

  @Override
  public BatchWrite asBatchAppend() {
    return new AppendWrite();
  }

  @Override
  public BatchWrite asDynamicOverwrite() {
    return new DynamicOverwrite();
  }

  @Override
  public BatchWrite asOverwriteByFilter(Expression overwriteExpr) {
    return new OverwriteByFilter(overwriteExpr);
  }

  @Override
  public BatchWrite asUpsertWrite() {
    return new UpsertWrite();
  }

  private abstract class BaseBatchWrite implements BatchWrite {
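    // Common blocker handling: acquire a blocker for BATCH_WRITE and OPTIMIZE before writing,
    // verify it is still registered at commit time, and on abort delete the task output files
    // (with retry/backoff from the table's commit properties) before releasing the blocker.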

    protected TableBlockerManager tableBlockerManager;
    protected Blocker block;

    @Override
    public void abort(WriterCommitMessage[] messages) {
      try {
        Map<String, String> props = table.properties();
        Tasks.foreach(WriteTaskCommit.files(messages))
            .retry(
                PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
            .exponentialBackoff(
                PropertyUtil.propertyAsInt(
                    props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
                PropertyUtil.propertyAsInt(
                    props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
                PropertyUtil.propertyAsInt(
                    props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
                2.0 /* exponential */)
            .throwFailureWhenFinished()
            .run(
                file -> {
                  table.io().deleteFile(file.path().toString());
                });
      } finally {
        tableBlockerManager.release(block);
      }
    }

    public void checkBlocker(TableBlockerManager tableBlockerManager) {
      List<String> blockerIds =
          tableBlockerManager.getBlockers().stream()
              .map(Blocker::blockerId)
              .collect(Collectors.toList());
      if (!blockerIds.contains(block.blockerId())) {
        throw new IllegalStateException("block is not in blockerManager");
      }
    }

    public void getBlocker() {
      this.tableBlockerManager = catalog.getTableBlockerManager(table.id());
      ArrayList<BlockableOperation> operations = Lists.newArrayList();
      operations.add(BlockableOperation.BATCH_WRITE);
      operations.add(BlockableOperation.OPTIMIZE);
      try {
        this.block = tableBlockerManager.block(operations);
      } catch (OperationConflictException e) {
        throw new IllegalStateException(
            "failed to block table " + table.id() + " with " + operations, e);
      }
    }
  }

  private class AppendWrite extends BaseBatchWrite {
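    // Plain append: commits all data files reported by the write tasks with a single AppendFiles.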

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, false, null, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      AppendFiles appendFiles = table.newAppend();
      for (DataFile file : WriteTaskCommit.files(messages)) {
        appendFiles.appendFile(file);
      }
      appendFiles.commit();
      tableBlockerManager.release(block);
    }
  }

  private class DynamicOverwrite extends BaseBatchWrite {
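    // Dynamic partition overwrite: task writers target a new Hive subdirectory, and the reported
    // files replace the content of every partition they touch via ReplacePartitions.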

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, true, hiveSubdirectory, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      ReplacePartitions replacePartitions = table.newReplacePartitions();
      for (DataFile file : WriteTaskCommit.files(messages)) {
        replacePartitions.addFile(file);
      }
      replacePartitions.commit();
      tableBlockerManager.release(block);
    }
  }

  private class OverwriteByFilter extends BaseBatchWrite {
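    // Expression-based overwrite: replaces rows matching the overwrite expression with the newly
    // written files in one OverwriteFiles operation, with DELETE_UNTRACKED_HIVE_FILE enabled.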
    private final Expression overwriteExpr;

    private OverwriteByFilter(Expression overwriteExpr) {
      this.overwriteExpr = overwriteExpr;
    }

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, true, hiveSubdirectory, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      OverwriteFiles overwriteFiles = table.newOverwrite();
      overwriteFiles.overwriteByRowFilter(overwriteExpr);
      overwriteFiles.set(DELETE_UNTRACKED_HIVE_FILE, "true");
      for (DataFile file : WriteTaskCommit.files(messages)) {
        overwriteFiles.addFile(file);
      }
      overwriteFiles.commit();
      tableBlockerManager.release(block);
    }
  }

  private class UpsertWrite extends BaseBatchWrite {
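    // Upsert: commits the delete files and data files reported by the tasks as a single RowDelta.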
    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new DeltaUpsertWriteFactory(table, dsSchema, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      RowDelta rowDelta = table.newRowDelta();
      if (WriteTaskCommit.deleteFiles(messages).iterator().hasNext()) {
        for (DeleteFile file : WriteTaskCommit.deleteFiles(messages)) {
          rowDelta.addDeletes(file);
        }
      }
      if (WriteTaskCommit.files(messages).iterator().hasNext()) {
        for (DataFile file : WriteTaskCommit.files(messages)) {
          rowDelta.addRows(file);
        }
      }
      rowDelta.commit();
      tableBlockerManager.release(block);
    }
  }

  private static class WriterFactory implements DataWriterFactory, Serializable {
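    // Serializable factory shipped to executors; creates a base TaskWriter per Spark task and
    // wraps it in a SimpleInternalRowDataWriter.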
    protected final UnkeyedTable table;
    protected final StructType dsSchema;

    protected final String hiveSubdirectory;

    protected final boolean isOverwrite;
    protected final boolean orderedWriter;

    WriterFactory(
        UnkeyedTable table,
        StructType dsSchema,
        boolean isOverwrite,
        String hiveSubdirectory,
        boolean orderedWrite) {
      this.table = table;
      this.dsSchema = dsSchema;
      this.isOverwrite = isOverwrite;
      this.hiveSubdirectory = hiveSubdirectory;
      this.orderedWriter = orderedWrite;
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
      TaskWriters builder =
          TaskWriters.of(table)
              .withPartitionId(partitionId)
              .withTaskId(taskId)
              .withOrderedWriter(orderedWriter)
              .withDataSourceSchema(dsSchema)
              .withHiveSubdirectory(hiveSubdirectory);

      TaskWriter<InternalRow> writer = builder.newBaseWriter(this.isOverwrite);
      return new SimpleInternalRowDataWriter(writer);
    }

    public TaskWriter<InternalRow> newWriter(int partitionId, long taskId, StructType schema) {
      return TaskWriters.of(table)
          .withPartitionId(partitionId)
          .withTaskId(taskId)
          .withDataSourceSchema(schema)
          .newUnkeyedUpsertWriter();
    }
  }

  private static class DeltaUpsertWriteFactory extends WriterFactory {
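    // Upsert variant: strips the _file, _pos and _upsert_op metadata columns from the data source
    // schema, then builds two unkeyed upsert writers wrapped in a SimpleRowLevelDataWriter.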

    DeltaUpsertWriteFactory(UnkeyedTable table, StructType dsSchema, boolean orderedWriter) {
      super(table, dsSchema, false, null, orderedWriter);
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
      StructType schema =
          new StructType(
              Arrays.stream(dsSchema.fields())
                  .filter(
                      f ->
                          !f.name().equals("_file")
                              && !f.name().equals("_pos")
                              && !f.name().equals("_upsert_op"))
                  .toArray(StructField[]::new));
      return new SimpleRowLevelDataWriter(
          newWriter(partitionId, taskId, schema),
          newWriter(partitionId, taskId, schema),
          dsSchema,
          table.isKeyedTable());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/writer/UnkeyedSparkBatchWrite.java [68:322]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
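/**
 * Spark batch write implementation for unkeyed mixed-format tables. Provides append, dynamic
 * overwrite, overwrite-by-filter, and upsert batch writes, each holding a table blocker so the
 * write cannot run concurrently with optimizing on the same table.
 */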
public class UnkeyedSparkBatchWrite
    implements MixedFormatSparkWriteBuilder.MixedFormatWrite, Write {

  private final UnkeyedTable table;
  private final StructType dsSchema;
  private final String hiveSubdirectory = HiveTableUtil.newHiveSubdirectory();

  private final boolean orderedWriter;

  private final MixedFormatCatalog catalog;

  public UnkeyedSparkBatchWrite(
      UnkeyedTable table, LogicalWriteInfo info, MixedFormatCatalog catalog) {
    this.table = table;
    this.dsSchema = info.schema();
    this.orderedWriter =
        Boolean.parseBoolean(
            info.options().getOrDefault("writer.distributed-and-ordered", "false"));
    this.catalog = catalog;
  }

  @Override
  public BatchWrite asBatchAppend() {
    return new AppendWrite();
  }

  @Override
  public BatchWrite asDynamicOverwrite() {
    return new DynamicOverwrite();
  }

  @Override
  public BatchWrite asOverwriteByFilter(Expression overwriteExpr) {
    return new OverwriteByFilter(overwriteExpr);
  }

  @Override
  public BatchWrite asUpsertWrite() {
    return new UpsertWrite();
  }

  private abstract class BaseBatchWrite implements BatchWrite {
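    // Common blocker handling: acquire a blocker for BATCH_WRITE and OPTIMIZE before writing,
    // verify it is still registered at commit time, and on abort delete the task output files
    // (with retry/backoff from the table's commit properties) before releasing the blocker.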

    protected TableBlockerManager tableBlockerManager;
    protected Blocker block;

    @Override
    public void abort(WriterCommitMessage[] messages) {
      try {
        Map<String, String> props = table.properties();
        Tasks.foreach(WriteTaskCommit.files(messages))
            .retry(
                PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
            .exponentialBackoff(
                PropertyUtil.propertyAsInt(
                    props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
                PropertyUtil.propertyAsInt(
                    props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
                PropertyUtil.propertyAsInt(
                    props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
                2.0 /* exponential */)
            .throwFailureWhenFinished()
            .run(
                file -> {
                  table.io().deleteFile(file.path().toString());
                });
      } finally {
        tableBlockerManager.release(block);
      }
    }

    public void checkBlocker(TableBlockerManager tableBlockerManager) {
      List<String> blockerIds =
          tableBlockerManager.getBlockers().stream()
              .map(Blocker::blockerId)
              .collect(Collectors.toList());
      if (!blockerIds.contains(block.blockerId())) {
        throw new IllegalStateException("block is not in blockerManager");
      }
    }

    public void getBlocker() {
      this.tableBlockerManager = catalog.getTableBlockerManager(table.id());
      ArrayList<BlockableOperation> operations = Lists.newArrayList();
      operations.add(BlockableOperation.BATCH_WRITE);
      operations.add(BlockableOperation.OPTIMIZE);
      try {
        this.block = tableBlockerManager.block(operations);
      } catch (OperationConflictException e) {
        throw new IllegalStateException(
            "failed to block table " + table.id() + " with " + operations, e);
      }
    }
  }

  private class AppendWrite extends BaseBatchWrite {
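    // Plain append: commits all data files reported by the write tasks with a single AppendFiles.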

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, false, null, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      AppendFiles appendFiles = table.newAppend();
      for (DataFile file : WriteTaskCommit.files(messages)) {
        appendFiles.appendFile(file);
      }
      appendFiles.commit();
      tableBlockerManager.release(block);
    }
  }

  private class DynamicOverwrite extends BaseBatchWrite {
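    // Dynamic partition overwrite: task writers target a new Hive subdirectory, and the reported
    // files replace the content of every partition they touch via ReplacePartitions.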

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, true, hiveSubdirectory, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      ReplacePartitions replacePartitions = table.newReplacePartitions();
      for (DataFile file : WriteTaskCommit.files(messages)) {
        replacePartitions.addFile(file);
      }
      replacePartitions.commit();
      tableBlockerManager.release(block);
    }
  }

  private class OverwriteByFilter extends BaseBatchWrite {
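    // Expression-based overwrite: replaces rows matching the overwrite expression with the newly
    // written files in one OverwriteFiles operation, with DELETE_UNTRACKED_HIVE_FILE enabled.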
    private final Expression overwriteExpr;

    private OverwriteByFilter(Expression overwriteExpr) {
      this.overwriteExpr = overwriteExpr;
    }

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new WriterFactory(table, dsSchema, true, hiveSubdirectory, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      OverwriteFiles overwriteFiles = table.newOverwrite();
      overwriteFiles.overwriteByRowFilter(overwriteExpr);
      overwriteFiles.set(DELETE_UNTRACKED_HIVE_FILE, "true");
      for (DataFile file : WriteTaskCommit.files(messages)) {
        overwriteFiles.addFile(file);
      }
      overwriteFiles.commit();
      tableBlockerManager.release(block);
    }
  }

  private class UpsertWrite extends BaseBatchWrite {
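    // Upsert: commits the delete files and data files reported by the tasks as a single RowDelta.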
    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
      getBlocker();
      return new DeltaUpsertWriteFactory(table, dsSchema, orderedWriter);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {
      checkBlocker(tableBlockerManager);
      RowDelta rowDelta = table.newRowDelta();
      if (WriteTaskCommit.deleteFiles(messages).iterator().hasNext()) {
        for (DeleteFile file : WriteTaskCommit.deleteFiles(messages)) {
          rowDelta.addDeletes(file);
        }
      }
      if (WriteTaskCommit.files(messages).iterator().hasNext()) {
        for (DataFile file : WriteTaskCommit.files(messages)) {
          rowDelta.addRows(file);
        }
      }
      rowDelta.commit();
      tableBlockerManager.release(block);
    }
  }

  private static class WriterFactory implements DataWriterFactory, Serializable {
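    // Serializable factory shipped to executors; creates a base TaskWriter per Spark task and
    // wraps it in a SimpleInternalRowDataWriter.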
    protected final UnkeyedTable table;
    protected final StructType dsSchema;

    protected final String hiveSubdirectory;

    protected final boolean isOverwrite;
    protected final boolean orderedWriter;

    WriterFactory(
        UnkeyedTable table,
        StructType dsSchema,
        boolean isOverwrite,
        String hiveSubdirectory,
        boolean orderedWrite) {
      this.table = table;
      this.dsSchema = dsSchema;
      this.isOverwrite = isOverwrite;
      this.hiveSubdirectory = hiveSubdirectory;
      this.orderedWriter = orderedWrite;
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
      TaskWriters builder =
          TaskWriters.of(table)
              .withPartitionId(partitionId)
              .withTaskId(taskId)
              .withOrderedWriter(orderedWriter)
              .withDataSourceSchema(dsSchema)
              .withHiveSubdirectory(hiveSubdirectory);

      TaskWriter<InternalRow> writer = builder.newBaseWriter(this.isOverwrite);
      return new SimpleInternalRowDataWriter(writer);
    }

    public TaskWriter<InternalRow> newWriter(int partitionId, long taskId, StructType schema) {
      return TaskWriters.of(table)
          .withPartitionId(partitionId)
          .withTaskId(taskId)
          .withDataSourceSchema(schema)
          .newUnkeyedUpsertWriter();
    }
  }

  private static class DeltaUpsertWriteFactory extends WriterFactory {
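    // Upsert variant: strips the _file, _pos and _upsert_op metadata columns from the data source
    // schema, then builds two unkeyed upsert writers wrapped in a SimpleRowLevelDataWriter.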

    DeltaUpsertWriteFactory(UnkeyedTable table, StructType dsSchema, boolean orderedWriter) {
      super(table, dsSchema, false, null, orderedWriter);
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
      StructType schema =
          new StructType(
              Arrays.stream(dsSchema.fields())
                  .filter(
                      f ->
                          !f.name().equals("_file")
                              && !f.name().equals("_pos")
                              && !f.name().equals("_upsert_op"))
                  .toArray(StructField[]::new));
      return new SimpleRowLevelDataWriter(
          newWriter(partitionId, taskId, schema),
          newWriter(partitionId, taskId, schema),
          dsSchema,
          table.isKeyedTable());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



