amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/util/MixedFormatSparkUtils.java [96:149]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the required {@link ClusteredDistribution} for writes to the given mixed-format table.
   *
   * <p>The distribution mode is read from the table property {@code write.distribution-mode}:
   *
   * <ul>
   *   <li>{@code none} – no required distribution; {@code null} is returned
   *   <li>{@code hash} – cluster by primary-key and/or partition transforms, chosen by the
   *       configured distribution hash mode ({@code auto} selects based on whether the table is
   *       keyed and/or partitioned)
   *   <li>{@code range} – not supported in Spark; falls back to {@code none} with a warning
   * </ul>
   *
   * @param mixedSparkTable the Spark table being written to
   * @return the required clustered distribution, or {@code null} when none is required
   * @throws IllegalArgumentException if the resolved distribution mode is unrecognized
   */
  public static ClusteredDistribution buildRequiredDistribution(MixedSparkTable mixedSparkTable) {
    // Fallback to use distribution mode parsed from table properties.
    String modeName =
        PropertyUtil.propertyAsString(
            mixedSparkTable.properties(), WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_DEFAULT);
    DistributionMode writeMode = DistributionMode.fromName(modeName);
    switch (writeMode) {
      case NONE:
        return null;

      case HASH:
        DistributionHashMode distributionHashMode =
            DistributionHashMode.valueOfDesc(
                mixedSparkTable
                    .properties()
                    .getOrDefault(
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE,
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE_DEFAULT));
        if (DistributionHashMode.AUTO.equals(distributionHashMode)) {
          // Resolve AUTO to a concrete mode from the table's key/partition characteristics.
          distributionHashMode =
              DistributionHashMode.autoSelect(
                  mixedSparkTable.table().isKeyedTable(),
                  !mixedSparkTable.table().spec().isUnpartitioned());
        }
        if (distributionHashMode.isSupportPrimaryKey()) {
          // NOTE(review): assumes the table is keyed when a primary-key hash mode is in effect;
          // asKeyedTable() would fail otherwise — confirm upstream validation guarantees this.
          List<Transform> transforms = new ArrayList<>();
          Transform transform =
              toTransformsFromPrimary(
                  mixedSparkTable, mixedSparkTable.table().asKeyedTable().primaryKeySpec());
          transforms.add(transform);
          if (distributionHashMode.isSupportPartition()) {
            transforms.addAll(Arrays.asList(toTransforms(mixedSparkTable.table().spec())));
          }
          // Drop any null transforms before handing the array to Spark.
          return Distributions.clustered(
              transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new));
        } else if (distributionHashMode.isSupportPartition()) {
          return Distributions.clustered(toTransforms(mixedSparkTable.table().spec()));
        } else {
          return null;
        }

      case RANGE:
        LOG.warn(
            "Fallback to use 'none' distribution mode, because {}={} is not supported in spark now",
            WRITE_DISTRIBUTION_MODE,
            DistributionMode.RANGE.modeName());
        return null;

      default:
        // IllegalArgumentException is more precise than a raw RuntimeException and remains
        // backward-compatible for callers that catch RuntimeException.
        throw new IllegalArgumentException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/util/MixedFormatSparkUtils.java [96:149]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the required {@link ClusteredDistribution} for writes to the given mixed-format table.
   *
   * <p>The distribution mode is read from the table property {@code write.distribution-mode}:
   *
   * <ul>
   *   <li>{@code none} – no required distribution; {@code null} is returned
   *   <li>{@code hash} – cluster by primary-key and/or partition transforms, chosen by the
   *       configured distribution hash mode ({@code auto} selects based on whether the table is
   *       keyed and/or partitioned)
   *   <li>{@code range} – not supported in Spark; falls back to {@code none} with a warning
   * </ul>
   *
   * @param mixedSparkTable the Spark table being written to
   * @return the required clustered distribution, or {@code null} when none is required
   * @throws IllegalArgumentException if the resolved distribution mode is unrecognized
   */
  public static ClusteredDistribution buildRequiredDistribution(MixedSparkTable mixedSparkTable) {
    // Fallback to use distribution mode parsed from table properties.
    String modeName =
        PropertyUtil.propertyAsString(
            mixedSparkTable.properties(), WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_DEFAULT);
    DistributionMode writeMode = DistributionMode.fromName(modeName);
    switch (writeMode) {
      case NONE:
        return null;

      case HASH:
        DistributionHashMode distributionHashMode =
            DistributionHashMode.valueOfDesc(
                mixedSparkTable
                    .properties()
                    .getOrDefault(
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE,
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE_DEFAULT));
        if (DistributionHashMode.AUTO.equals(distributionHashMode)) {
          // Resolve AUTO to a concrete mode from the table's key/partition characteristics.
          distributionHashMode =
              DistributionHashMode.autoSelect(
                  mixedSparkTable.table().isKeyedTable(),
                  !mixedSparkTable.table().spec().isUnpartitioned());
        }
        if (distributionHashMode.isSupportPrimaryKey()) {
          // NOTE(review): assumes the table is keyed when a primary-key hash mode is in effect;
          // asKeyedTable() would fail otherwise — confirm upstream validation guarantees this.
          List<Transform> transforms = new ArrayList<>();
          Transform transform =
              toTransformsFromPrimary(
                  mixedSparkTable, mixedSparkTable.table().asKeyedTable().primaryKeySpec());
          transforms.add(transform);
          if (distributionHashMode.isSupportPartition()) {
            transforms.addAll(Arrays.asList(toTransforms(mixedSparkTable.table().spec())));
          }
          // Drop any null transforms before handing the array to Spark.
          return Distributions.clustered(
              transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new));
        } else if (distributionHashMode.isSupportPartition()) {
          return Distributions.clustered(toTransforms(mixedSparkTable.table().spec()));
        } else {
          return null;
        }

      case RANGE:
        LOG.warn(
            "Fallback to use 'none' distribution mode, because {}={} is not supported in spark now",
            WRITE_DISTRIBUTION_MODE,
            DistributionMode.RANGE.modeName());
        return null;

      default:
        // IllegalArgumentException is more precise than a raw RuntimeException and remains
        // backward-compatible for callers that catch RuntimeException.
        throw new IllegalArgumentException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/util/MixedFormatSparkUtils.java [96:149]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the required {@link ClusteredDistribution} for writes to the given mixed-format table.
   *
   * <p>The distribution mode is read from the table property {@code write.distribution-mode}:
   *
   * <ul>
   *   <li>{@code none} – no required distribution; {@code null} is returned
   *   <li>{@code hash} – cluster by primary-key and/or partition transforms, chosen by the
   *       configured distribution hash mode ({@code auto} selects based on whether the table is
   *       keyed and/or partitioned)
   *   <li>{@code range} – not supported in Spark; falls back to {@code none} with a warning
   * </ul>
   *
   * @param mixedSparkTable the Spark table being written to
   * @return the required clustered distribution, or {@code null} when none is required
   * @throws IllegalArgumentException if the resolved distribution mode is unrecognized
   */
  public static ClusteredDistribution buildRequiredDistribution(MixedSparkTable mixedSparkTable) {
    // Fallback to use distribution mode parsed from table properties.
    String modeName =
        PropertyUtil.propertyAsString(
            mixedSparkTable.properties(), WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_DEFAULT);
    DistributionMode writeMode = DistributionMode.fromName(modeName);
    switch (writeMode) {
      case NONE:
        return null;

      case HASH:
        DistributionHashMode distributionHashMode =
            DistributionHashMode.valueOfDesc(
                mixedSparkTable
                    .properties()
                    .getOrDefault(
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE,
                        TableProperties.WRITE_DISTRIBUTION_HASH_MODE_DEFAULT));
        if (DistributionHashMode.AUTO.equals(distributionHashMode)) {
          // Resolve AUTO to a concrete mode from the table's key/partition characteristics.
          distributionHashMode =
              DistributionHashMode.autoSelect(
                  mixedSparkTable.table().isKeyedTable(),
                  !mixedSparkTable.table().spec().isUnpartitioned());
        }
        if (distributionHashMode.isSupportPrimaryKey()) {
          // NOTE(review): assumes the table is keyed when a primary-key hash mode is in effect;
          // asKeyedTable() would fail otherwise — confirm upstream validation guarantees this.
          List<Transform> transforms = new ArrayList<>();
          Transform transform =
              toTransformsFromPrimary(
                  mixedSparkTable, mixedSparkTable.table().asKeyedTable().primaryKeySpec());
          transforms.add(transform);
          if (distributionHashMode.isSupportPartition()) {
            transforms.addAll(Arrays.asList(toTransforms(mixedSparkTable.table().spec())));
          }
          // Drop any null transforms before handing the array to Spark.
          return Distributions.clustered(
              transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new));
        } else if (distributionHashMode.isSupportPartition()) {
          return Distributions.clustered(toTransforms(mixedSparkTable.table().spec()));
        } else {
          return null;
        }

      case RANGE:
        LOG.warn(
            "Fallback to use 'none' distribution mode, because {}={} is not supported in spark now",
            WRITE_DISTRIBUTION_MODE,
            DistributionMode.RANGE.modeName());
        return null;

      default:
        // IllegalArgumentException is more precise than a raw RuntimeException and remains
        // backward-compatible for callers that catch RuntimeException.
        throw new IllegalArgumentException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



