amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/util/MixedFormatSparkUtils.java [61:193]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatSparkUtils {
  private static final Logger LOG = LoggerFactory.getLogger(MixedFormatSparkUtils.class);

  /**
   * Resolves a multi-part name into a Spark {@link TableCatalog} and the {@link Identifier} of the
   * table inside it.
   *
   * <p>Resolution is delegated to {@code Spark3Util.catalogAndIdentifier}, passing the session's
   * current catalog as the default catalog.
   *
   * @param spark the active Spark session
   * @param nameParts the parts of the (possibly catalog-qualified) table name
   * @return the resolved table catalog paired with the identifier within it
   * @throws IllegalArgumentException if the resolved catalog is not a {@link TableCatalog}
   */
  public static TableCatalogAndIdentifier tableCatalogAndIdentifier(
      SparkSession spark, List<String> nameParts) {
    Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
        Spark3Util.catalogAndIdentifier(
            spark, nameParts, spark.sessionState().catalogManager().currentCatalog());
    CatalogPlugin catalog = catalogAndIdentifier.catalog();
    Preconditions.checkArgument(
        catalog instanceof TableCatalog,
        "Cannot resolve name-parts %s to catalog and identifier, %s is not a table catalog",
        Joiner.on(',').join(nameParts),
        catalog.name());
    return new TableCatalogAndIdentifier((TableCatalog) catalog, catalogAndIdentifier.identifier());
  }

  /** Immutable pair of a resolved {@link TableCatalog} and a table {@link Identifier} in it. */
  public static class TableCatalogAndIdentifier {
    // Package-private visibility is kept for compatibility with in-package readers; the fields
    // are final because they are only ever assigned by the constructor.
    final TableCatalog tableCatalog;
    final Identifier identifier;

    /**
     * Creates the pair.
     *
     * @param tableCatalog the catalog the identifier was resolved against
     * @param identifier the table identifier within {@code tableCatalog}
     */
    public TableCatalogAndIdentifier(TableCatalog tableCatalog, Identifier identifier) {
      this.tableCatalog = tableCatalog;
      this.identifier = identifier;
    }

    /** Returns the resolved table catalog. */
    public TableCatalog catalog() {
      return this.tableCatalog;
    }

    /** Returns the table identifier within {@link #catalog()}. */
    public Identifier identifier() {
      return this.identifier;
    }
  }

  /**
   * Builds the clustered distribution Spark should satisfy before writing into the given table.
   *
   * <p>The write distribution mode is read from the table property {@code
   * WRITE_DISTRIBUTION_MODE} (default {@code WRITE_DISTRIBUTION_MODE_DEFAULT}):
   *
   * <ul>
   *   <li>{@code NONE}: no distribution is required;
   *   <li>{@code HASH}: clustering is derived from the primary key and/or the partition spec;
   *   <li>{@code RANGE}: unsupported, so a warning is logged and no distribution is required.
   * </ul>
   *
   * @param mixedSparkTable the Spark table wrapping the mixed-format table being written
   * @return the required clustered distribution, or {@code null} when no clustering is required
   */
  public static ClusteredDistribution buildRequiredDistribution(MixedSparkTable mixedSparkTable) {
    DistributionMode writeMode =
        DistributionMode.fromName(
            PropertyUtil.propertyAsString(
                mixedSparkTable.properties(),
                WRITE_DISTRIBUTION_MODE,
                WRITE_DISTRIBUTION_MODE_DEFAULT));

    switch (writeMode) {
      case HASH:
        return resolveHashDistribution(mixedSparkTable);
      case NONE:
        return null;
      case RANGE:
        LOG.warn(
            "Fallback to use 'none' distribution mode, because {}={} is not supported in spark now",
            WRITE_DISTRIBUTION_MODE,
            DistributionMode.RANGE.modeName());
        return null;
      default:
        throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }

  /**
   * Resolves the clustering used by the HASH write distribution mode.
   *
   * <p>The hash mode is read from {@code TableProperties.WRITE_DISTRIBUTION_HASH_MODE}; {@code
   * AUTO} is narrowed through {@code DistributionHashMode.autoSelect} using whether the table is
   * keyed and whether it is partitioned. When the mode covers the primary key, a bucket transform
   * on the key comes first, optionally followed by the partition transforms (null transforms are
   * dropped in that case).
   *
   * @return the clustered distribution, or {@code null} when the mode covers neither the primary
   *     key nor the partition spec
   */
  private static ClusteredDistribution resolveHashDistribution(MixedSparkTable mixedSparkTable) {
    String hashModeDesc =
        mixedSparkTable
            .properties()
            .getOrDefault(
                TableProperties.WRITE_DISTRIBUTION_HASH_MODE,
                TableProperties.WRITE_DISTRIBUTION_HASH_MODE_DEFAULT);
    DistributionHashMode hashMode = DistributionHashMode.valueOfDesc(hashModeDesc);

    if (DistributionHashMode.AUTO.equals(hashMode)) {
      boolean keyed = mixedSparkTable.table().isKeyedTable();
      boolean partitioned = !mixedSparkTable.table().spec().isUnpartitioned();
      hashMode = DistributionHashMode.autoSelect(keyed, partitioned);
    }

    if (!hashMode.isSupportPrimaryKey()) {
      // Without a primary key component, only the partition spec can drive clustering.
      return hashMode.isSupportPartition()
          ? Distributions.clustered(toTransforms(mixedSparkTable.table().spec()))
          : null;
    }

    List<Transform> clusteringTransforms = new ArrayList<>();
    clusteringTransforms.add(
        toTransformsFromPrimary(
            mixedSparkTable, mixedSparkTable.table().asKeyedTable().primaryKeySpec()));
    if (hashMode.isSupportPartition()) {
      clusteringTransforms.addAll(Arrays.asList(toTransforms(mixedSparkTable.table().spec())));
    }
    Transform[] nonNullTransforms =
        clusteringTransforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
    return Distributions.clustered(nonNullTransforms);
  }

  /**
   * Builds the bucket transform used to cluster rows by primary key.
   *
   * <p>The bucket count comes from {@code TableProperties.BASE_FILE_INDEX_HASH_BUCKET} (default
   * {@code TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT}).
   *
   * <p>NOTE(review): only the first primary-key column is bucketed, so composite keys are
   * clustered more coarsely than the full key; confirm this is intended.
   *
   * @param mixedSparkTable the table whose properties supply the bucket count
   * @param primaryKeySpec the primary key of the keyed table
   * @return a bucket transform over the first primary-key column
   */
  private static Transform toTransformsFromPrimary(
      MixedSparkTable mixedSparkTable, PrimaryKeySpec primaryKeySpec) {
    int bucketCount =
        PropertyUtil.propertyAsInt(
            mixedSparkTable.properties(),
            TableProperties.BASE_FILE_INDEX_HASH_BUCKET,
            TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT);
    String leadingKeyColumn = primaryKeySpec.fieldNames().get(0);
    return Expressions.bucket(bucketCount, leadingKeyColumn);
  }

  /**
   * Converts a constant value into the representation used by Spark internal rows for the given
   * type.
   *
   * <ul>
   *   <li>{@code DECIMAL}: a {@link BigDecimal} becomes a Spark {@code Decimal};
   *   <li>{@code STRING}: becomes a {@code UTF8String} (Avro {@code Utf8} is read from its bytes,
   *       anything else via {@code toString()});
   *   <li>{@code FIXED}: becomes a {@code byte[]} ({@code byte[]} as-is, Avro {@code
   *       GenericData.Fixed} via {@code bytes()}, otherwise copied from a {@link ByteBuffer});
   *   <li>{@code BINARY}: a {@link ByteBuffer} is copied into a {@code byte[]};
   *   <li>any other type: the value is returned unchanged.
   * </ul>
   *
   * @param type the type of the constant
   * @param value the constant, may be {@code null}
   * @return the converted value, or {@code null} if {@code value} is {@code null}
   */
  public static Object convertConstant(Type type, Object value) {
    if (value == null) {
      return null;
    }

    switch (type.typeId()) {
      case DECIMAL:
        return Decimal.apply((BigDecimal) value);

      case STRING:
        {
          if (!(value instanceof Utf8)) {
            return UTF8String.fromString(value.toString());
          }
          // Build straight from the Avro string's UTF-8 bytes; only the first getByteLength()
          // bytes of the backing array belong to the string.
          Utf8 avroString = (Utf8) value;
          return UTF8String.fromBytes(avroString.getBytes(), 0, avroString.getByteLength());
        }

      case FIXED:
        if (value instanceof GenericData.Fixed) {
          return ((GenericData.Fixed) value).bytes();
        }
        return value instanceof byte[] ? value : ByteBuffers.toByteArray((ByteBuffer) value);

      case BINARY:
        return ByteBuffers.toByteArray((ByteBuffer) value);

      default:
        return value;
    }
  }

  public static String mixedTableProvider(MixedTable table) {
    if (table.format().in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
      return table.format().name().toLowerCase(Locale.ROOT);
    } else {
      throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/util/MixedFormatSparkUtils.java [61:193]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class MixedFormatSparkUtils {
  private static final Logger LOG = LoggerFactory.getLogger(MixedFormatSparkUtils.class);

  /**
   * Resolves a multi-part name into a Spark {@link TableCatalog} and the {@link Identifier} of the
   * table inside it.
   *
   * <p>Resolution is delegated to {@code Spark3Util.catalogAndIdentifier}, passing the session's
   * current catalog as the default catalog.
   *
   * @param spark the active Spark session
   * @param nameParts the parts of the (possibly catalog-qualified) table name
   * @return the resolved table catalog paired with the identifier within it
   * @throws IllegalArgumentException if the resolved catalog is not a {@link TableCatalog}
   */
  public static TableCatalogAndIdentifier tableCatalogAndIdentifier(
      SparkSession spark, List<String> nameParts) {
    Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
        Spark3Util.catalogAndIdentifier(
            spark, nameParts, spark.sessionState().catalogManager().currentCatalog());
    CatalogPlugin catalog = catalogAndIdentifier.catalog();
    Preconditions.checkArgument(
        catalog instanceof TableCatalog,
        "Cannot resolve name-parts %s to catalog and identifier, %s is not a table catalog",
        Joiner.on(',').join(nameParts),
        catalog.name());
    return new TableCatalogAndIdentifier((TableCatalog) catalog, catalogAndIdentifier.identifier());
  }

  /** Immutable pair of a resolved {@link TableCatalog} and a table {@link Identifier} in it. */
  public static class TableCatalogAndIdentifier {
    // Package-private visibility is kept for compatibility with in-package readers; the fields
    // are final because they are only ever assigned by the constructor.
    final TableCatalog tableCatalog;
    final Identifier identifier;

    /**
     * Creates the pair.
     *
     * @param tableCatalog the catalog the identifier was resolved against
     * @param identifier the table identifier within {@code tableCatalog}
     */
    public TableCatalogAndIdentifier(TableCatalog tableCatalog, Identifier identifier) {
      this.tableCatalog = tableCatalog;
      this.identifier = identifier;
    }

    /** Returns the resolved table catalog. */
    public TableCatalog catalog() {
      return this.tableCatalog;
    }

    /** Returns the table identifier within {@link #catalog()}. */
    public Identifier identifier() {
      return this.identifier;
    }
  }

  /**
   * Builds the clustered distribution Spark should satisfy before writing into the given table.
   *
   * <p>The write distribution mode is read from the table property {@code
   * WRITE_DISTRIBUTION_MODE} (default {@code WRITE_DISTRIBUTION_MODE_DEFAULT}):
   *
   * <ul>
   *   <li>{@code NONE}: no distribution is required;
   *   <li>{@code HASH}: clustering is derived from the primary key and/or the partition spec;
   *   <li>{@code RANGE}: unsupported, so a warning is logged and no distribution is required.
   * </ul>
   *
   * @param mixedSparkTable the Spark table wrapping the mixed-format table being written
   * @return the required clustered distribution, or {@code null} when no clustering is required
   */
  public static ClusteredDistribution buildRequiredDistribution(MixedSparkTable mixedSparkTable) {
    DistributionMode writeMode =
        DistributionMode.fromName(
            PropertyUtil.propertyAsString(
                mixedSparkTable.properties(),
                WRITE_DISTRIBUTION_MODE,
                WRITE_DISTRIBUTION_MODE_DEFAULT));

    switch (writeMode) {
      case HASH:
        return resolveHashDistribution(mixedSparkTable);
      case NONE:
        return null;
      case RANGE:
        LOG.warn(
            "Fallback to use 'none' distribution mode, because {}={} is not supported in spark now",
            WRITE_DISTRIBUTION_MODE,
            DistributionMode.RANGE.modeName());
        return null;
      default:
        throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }

  /**
   * Resolves the clustering used by the HASH write distribution mode.
   *
   * <p>The hash mode is read from {@code TableProperties.WRITE_DISTRIBUTION_HASH_MODE}; {@code
   * AUTO} is narrowed through {@code DistributionHashMode.autoSelect} using whether the table is
   * keyed and whether it is partitioned. When the mode covers the primary key, a bucket transform
   * on the key comes first, optionally followed by the partition transforms (null transforms are
   * dropped in that case).
   *
   * @return the clustered distribution, or {@code null} when the mode covers neither the primary
   *     key nor the partition spec
   */
  private static ClusteredDistribution resolveHashDistribution(MixedSparkTable mixedSparkTable) {
    String hashModeDesc =
        mixedSparkTable
            .properties()
            .getOrDefault(
                TableProperties.WRITE_DISTRIBUTION_HASH_MODE,
                TableProperties.WRITE_DISTRIBUTION_HASH_MODE_DEFAULT);
    DistributionHashMode hashMode = DistributionHashMode.valueOfDesc(hashModeDesc);

    if (DistributionHashMode.AUTO.equals(hashMode)) {
      boolean keyed = mixedSparkTable.table().isKeyedTable();
      boolean partitioned = !mixedSparkTable.table().spec().isUnpartitioned();
      hashMode = DistributionHashMode.autoSelect(keyed, partitioned);
    }

    if (!hashMode.isSupportPrimaryKey()) {
      // Without a primary key component, only the partition spec can drive clustering.
      return hashMode.isSupportPartition()
          ? Distributions.clustered(toTransforms(mixedSparkTable.table().spec()))
          : null;
    }

    List<Transform> clusteringTransforms = new ArrayList<>();
    clusteringTransforms.add(
        toTransformsFromPrimary(
            mixedSparkTable, mixedSparkTable.table().asKeyedTable().primaryKeySpec()));
    if (hashMode.isSupportPartition()) {
      clusteringTransforms.addAll(Arrays.asList(toTransforms(mixedSparkTable.table().spec())));
    }
    Transform[] nonNullTransforms =
        clusteringTransforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
    return Distributions.clustered(nonNullTransforms);
  }

  /**
   * Builds the bucket transform used to cluster rows by primary key.
   *
   * <p>The bucket count comes from {@code TableProperties.BASE_FILE_INDEX_HASH_BUCKET} (default
   * {@code TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT}).
   *
   * <p>NOTE(review): only the first primary-key column is bucketed, so composite keys are
   * clustered more coarsely than the full key; confirm this is intended.
   *
   * @param mixedSparkTable the table whose properties supply the bucket count
   * @param primaryKeySpec the primary key of the keyed table
   * @return a bucket transform over the first primary-key column
   */
  private static Transform toTransformsFromPrimary(
      MixedSparkTable mixedSparkTable, PrimaryKeySpec primaryKeySpec) {
    int bucketCount =
        PropertyUtil.propertyAsInt(
            mixedSparkTable.properties(),
            TableProperties.BASE_FILE_INDEX_HASH_BUCKET,
            TableProperties.BASE_FILE_INDEX_HASH_BUCKET_DEFAULT);
    String leadingKeyColumn = primaryKeySpec.fieldNames().get(0);
    return Expressions.bucket(bucketCount, leadingKeyColumn);
  }

  /**
   * Converts a constant value into the representation used by Spark internal rows for the given
   * type.
   *
   * <ul>
   *   <li>{@code DECIMAL}: a {@link BigDecimal} becomes a Spark {@code Decimal};
   *   <li>{@code STRING}: becomes a {@code UTF8String} (Avro {@code Utf8} is read from its bytes,
   *       anything else via {@code toString()});
   *   <li>{@code FIXED}: becomes a {@code byte[]} ({@code byte[]} as-is, Avro {@code
   *       GenericData.Fixed} via {@code bytes()}, otherwise copied from a {@link ByteBuffer});
   *   <li>{@code BINARY}: a {@link ByteBuffer} is copied into a {@code byte[]};
   *   <li>any other type: the value is returned unchanged.
   * </ul>
   *
   * @param type the type of the constant
   * @param value the constant, may be {@code null}
   * @return the converted value, or {@code null} if {@code value} is {@code null}
   */
  public static Object convertConstant(Type type, Object value) {
    if (value == null) {
      return null;
    }

    switch (type.typeId()) {
      case DECIMAL:
        return Decimal.apply((BigDecimal) value);

      case STRING:
        {
          if (!(value instanceof Utf8)) {
            return UTF8String.fromString(value.toString());
          }
          // Build straight from the Avro string's UTF-8 bytes; only the first getByteLength()
          // bytes of the backing array belong to the string.
          Utf8 avroString = (Utf8) value;
          return UTF8String.fromBytes(avroString.getBytes(), 0, avroString.getByteLength());
        }

      case FIXED:
        if (value instanceof GenericData.Fixed) {
          return ((GenericData.Fixed) value).bytes();
        }
        return value instanceof byte[] ? value : ByteBuffers.toByteArray((ByteBuffer) value);

      case BINARY:
        return ByteBuffers.toByteArray((ByteBuffer) value);

      default:
        return value;
    }
  }

  public static String mixedTableProvider(MixedTable table) {
    if (table.format().in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
      return table.format().name().toLowerCase(Locale.ROOT);
    } else {
      throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



