amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/MixedFormatSparkCatalog.java [66:237]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatSparkCatalog extends MixedSparkCatalogBase {

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    checkAndRefreshCatalogMeta();
    TableIdentifier identifier;
    MixedTable table;
    try {
      if (isInnerTableIdentifier(ident)) {
        MixedTableStoreType type = MixedTableStoreType.from(ident.name());
        identifier = buildInnerTableIdentifier(ident);
        table = catalog.loadTable(identifier);
        return loadInnerTable(table, type);
      } else {
        identifier = buildIdentifier(ident);
        table = catalog.loadTable(identifier);
      }
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
    return MixedSparkTable.ofMixedTable(table, catalog, name());
  }

  private Table loadInnerTable(MixedTable table, MixedTableStoreType type) {
    if (type != null) {
      switch (type) {
        case CHANGE:
          return new SparkChangeTable(
              (BasicUnkeyedTable) table.asKeyedTable().changeTable(), false);
        default:
          throw new IllegalArgumentException("Unknown inner table type: " + type);
      }
    } else {
      throw new IllegalArgumentException("Table does not exist: " + table);
    }
  }

  @Override
  public Table createTable(
      Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
      throws TableAlreadyExistsException {
    checkAndRefreshCatalogMeta();
    properties = Maps.newHashMap(properties);
    Schema finalSchema = checkAndConvertSchema(schema, properties);
    TableIdentifier identifier = buildIdentifier(ident);
    TableBuilder builder = catalog.newTableBuilder(identifier, finalSchema);
    PartitionSpec spec = Spark3Util.toPartitionSpec(finalSchema, transforms);
    if (properties.containsKey(TableCatalog.PROP_LOCATION)
        && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), ident)) {
      properties.remove(TableCatalog.PROP_LOCATION);
    }
    try {
      if (properties.containsKey("primary.keys")) {
        PrimaryKeySpec primaryKeySpec =
            PrimaryKeySpec.fromDescription(finalSchema, properties.get("primary.keys"));
        properties.remove("primary.keys");
        builder
            .withPartitionSpec(spec)
            .withProperties(properties)
            .withPrimaryKeySpec(primaryKeySpec);
      } else {
        builder.withPartitionSpec(spec).withProperties(properties);
      }
      MixedTable table = builder.create();
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    } catch (AlreadyExistsException e) {
      throw new TableAlreadyExistsException("Table " + ident + " already exists", Option.apply(e));
    }
  }

  private Schema checkAndConvertSchema(StructType schema, Map<String, String> properties) {
    Schema convertSchema;
    boolean useTimestampWithoutZoneInNewTables;
    SparkSession sparkSession = SparkSession.active();
    if (CatalogUtil.isMixedHiveCatalog(catalog)) {
      useTimestampWithoutZoneInNewTables = true;
    } else {
      useTimestampWithoutZoneInNewTables =
          Boolean.parseBoolean(
              sparkSession
                  .conf()
                  .get(
                      USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES,
                      USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES_DEFAULT));
    }
    if (useTimestampWithoutZoneInNewTables) {
      sparkSession.conf().set(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, true);
      convertSchema = SparkSchemaUtil.convert(schema, true);
    } else {
      convertSchema = SparkSchemaUtil.convert(schema, false);
    }

    // schema add primary keys
    if (properties.containsKey("primary.keys")) {
      PrimaryKeySpec primaryKeySpec =
          PrimaryKeySpec.fromDescription(convertSchema, properties.get("primary.keys"));
      List<String> primaryKeys = primaryKeySpec.fieldNames();
      Set<String> pkSet = new HashSet<>(primaryKeys);
      Set<Integer> identifierFieldIds = new HashSet<>();
      List<Types.NestedField> columnsWithPk = new ArrayList<>();
      convertSchema
          .columns()
          .forEach(
              nestedField -> {
                if (pkSet.contains(nestedField.name())) {
                  columnsWithPk.add(nestedField.asRequired());
                  identifierFieldIds.add(nestedField.fieldId());
                } else {
                  columnsWithPk.add(nestedField);
                }
              });
      return new Schema(columnsWithPk, identifierFieldIds);
    }
    return convertSchema;
  }

  @Override
  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
    TableIdentifier identifier = buildIdentifier(ident);
    MixedTable table;
    try {
      table = catalog.loadTable(identifier);
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
    if (table.isUnkeyedTable()) {
      alterUnKeyedTable(table.asUnkeyedTable(), changes);
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    } else if (table.isKeyedTable()) {
      alterKeyedTable(table.asKeyedTable(), changes);
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    }
    throw new UnsupportedOperationException("Unsupported alter table");
  }

  private void alterKeyedTable(KeyedTable table, TableChange... changes) {
    List<TableChange> schemaChanges = new ArrayList<>();
    List<TableChange> propertyChanges = new ArrayList<>();
    for (TableChange change : changes) {
      if (change instanceof ColumnChange) {
        schemaChanges.add(change);
      } else if (change instanceof SetProperty) {
        propertyChanges.add(change);
      } else if (change instanceof RemoveProperty) {
        propertyChanges.add(change);
      } else {
        throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
      }
    }
    commitKeyedChanges(table, schemaChanges, propertyChanges);
  }

  private void commitKeyedChanges(
      KeyedTable table, List<TableChange> schemaChanges, List<TableChange> propertyChanges) {
    if (!schemaChanges.isEmpty()) {
      Spark3Util.applySchemaChanges(table.updateSchema(), schemaChanges).commit();
    }

    if (!propertyChanges.isEmpty()) {
      Spark3Util.applyPropertyChanges(table.updateProperties(), propertyChanges).commit();
    }
  }

  private void alterUnKeyedTable(UnkeyedTable table, TableChange... changes) {
    SetProperty setLocation = null;
    SetProperty setSnapshotId = null;
    SetProperty pickSnapshotId = null;
    List<TableChange> propertyChanges = new ArrayList<>();
    List<TableChange> schemaChanges = new ArrayList<>();

    for (TableChange change : changes) {
      if (change instanceof SetProperty) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/MixedFormatSparkCatalog.java [68:239]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatSparkCatalog extends MixedSparkCatalogBase {

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    checkAndRefreshCatalogMeta();
    TableIdentifier identifier;
    MixedTable table;
    try {
      if (isInnerTableIdentifier(ident)) {
        MixedTableStoreType type = MixedTableStoreType.from(ident.name());
        identifier = buildInnerTableIdentifier(ident);
        table = catalog.loadTable(identifier);
        return loadInnerTable(table, type);
      } else {
        identifier = buildIdentifier(ident);
        table = catalog.loadTable(identifier);
      }
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
    return MixedSparkTable.ofMixedTable(table, catalog, name());
  }

  private Table loadInnerTable(MixedTable table, MixedTableStoreType type) {
    if (type != null) {
      switch (type) {
        case CHANGE:
          return new SparkChangeTable(
              (BasicUnkeyedTable) table.asKeyedTable().changeTable(), false);
        default:
          throw new IllegalArgumentException("Unknown inner table type: " + type);
      }
    } else {
      throw new IllegalArgumentException("Table does not exist: " + table);
    }
  }

  @Override
  public Table createTable(
      Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
      throws TableAlreadyExistsException {
    checkAndRefreshCatalogMeta();
    properties = Maps.newHashMap(properties);
    Schema finalSchema = checkAndConvertSchema(schema, properties);
    TableIdentifier identifier = buildIdentifier(ident);
    TableBuilder builder = catalog.newTableBuilder(identifier, finalSchema);
    PartitionSpec spec = Spark3Util.toPartitionSpec(finalSchema, transforms);
    if (properties.containsKey(TableCatalog.PROP_LOCATION)
        && isIdentifierLocation(properties.get(TableCatalog.PROP_LOCATION), ident)) {
      properties.remove(TableCatalog.PROP_LOCATION);
    }
    try {
      if (properties.containsKey("primary.keys")) {
        PrimaryKeySpec primaryKeySpec =
            PrimaryKeySpec.fromDescription(finalSchema, properties.get("primary.keys"));
        properties.remove("primary.keys");
        builder
            .withPartitionSpec(spec)
            .withProperties(properties)
            .withPrimaryKeySpec(primaryKeySpec);
      } else {
        builder.withPartitionSpec(spec).withProperties(properties);
      }
      MixedTable table = builder.create();
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    } catch (AlreadyExistsException e) {
      throw new TableAlreadyExistsException("Table " + ident + " already exists", Option.apply(e));
    }
  }

  private Schema checkAndConvertSchema(StructType schema, Map<String, String> properties) {
    Schema convertSchema;
    boolean useTimestampWithoutZoneInNewTables;
    SparkSession sparkSession = SparkSession.active();
    if (CatalogUtil.isMixedHiveCatalog(catalog)) {
      useTimestampWithoutZoneInNewTables = true;
    } else {
      useTimestampWithoutZoneInNewTables =
          Boolean.parseBoolean(
              sparkSession
                  .conf()
                  .get(
                      USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES,
                      USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES_DEFAULT));
    }
    if (useTimestampWithoutZoneInNewTables) {
      sparkSession.conf().set(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, true);
      convertSchema = SparkSchemaUtil.convert(schema, true);
    } else {
      convertSchema = SparkSchemaUtil.convert(schema, false);
    }

    // schema add primary keys
    if (properties.containsKey("primary.keys")) {
      PrimaryKeySpec primaryKeySpec =
          PrimaryKeySpec.fromDescription(convertSchema, properties.get("primary.keys"));
      List<String> primaryKeys = primaryKeySpec.fieldNames();
      Set<String> pkSet = new HashSet<>(primaryKeys);
      Set<Integer> identifierFieldIds = new HashSet<>();
      List<Types.NestedField> columnsWithPk = new ArrayList<>();
      convertSchema
          .columns()
          .forEach(
              nestedField -> {
                if (pkSet.contains(nestedField.name())) {
                  columnsWithPk.add(nestedField.asRequired());
                  identifierFieldIds.add(nestedField.fieldId());
                } else {
                  columnsWithPk.add(nestedField);
                }
              });
      return new Schema(columnsWithPk, identifierFieldIds);
    }
    return convertSchema;
  }

  @Override
  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
    TableIdentifier identifier = buildIdentifier(ident);
    MixedTable table;
    try {
      table = catalog.loadTable(identifier);
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new NoSuchTableException(ident);
    }
    if (table.isUnkeyedTable()) {
      alterUnKeyedTable(table.asUnkeyedTable(), changes);
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    } else if (table.isKeyedTable()) {
      alterKeyedTable(table.asKeyedTable(), changes);
      return MixedSparkTable.ofMixedTable(table, catalog, name());
    }
    throw new UnsupportedOperationException("Unsupported alter table");
  }

  private void alterKeyedTable(KeyedTable table, TableChange... changes) {
    List<TableChange> schemaChanges = new ArrayList<>();
    List<TableChange> propertyChanges = new ArrayList<>();
    for (TableChange change : changes) {
      if (change instanceof ColumnChange) {
        schemaChanges.add(change);
      } else if (change instanceof SetProperty) {
        propertyChanges.add(change);
      } else if (change instanceof RemoveProperty) {
        propertyChanges.add(change);
      } else {
        throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
      }
    }
    commitKeyedChanges(table, schemaChanges, propertyChanges);
  }

  private void commitKeyedChanges(
      KeyedTable table, List<TableChange> schemaChanges, List<TableChange> propertyChanges) {
    if (!schemaChanges.isEmpty()) {
      Spark3Util.applySchemaChanges(table.updateSchema(), schemaChanges).commit();
    }

    if (!propertyChanges.isEmpty()) {
      Spark3Util.applyPropertyChanges(table.updateProperties(), propertyChanges).commit();
    }
  }

  private void alterUnKeyedTable(UnkeyedTable table, TableChange... changes) {
    SetProperty setLocation = null;
    SetProperty setSnapshotId = null;
    SetProperty pickSnapshotId = null;
    List<TableChange> propertyChanges = new ArrayList<>();
    List<TableChange> schemaChanges = new ArrayList<>();

    for (TableChange change : changes) {
      if (change instanceof SetProperty) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



