amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/command/MigrateToMixedFormatCommand.java [119:346]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
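  // OUTPUT_TYPE is declared above this excerpt; judging from the rows built in execute(), it is
  // presumably a two-column schema: partition path (string) and migrated data-file count (int).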
  @Override
  public StructType outputType() {
    return OUTPUT_TYPE;
  }

  @Override
  public Row[] execute() throws AnalysisException {
    LOG.info(
        "Start migrating {} to {} using temporary backup table {}",
        sourceIdentifier,
        targetIdentifier,
        backupV1TableIdentifier);
    V1Table sourceTable = loadV1Table(sourceCatalog, backupV1TableIdentifier);
    TableIdentifier ident =
        new TableIdentifier(
            backupV1TableIdentifier.name(), Some.apply(backupV1TableIdentifier.namespace()[0]));
    List<DataFile> dataFiles = loadDataFiles(ident);
    UnkeyedTable table = createUnkeyedTable(sourceTable);

    PartitionSpec spec = table.spec();

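    // Register every discovered data file on the new table as a single append snapshot, so the
    // migrated files become visible atomically on commit.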
    AppendFiles appendFiles = table.newAppend();
    dataFiles.forEach(appendFiles::appendFile);
    appendFiles.commit();

    LOG.info(
        "Finished migrating table {}; removing metadata of backup table {}",
        targetIdentifier,
        backupV1TableIdentifier);

    if (PartitionSpec.unpartitioned().equals(spec)) {
      return new Row[] {RowFactory.create("ALL", dataFiles.size())};
    }

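    // Group the data files by partition path so the report shows one row per partition.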
    Map<String, List<DataFile>> partitions = Maps.newHashMap();
    dataFiles.forEach(
        d -> {
          String partition = spec.partitionToPath(d.partition());
          List<DataFile> df = partitions.computeIfAbsent(partition, p -> Lists.newArrayList());
          df.add(d);
        });
    return partitions.keySet().stream()
        .sorted()
        .map(p -> RowFactory.create(p, partitions.get(p).size()))
        .toArray(Row[]::new);
  }

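  /**
   * Discovers the data files of the backup source table: unpartitioned tables are listed from
   * the table location directly, partitioned tables partition by partition.
   */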
  private List<DataFile> loadDataFiles(TableIdentifier ident) throws AnalysisException {
    PartitionSpec spec =
        SparkSchemaUtil.specForTable(spark, ident.database().get() + "." + ident.table());

    if (spec.equals(PartitionSpec.unpartitioned())) {
      return listUnPartitionedSparkTable(spark, ident);
    } else {
      List<SparkTableUtil.SparkPartition> sparkPartitions =
          SparkTableUtil.getPartitions(spark, ident, Maps.newHashMap());
      Preconditions.checkArgument(
          !sparkPartitions.isEmpty(), "Cannot find any partitions in table %s", ident);
      return listPartitionDataFiles(spark, sparkPartitions, spec);
    }
  }

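  /**
   * Creates the target table from the source's schema, partitioning, and properties (minus the
   * excluded ones) and returns it as an un-keyed table. If anything other than an un-keyed
   * table comes back, the half-created table is dropped before failing.
   */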
  private UnkeyedTable createUnkeyedTable(V1Table sourceTable)
      throws TableAlreadyExistsException, NoSuchNamespaceException {
    Map<String, String> properties = Maps.newHashMap();
    properties.putAll(sourceTable.properties());
    EXCLUDED_PROPERTIES.forEach(properties::remove);
    properties.put(TableCatalog.PROP_PROVIDER, "arctic");
    properties.put("migrated", "true");

    StructType schema = sourceTable.schema();
    Transform[] partitions = sourceTable.partitioning();
    boolean threw = true;
    Table table = null;
    try {
      table = targetCatalog.createTable(targetIdentifier, schema, partitions, properties);
      if (table instanceof UnkeyedSparkTable) {
        threw = false;
        return ((UnkeyedSparkTable) table).table();
      } else if (table instanceof MixedSparkTable) {
        threw = false;
        return ((MixedSparkTable) table).table().asUnkeyedTable();
      }
      throw new IllegalStateException("target table must be an un-keyed table");
    } finally {
      if (threw && table != null) {
        try {
          targetCatalog.dropTable(targetIdentifier);
        } catch (Exception e) {
          LOG.warn("Error when rolling back the created target table", e);
        }
      }
    }
  }

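  /** Loads a table from the given catalog, checking that it is a session-catalog V1 table. */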
  private static V1Table loadV1Table(TableCatalog catalog, Identifier identifier)
      throws NoSuchTableException {
    Table table = catalog.loadTable(identifier);
    Preconditions.checkArgument(table instanceof V1Table, "source table must be a V1Table");
    return (V1Table) table;
  }

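  /**
   * Lists the data files of an unpartitioned Spark table by scanning its storage location; the
   * file format is taken from the serde if present, otherwise from the table provider.
   */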
  private static List<DataFile> listUnPartitionedSparkTable(
      SparkSession spark, TableIdentifier sourceTableIdent)
      throws NoSuchDatabaseException, NoSuchTableException {
    CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
    Option<String> format =
        sourceTable.storage().serde().nonEmpty()
            ? sourceTable.storage().serde()
            : sourceTable.provider();
    Preconditions.checkArgument(format.nonEmpty(), "Could not determine table format");

    Map<String, String> partition = Collections.emptyMap();
    PartitionSpec spec = PartitionSpec.unpartitioned();
    Configuration conf = spark.sessionState().newHadoopConf();
    MetricsConfig metricsConfig = MetricsConfig.getDefault();
    return TableMigrationUtil.listPartition(
        partition,
        Util.uriToString(sourceTable.location()),
        format.get(),
        spec,
        conf,
        metricsConfig,
        null);
  }

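  /** Lists the data files of every given Spark partition and flattens them into one list. */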
  private static List<DataFile> listPartitionDataFiles(
      SparkSession spark, List<SparkTableUtil.SparkPartition> partitions, PartitionSpec spec) {

    Configuration conf = spark.sessionState().newHadoopConf();
    MetricsConfig metricsConfig = MetricsConfig.getDefault();

    return partitions.stream()
        .map(
            p ->
                TableMigrationUtil.listPartition(
                    p.getValues(), p.getUri(), p.getFormat(), spec, conf, metricsConfig, null))
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
  }

  public static Builder newBuilder(SparkSession spark) {
    return new Builder(spark);
  }

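  // A minimal usage sketch (the catalog and table names below are hypothetical):
  //
  //   Row[] report =
  //       MigrateToMixedFormatCommand.newBuilder(spark)
  //           .withSource(Arrays.asList("spark_catalog", "db", "hive_source"))
  //           .withTarget(Arrays.asList("mixed_catalog", "db", "mixed_target"))
  //           .build()
  //           .execute();
  //   // => one Row per partition, or a single ("ALL", fileCount) row for unpartitioned tables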
  public static class Builder {

    List<String> source;
    List<String> target;

    SparkSession spark;

    private Builder(SparkSession spark) {
      this.spark = spark;
    }

    public Builder withSource(List<String> source) {
      this.source = source;
      return this;
    }

    public Builder withTarget(List<String> target) {
      this.target = target;
      return this;
    }

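    /**
     * Resolves the source and target name parts into (catalog, identifier) pairs, validates
     * both sides, and builds the command.
     */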
    public MigrateToMixedFormatCommand build() throws NoSuchTableException {
      MixedFormatSparkUtils.TableCatalogAndIdentifier tableCatalogAndIdentifier =
          MixedFormatSparkUtils.tableCatalogAndIdentifier(spark, source);
      TableCatalog sourceCatalog = tableCatalogAndIdentifier.catalog();
      Identifier sourceTableIdentifier = tableCatalogAndIdentifier.identifier();

      checkSourceCatalogAndTable(sourceCatalog, sourceTableIdentifier);

      tableCatalogAndIdentifier = MixedFormatSparkUtils.tableCatalogAndIdentifier(spark, target);
      TableCatalog targetCatalog = tableCatalogAndIdentifier.catalog();
      Identifier targetTableIdentifier = tableCatalogAndIdentifier.identifier();

      checkTargetCatalog(targetCatalog);
      checkTargetTable(targetCatalog, targetTableIdentifier);

      return new MigrateToMixedFormatCommand(
          sourceCatalog, sourceTableIdentifier, targetCatalog, targetTableIdentifier, spark);
    }

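    /** The source must be an existing V1 table living in the Spark session catalog. */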
    private void checkSourceCatalogAndTable(TableCatalog catalog, Identifier identifier)
        throws NoSuchTableException {
      Preconditions.checkArgument(
          catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME()),
          "source table must be in the session catalog, current catalog is %s",
          catalog.name());

      List<String> nameParts = Lists.newArrayList(identifier.namespace());
      nameParts.add(identifier.name());
      Preconditions.checkArgument(
          catalog.tableExists(identifier),
          "source table %s does not exist in catalog %s",
          Joiner.on(".").join(nameParts),
          catalog.name());
      loadV1Table(catalog, identifier);
    }

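    /** The target catalog must be one of the mixed-format Spark catalog implementations. */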
    private void checkTargetCatalog(TableCatalog catalog) {
      Preconditions.checkArgument(
          catalog instanceof MixedFormatSparkCatalog
              || catalog instanceof MixedFormatSparkSessionCatalog,
          "target catalog must be %s or %s",
          MixedFormatSparkCatalog.class.getName(),
          MixedFormatSparkSessionCatalog.class.getName());
    }

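    /**
     * The target database must already exist, while the target table must not, so the migration
     * never overwrites an existing table.
     */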
    private void checkTargetTable(TableCatalog catalog, Identifier identifier) {
      Preconditions.checkArgument(
          catalog instanceof SupportsNamespaces, "The target catalog must support namespaces");
      Preconditions.checkArgument(
          ((SupportsNamespaces) catalog).namespaceExists(identifier.namespace()),
          "database %s does not exist in catalog %s",
          Joiner.on(".").join(identifier.namespace()),
          catalog.name());

      List<String> nameParts = Lists.newArrayList(identifier.namespace());
      nameParts.add(identifier.name());
      Preconditions.checkArgument(
          !catalog.tableExists(identifier),
          "target table %s already exists in catalog %s",
          Joiner.on(".").join(nameParts),
          catalog.name());
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/java/org/apache/amoro/spark/command/MigrateToMixedFormatCommand.java [119:346]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  (body identical, line for line, to the v3.2 excerpt above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



