amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/table/MixedSparkTable.java [53:238]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedSparkTable
    implements Table,
        SupportsRead,
        SupportsWrite,
        SupportsRowLevelOperator,
        SupportsPartitionManagement {
  private static final Set<String> RESERVED_PROPERTIES =
      Sets.newHashSet("provider", "format", "current-snapshot-id");
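  // Spark capabilities this table advertises: batch read/write, streaming write, and overwrite by filter or dynamic partitions.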
  private static final Set<TableCapability> CAPABILITIES =
      ImmutableSet.of(
          TableCapability.BATCH_READ,
          TableCapability.BATCH_WRITE,
          TableCapability.STREAMING_WRITE,
          TableCapability.OVERWRITE_BY_FILTER,
          TableCapability.OVERWRITE_DYNAMIC);

  private final MixedTable mixedTable;
  private final String sparkCatalogName;
  private StructType lazyTableSchema = null;
  private SparkSession lazySpark = null;
  private final MixedFormatCatalog catalog;

  public static Table ofMixedTable(
      MixedTable table, MixedFormatCatalog catalog, String sparkCatalogName) {
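    // Unkeyed tables without Hive compatibility can be served by the simpler UnkeyedSparkTable wrapper;
    // keyed and Hive-compatible tables fall through to MixedSparkTable.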
    if (table.isUnkeyedTable() && !(table instanceof SupportHive)) {
      return new UnkeyedSparkTable(table.asUnkeyedTable(), false, sparkCatalogName);
    }
    return new MixedSparkTable(table, catalog, sparkCatalogName);
  }

  public MixedSparkTable(
      MixedTable mixedTable, MixedFormatCatalog catalog, String sparkCatalogName) {
    this.mixedTable = mixedTable;
    this.sparkCatalogName = sparkCatalogName;
    this.catalog = catalog;
  }

  private SparkSession sparkSession() {
    if (lazySpark == null) {
      this.lazySpark = SparkSession.active();
    }

    return lazySpark;
  }

  public MixedTable table() {
    return mixedTable;
  }

  @Override
  public String name() {
    return sparkCatalogName
        + "."
        + mixedTable.id().getDatabase()
        + "."
        + mixedTable.id().getTableName();
  }

  @Override
  public StructType schema() {
    if (lazyTableSchema == null) {
      Schema tableSchema = mixedTable.schema();
      this.lazyTableSchema = SparkSchemaUtil.convert(tableSchema);
    }

    return lazyTableSchema;
  }

  @Override
  public Transform[] partitioning() {
    return Spark3Util.toTransforms(mixedTable.spec());
  }

  @Override
  public Map<String, String> properties() {
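    // Expose table properties to Spark, backfilling file-format defaults and the provider while hiding reserved keys.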
    ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();

    if (!mixedTable.properties().containsKey(TableProperties.BASE_FILE_FORMAT)) {
      propsBuilder.put(TableProperties.BASE_FILE_FORMAT, TableProperties.BASE_FILE_FORMAT_DEFAULT);
    }

    if (!mixedTable.properties().containsKey(TableProperties.DELTA_FILE_FORMAT)) {
      propsBuilder.put(
          TableProperties.DELTA_FILE_FORMAT,
          mixedTable
              .properties()
              .getOrDefault(
                  TableProperties.CHANGE_FILE_FORMAT, TableProperties.CHANGE_FILE_FORMAT_DEFAULT));
    }
    propsBuilder.put("provider", MixedFormatSparkUtils.mixedTableProvider(table()));
    mixedTable.properties().entrySet().stream()
        .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
        .forEach(propsBuilder::put);

    return propsBuilder.build();
  }

  @Override
  public Set<TableCapability> capabilities() {
    return CAPABILITIES;
  }

  @Override
  public String toString() {
    return mixedTable.toString();
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    } else if (other == null || getClass() != other.getClass()) {
      return false;
    }

    // use only name in order to correctly invalidate Spark cache
    MixedSparkTable that = (MixedSparkTable) other;
    return mixedTable.id().equals(that.mixedTable.id());
  }

  @Override
  public int hashCode() {
    // use only name in order to correctly invalidate Spark cache
    return mixedTable.id().hashCode();
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new SparkScanBuilder(sparkSession(), mixedTable, options);
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new MixedFormatSparkWriteBuilder(mixedTable, info, catalog);
  }

  @Override
  public SupportsExtendIdentColumns newUpsertScanBuilder(CaseInsensitiveStringMap options) {
    return new SparkScanBuilder(sparkSession(), mixedTable, options);
  }

  @Override
  public boolean requireAdditionIdentifierColumns() {
    return true;
  }

  @Override
  public boolean appendAsUpsert() {
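    // Appends are treated as upserts only for keyed tables that explicitly enable the upsert table property.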
    return mixedTable.isKeyedTable()
        && Boolean.parseBoolean(
            mixedTable.properties().getOrDefault(TableProperties.UPSERT_ENABLED, "false"));
  }

  @Override
  public StructType partitionSchema() {
    return SparkSchemaUtil.convert(new Schema(table().spec().partitionType().fields()));
  }

  @Override
  public void createPartition(InternalRow ident, Map<String, String> properties)
      throws PartitionAlreadyExistsException, UnsupportedOperationException {
    throw new UnsupportedOperationException("not supported create partition");
  }

  @Override
  public boolean dropPartition(InternalRow ident) {
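    // Dropping partitions through Spark's partition management API is not supported; always report failure.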
    return false;
  }

  @Override
  public void replacePartitionMetadata(InternalRow ident, Map<String, String> properties)
      throws NoSuchPartitionException, UnsupportedOperationException {
    throw new UnsupportedOperationException("not supported replace partition");
  }

  @Override
  public Map<String, String> loadPartitionMetadata(InternalRow ident)
      throws UnsupportedOperationException {
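    // No partition-level metadata is exposed for mixed-format tables.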
    return null;
  }

  @Override
  public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident) {
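    // Partition listing is not implemented; an empty result is returned.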
    return new InternalRow[0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/table/MixedSparkTable.java [53:238]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedSparkTable
    implements Table,
        SupportsRead,
        SupportsWrite,
        SupportsRowLevelOperator,
        SupportsPartitionManagement {
  private static final Set<String> RESERVED_PROPERTIES =
      Sets.newHashSet("provider", "format", "current-snapshot-id");
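  // Spark capabilities this table advertises: batch read/write, streaming write, and overwrite by filter or dynamic partitions.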
  private static final Set<TableCapability> CAPABILITIES =
      ImmutableSet.of(
          TableCapability.BATCH_READ,
          TableCapability.BATCH_WRITE,
          TableCapability.STREAMING_WRITE,
          TableCapability.OVERWRITE_BY_FILTER,
          TableCapability.OVERWRITE_DYNAMIC);

  private final MixedTable mixedTable;
  private final String sparkCatalogName;
  private StructType lazyTableSchema = null;
  private SparkSession lazySpark = null;
  private final MixedFormatCatalog catalog;

  public static Table ofMixedTable(
      MixedTable table, MixedFormatCatalog catalog, String sparkCatalogName) {
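    // Unkeyed tables without Hive compatibility can be served by the simpler UnkeyedSparkTable wrapper;
    // keyed and Hive-compatible tables fall through to MixedSparkTable.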
    if (table.isUnkeyedTable() && !(table instanceof SupportHive)) {
      return new UnkeyedSparkTable(table.asUnkeyedTable(), false, sparkCatalogName);
    }
    return new MixedSparkTable(table, catalog, sparkCatalogName);
  }

  public MixedSparkTable(
      MixedTable mixedTable, MixedFormatCatalog catalog, String sparkCatalogName) {
    this.mixedTable = mixedTable;
    this.sparkCatalogName = sparkCatalogName;
    this.catalog = catalog;
  }

  private SparkSession sparkSession() {
    if (lazySpark == null) {
      this.lazySpark = SparkSession.active();
    }

    return lazySpark;
  }

  public MixedTable table() {
    return mixedTable;
  }

  @Override
  public String name() {
    return sparkCatalogName
        + "."
        + mixedTable.id().getDatabase()
        + "."
        + mixedTable.id().getTableName();
  }

  @Override
  public StructType schema() {
    if (lazyTableSchema == null) {
      Schema tableSchema = mixedTable.schema();
      this.lazyTableSchema = SparkSchemaUtil.convert(tableSchema);
    }

    return lazyTableSchema;
  }

  @Override
  public Transform[] partitioning() {
    return Spark3Util.toTransforms(mixedTable.spec());
  }

  @Override
  public Map<String, String> properties() {
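    // Expose table properties to Spark, backfilling file-format defaults and the provider while hiding reserved keys.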
    ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();

    if (!mixedTable.properties().containsKey(TableProperties.BASE_FILE_FORMAT)) {
      propsBuilder.put(TableProperties.BASE_FILE_FORMAT, TableProperties.BASE_FILE_FORMAT_DEFAULT);
    }

    if (!mixedTable.properties().containsKey(TableProperties.DELTA_FILE_FORMAT)) {
      propsBuilder.put(
          TableProperties.DELTA_FILE_FORMAT,
          mixedTable
              .properties()
              .getOrDefault(
                  TableProperties.CHANGE_FILE_FORMAT, TableProperties.CHANGE_FILE_FORMAT_DEFAULT));
    }
    propsBuilder.put("provider", MixedFormatSparkUtils.mixedTableProvider(table()));
    mixedTable.properties().entrySet().stream()
        .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
        .forEach(propsBuilder::put);

    return propsBuilder.build();
  }

  @Override
  public Set<TableCapability> capabilities() {
    return CAPABILITIES;
  }

  @Override
  public String toString() {
    return mixedTable.toString();
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    } else if (other == null || getClass() != other.getClass()) {
      return false;
    }

    // use only name in order to correctly invalidate Spark cache
    MixedSparkTable that = (MixedSparkTable) other;
    return mixedTable.id().equals(that.mixedTable.id());
  }

  @Override
  public int hashCode() {
    // use only name in order to correctly invalidate Spark cache
    return mixedTable.id().hashCode();
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new SparkScanBuilder(sparkSession(), mixedTable, options);
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new MixedFormatSparkWriteBuilder(mixedTable, info, catalog);
  }

  @Override
  public SupportsExtendIdentColumns newUpsertScanBuilder(CaseInsensitiveStringMap options) {
    return new SparkScanBuilder(sparkSession(), mixedTable, options);
  }

  @Override
  public boolean requireAdditionIdentifierColumns() {
    return true;
  }

  @Override
  public boolean appendAsUpsert() {
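    // Appends are treated as upserts only for keyed tables that explicitly enable the upsert table property.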
    return mixedTable.isKeyedTable()
        && Boolean.parseBoolean(
            mixedTable.properties().getOrDefault(TableProperties.UPSERT_ENABLED, "false"));
  }

  @Override
  public StructType partitionSchema() {
    return SparkSchemaUtil.convert(new Schema(table().spec().partitionType().fields()));
  }

  @Override
  public void createPartition(InternalRow ident, Map<String, String> properties)
      throws PartitionAlreadyExistsException, UnsupportedOperationException {
    throw new UnsupportedOperationException("not supported create partition");
  }

  @Override
  public boolean dropPartition(InternalRow ident) {
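    // Dropping partitions through Spark's partition management API is not supported; always report failure.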
    return false;
  }

  @Override
  public void replacePartitionMetadata(InternalRow ident, Map<String, String> properties)
      throws NoSuchPartitionException, UnsupportedOperationException {
    throw new UnsupportedOperationException("not supported replace partition");
  }

  @Override
  public Map<String, String> loadPartitionMetadata(InternalRow ident)
      throws UnsupportedOperationException {
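    // No partition-level metadata is exposed for mixed-format tables.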
    return null;
  }

  @Override
  public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident) {
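    // Partition listing is not implemented; an empty result is returned.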
    return new InternalRow[0];
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
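
For orientation, here is a minimal, hypothetical usage sketch (not taken from either file above; the catalog name mixed_catalog, the db.sample / db.staging tables, and the dt partition column are placeholders) showing how the capabilities declared by MixedSparkTable surface through Spark SQL, assuming a Spark session whose catalog resolves tables via ofMixedTable(...):

import org.apache.spark.sql.SparkSession;

public class MixedTableDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("mixed-format-demo").getOrCreate();

    // BATCH_READ: an ordinary SQL scan is planned through newScanBuilder(options).
    spark.sql("SELECT * FROM mixed_catalog.db.sample WHERE dt = '2024-01-01'").show();

    // OVERWRITE_DYNAMIC: with dynamic partition overwrite enabled, INSERT OVERWRITE
    // replaces only the partitions produced by the query.
    spark.conf().set("spark.sql.sources.partitionOverwriteMode", "dynamic");
    spark.sql("INSERT OVERWRITE TABLE mixed_catalog.db.sample SELECT * FROM mixed_catalog.db.staging");
  }
}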



