spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java [310:689]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      SparkPartition partition,
      PartitionSpec spec,
      SerializableConfiguration conf,
      MetricsConfig metricsConfig,
      NameMapping mapping) {
    return TableMigrationUtil.listPartition(
        partition.values,
        partition.uri,
        partition.format,
        spec,
        conf.get(),
        metricsConfig,
        mapping);
  }

  private static SparkPartition toSparkPartition(
      CatalogTablePartition partition, CatalogTable table) {
    Option<URI> locationUri = partition.storage().locationUri();
    Option<String> serde = partition.storage().serde();

    Preconditions.checkArgument(locationUri.nonEmpty(), "Partition URI should be defined");
    Preconditions.checkArgument(
        serde.nonEmpty() || table.provider().nonEmpty(), "Partition format should be defined");

    String uri = Util.uriToString(locationUri.get());
    String format = serde.nonEmpty() ? serde.get() : table.provider().get();

    Map<String, String> partitionSpec =
        JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
    return new SparkPartition(partitionSpec, uri, format);
  }

  private static Expression resolveAttrs(SparkSession spark, String table, Expression expr) {
    Function2<String, String, Object> resolver = spark.sessionState().analyzer().resolver();
    LogicalPlan plan = spark.table(table).queryExecution().analyzed();
    return expr.transform(
        new AbstractPartialFunction<Expression, Expression>() {
          @Override
          public Expression apply(Expression attr) {
            UnresolvedAttribute unresolvedAttribute = (UnresolvedAttribute) attr;
            Option<NamedExpression> namedExpressionOption =
                plan.resolve(unresolvedAttribute.nameParts(), resolver);
            if (namedExpressionOption.isDefined()) {
              return (Expression) namedExpressionOption.get();
            } else {
              throw new IllegalArgumentException(
                  String.format("Could not resolve %s using columns: %s", attr, plan.output()));
            }
          }

          @Override
          public boolean isDefinedAt(Expression attr) {
            return attr instanceof UnresolvedAttribute;
          }
        });
  }

  private static Iterator<ManifestFile> buildManifest(
      SerializableConfiguration conf,
      PartitionSpec spec,
      String basePath,
      Iterator<Tuple2<String, DataFile>> fileTuples) {
    if (fileTuples.hasNext()) {
      FileIO io = new HadoopFileIO(conf.get());
      TaskContext ctx = TaskContext.get();
      String suffix =
          String.format(
              "stage-%d-task-%d-manifest-%s",
              ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());
      Path location = new Path(basePath, suffix);
      String outputPath = FileFormat.AVRO.addExtension(location.toString());
      OutputFile outputFile = io.newOutputFile(outputPath);
      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);

      try (ManifestWriter<DataFile> writerRef = writer) {
        fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
      } catch (IOException e) {
        throw SparkExceptionUtil.toUncheckedException(
            e, "Unable to close the manifest writer: %s", outputPath);
      }

      ManifestFile manifestFile = writer.toManifestFile();
      return ImmutableList.of(manifestFile).iterator();
    } else {
      return Collections.emptyIterator();
    }
  }

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   * @param partitionFilter only import partitions whose values match those in the map; the filter
   *     may be partially defined
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      String stagingDir,
      Map<String, String> partitionFilter,
      boolean checkDuplicateFiles) {
    SessionCatalog catalog = spark.sessionState().catalog();

    String db =
        sourceTableIdent.database().nonEmpty()
            ? sourceTableIdent.database().get()
            : catalog.getCurrentDatabase();
    TableIdentifier sourceTableIdentWithDB =
        new TableIdentifier(sourceTableIdent.table(), Some.apply(db));

    if (!catalog.tableExists(sourceTableIdentWithDB)) {
      throw new org.apache.iceberg.exceptions.NoSuchTableException(
          "Table %s does not exist", sourceTableIdentWithDB);
    }

    try {
      PartitionSpec spec =
          SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());

      if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
        importUnpartitionedSparkTable(
            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles);
      } else {
        List<SparkPartition> sourceTablePartitions =
            getPartitions(spark, sourceTableIdent, partitionFilter);
        if (sourceTablePartitions.isEmpty()) {
          targetTable.newAppend().commit();
        } else {
          importSparkPartitions(
              spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
        }
      }
    } catch (AnalysisException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
    }
  }
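
  // Illustrative usage sketch, not part of the original source: calling the full importSparkTable
  // overload with a partition filter and duplicate-file checking enabled. The database, table
  // name, partition column, and staging path below are hypothetical placeholders.
  private static void exampleImportSparkTable(SparkSession spark, Table targetTable) {
    TableIdentifier source = new TableIdentifier("logs", Some.apply("db"));
    Map<String, String> partitionFilter = Collections.singletonMap("dt", "2023-01-01");
    importSparkTable(spark, source, targetTable, "/tmp/iceberg-staging", partitionFilter, true);
  }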

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      String stagingDir,
      boolean checkDuplicateFiles) {
    importSparkTable(
        spark,
        sourceTableIdent,
        targetTable,
        stagingDir,
        Collections.emptyMap(),
        checkDuplicateFiles);
  }

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   */
  public static void importSparkTable(
      SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable, String stagingDir) {
    importSparkTable(
        spark, sourceTableIdent, targetTable, stagingDir, Collections.emptyMap(), false);
  }

  private static void importUnpartitionedSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      boolean checkDuplicateFiles) {
    try {
      CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
      Option<String> format =
          sourceTable.storage().serde().nonEmpty()
              ? sourceTable.storage().serde()
              : sourceTable.provider();
      Preconditions.checkArgument(format.nonEmpty(), "Could not determine table format");

      Map<String, String> partition = Collections.emptyMap();
      PartitionSpec spec = PartitionSpec.unpartitioned();
      Configuration conf = spark.sessionState().newHadoopConf();
      MetricsConfig metricsConfig = MetricsConfig.forTable(targetTable);
      String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
      NameMapping nameMapping =
          nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

      List<DataFile> files =
          TableMigrationUtil.listPartition(
              partition,
              Util.uriToString(sourceTable.location()),
              format.get(),
              spec,
              conf,
              metricsConfig,
              nameMapping);

      if (checkDuplicateFiles) {
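        // Compare the paths of the files being imported against the live entries of the target
        // table's ENTRIES metadata table (status != 2 filters out DELETED entries) and fail if
        // any imported file is already referenced by the table.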
        Dataset<Row> importedFiles =
            spark
                .createDataset(Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
                .toDF("file_path");
        Dataset<Row> existingFiles =
            loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
        Column joinCond =
            existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
        Dataset<String> duplicates =
            importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
        Preconditions.checkState(
            duplicates.isEmpty(),
            String.format(
                DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
      }

      AppendFiles append = targetTable.newAppend();
      files.forEach(append::appendFile);
      append.commit();
    } catch (NoSuchDatabaseException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unknown table: %s. Database not found in catalog.", sourceTableIdent);
    } catch (NoSuchTableException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unknown table: %s. Table not found in catalog.", sourceTableIdent);
    }
  }

  /**
   * Import files from given partitions to an Iceberg table.
   *
   * @param spark a Spark session
   * @param partitions partitions to import
   * @param targetTable the Iceberg table to import the data into
   * @param spec a partition spec
   * @param stagingDir a staging directory to store temporary manifest files
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkPartitions(
      SparkSession spark,
      List<SparkPartition> partitions,
      Table targetTable,
      PartitionSpec spec,
      String stagingDir,
      boolean checkDuplicateFiles) {
    Configuration conf = spark.sessionState().newHadoopConf();
    SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
    int parallelism =
        Math.min(
            partitions.size(), spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(targetTable.properties());
    String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    NameMapping nameMapping =
        nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    JavaRDD<SparkPartition> partitionRDD = sparkContext.parallelize(partitions, parallelism);

    Dataset<SparkPartition> partitionDS =
        spark.createDataset(partitionRDD.rdd(), Encoders.javaSerialization(SparkPartition.class));

    Dataset<DataFile> filesToImport =
        partitionDS.flatMap(
            (FlatMapFunction<SparkPartition, DataFile>)
                sparkPartition ->
                    listPartition(
                            sparkPartition, spec, serializableConf, metricsConfig, nameMapping)
                        .iterator(),
            Encoders.javaSerialization(DataFile.class));

    if (checkDuplicateFiles) {
      Dataset<Row> importedFiles =
          filesToImport
              .map((MapFunction<DataFile, String>) f -> f.path().toString(), Encoders.STRING())
              .toDF("file_path");
      Dataset<Row> existingFiles =
          loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
      Column joinCond =
          existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
      Dataset<String> duplicates =
          importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
      Preconditions.checkState(
          duplicates.isEmpty(),
          String.format(
              DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
    }

    List<ManifestFile> manifests =
        filesToImport
            .repartition(numShufflePartitions)
            .map(
                (MapFunction<DataFile, Tuple2<String, DataFile>>)
                    file -> Tuple2.apply(file.path().toString(), file),
                Encoders.tuple(Encoders.STRING(), Encoders.javaSerialization(DataFile.class)))
            .orderBy(col("_1"))
            .mapPartitions(
                (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
                Encoders.javaSerialization(ManifestFile.class))
            .collectAsList();

    try {
      boolean snapshotIdInheritanceEnabled =
          PropertyUtil.propertyAsBoolean(
              targetTable.properties(),
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

      AppendFiles append = targetTable.newAppend();
      manifests.forEach(append::appendManifest);
      append.commit();

      if (!snapshotIdInheritanceEnabled) {
        // delete original manifests as they were rewritten before the commit
        deleteManifests(targetTable.io(), manifests);
      }
    } catch (Throwable e) {
      deleteManifests(targetTable.io(), manifests);
      throw e;
    }
  }
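
  // Illustrative sketch, not part of the original source: enabling snapshot ID inheritance on the
  // target table before the import. With the property enabled, the staged manifests written by
  // buildManifest are committed as-is; with it disabled (the default), they are rewritten during
  // the append and the staged copies are deleted afterwards, as shown above. The staging path is
  // a hypothetical placeholder; updateProperties() is part of the Iceberg Table API.
  private static void exampleImportWithInheritedSnapshotIds(
      SparkSession spark, List<SparkPartition> partitions, Table targetTable, PartitionSpec spec) {
    targetTable
        .updateProperties()
        .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
        .commit();
    importSparkPartitions(spark, partitions, targetTable, spec, "/tmp/iceberg-staging", false);
  }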

  /**
   * Import files from given partitions to an Iceberg table.
   *
   * @param spark a Spark session
   * @param partitions partitions to import
   * @param targetTable the Iceberg table to import the data into
   * @param spec a partition spec
   * @param stagingDir a staging directory to store temporary manifest files
   */
  public static void importSparkPartitions(
      SparkSession spark,
      List<SparkPartition> partitions,
      Table targetTable,
      PartitionSpec spec,
      String stagingDir) {
    importSparkPartitions(spark, partitions, targetTable, spec, stagingDir, false);
  }

  public static List<SparkPartition> filterPartitions(
      List<SparkPartition> partitions, Map<String, String> partitionFilter) {
    if (partitionFilter.isEmpty()) {
      return partitions;
    } else {
      return partitions.stream()
          .filter(p -> p.getValues().entrySet().containsAll(partitionFilter.entrySet()))
          .collect(Collectors.toList());
    }
  }
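
  // Illustrative sketch, not part of the original source: listing partitions with getPartitions,
  // narrowing them with filterPartitions, and importing only the matching ones. getPartitions is
  // defined elsewhere in this class; the partition column, value, and staging path below are
  // hypothetical placeholders.
  private static void exampleFilteredPartitionImport(
      SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable, PartitionSpec spec) {
    List<SparkPartition> allPartitions =
        getPartitions(spark, sourceTableIdent, Collections.emptyMap());
    List<SparkPartition> matching =
        filterPartitions(allPartitions, Collections.singletonMap("dt", "2023-01-01"));
    importSparkPartitions(spark, matching, targetTable, spec, "/tmp/iceberg-staging");
  }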

  private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
    Tasks.foreach(manifests)
        .executeWith(ThreadPools.getWorkerPool())
        .noRetry()
        .suppressFailureWhenFinished()
        .run(item -> io.deleteFile(item.path()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java [274:653]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      SparkPartition partition,
      PartitionSpec spec,
      SerializableConfiguration conf,
      MetricsConfig metricsConfig,
      NameMapping mapping) {
    return TableMigrationUtil.listPartition(
        partition.values,
        partition.uri,
        partition.format,
        spec,
        conf.get(),
        metricsConfig,
        mapping);
  }

  private static SparkPartition toSparkPartition(
      CatalogTablePartition partition, CatalogTable table) {
    Option<URI> locationUri = partition.storage().locationUri();
    Option<String> serde = partition.storage().serde();

    Preconditions.checkArgument(locationUri.nonEmpty(), "Partition URI should be defined");
    Preconditions.checkArgument(
        serde.nonEmpty() || table.provider().nonEmpty(), "Partition format should be defined");

    String uri = Util.uriToString(locationUri.get());
    String format = serde.nonEmpty() ? serde.get() : table.provider().get();

    Map<String, String> partitionSpec =
        JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
    return new SparkPartition(partitionSpec, uri, format);
  }

  private static Expression resolveAttrs(SparkSession spark, String table, Expression expr) {
    Function2<String, String, Object> resolver = spark.sessionState().analyzer().resolver();
    LogicalPlan plan = spark.table(table).queryExecution().analyzed();
    return expr.transform(
        new AbstractPartialFunction<Expression, Expression>() {
          @Override
          public Expression apply(Expression attr) {
            UnresolvedAttribute unresolvedAttribute = (UnresolvedAttribute) attr;
            Option<NamedExpression> namedExpressionOption =
                plan.resolve(unresolvedAttribute.nameParts(), resolver);
            if (namedExpressionOption.isDefined()) {
              return (Expression) namedExpressionOption.get();
            } else {
              throw new IllegalArgumentException(
                  String.format("Could not resolve %s using columns: %s", attr, plan.output()));
            }
          }

          @Override
          public boolean isDefinedAt(Expression attr) {
            return attr instanceof UnresolvedAttribute;
          }
        });
  }

  private static Iterator<ManifestFile> buildManifest(
      SerializableConfiguration conf,
      PartitionSpec spec,
      String basePath,
      Iterator<Tuple2<String, DataFile>> fileTuples) {
    if (fileTuples.hasNext()) {
      FileIO io = new HadoopFileIO(conf.get());
      TaskContext ctx = TaskContext.get();
      String suffix =
          String.format(
              "stage-%d-task-%d-manifest-%s",
              ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());
      Path location = new Path(basePath, suffix);
      String outputPath = FileFormat.AVRO.addExtension(location.toString());
      OutputFile outputFile = io.newOutputFile(outputPath);
      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);

      try (ManifestWriter<DataFile> writerRef = writer) {
        fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
      } catch (IOException e) {
        throw SparkExceptionUtil.toUncheckedException(
            e, "Unable to close the manifest writer: %s", outputPath);
      }

      ManifestFile manifestFile = writer.toManifestFile();
      return ImmutableList.of(manifestFile).iterator();
    } else {
      return Collections.emptyIterator();
    }
  }

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   * @param partitionFilter only import partitions whose values match those in the map; the filter
   *     may be partially defined
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      String stagingDir,
      Map<String, String> partitionFilter,
      boolean checkDuplicateFiles) {
    SessionCatalog catalog = spark.sessionState().catalog();

    String db =
        sourceTableIdent.database().nonEmpty()
            ? sourceTableIdent.database().get()
            : catalog.getCurrentDatabase();
    TableIdentifier sourceTableIdentWithDB =
        new TableIdentifier(sourceTableIdent.table(), Some.apply(db));

    if (!catalog.tableExists(sourceTableIdentWithDB)) {
      throw new org.apache.iceberg.exceptions.NoSuchTableException(
          "Table %s does not exist", sourceTableIdentWithDB);
    }

    try {
      PartitionSpec spec =
          SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());

      if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
        importUnpartitionedSparkTable(
            spark, sourceTableIdentWithDB, targetTable, checkDuplicateFiles);
      } else {
        List<SparkPartition> sourceTablePartitions =
            getPartitions(spark, sourceTableIdent, partitionFilter);
        if (sourceTablePartitions.isEmpty()) {
          targetTable.newAppend().commit();
        } else {
          importSparkPartitions(
              spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
        }
      }
    } catch (AnalysisException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
    }
  }

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      String stagingDir,
      boolean checkDuplicateFiles) {
    importSparkTable(
        spark,
        sourceTableIdent,
        targetTable,
        stagingDir,
        Collections.emptyMap(),
        checkDuplicateFiles);
  }

  /**
   * Import files from an existing Spark table to an Iceberg table.
   *
   * <p>The import uses the Spark session to get table metadata. It assumes no concurrent
   * operations are running against the original or target table and is therefore not thread-safe.
   *
   * @param spark a Spark session
   * @param sourceTableIdent an identifier of the source Spark table
   * @param targetTable the Iceberg table to import the data into
   * @param stagingDir a staging directory to store temporary manifest files
   */
  public static void importSparkTable(
      SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable, String stagingDir) {
    importSparkTable(
        spark, sourceTableIdent, targetTable, stagingDir, Collections.emptyMap(), false);
  }
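
  // Illustrative sketch, not part of the original source: the minimal overload above imports the
  // entire table with no partition filter and with duplicate-file checking disabled. The source
  // identifier and staging path below are hypothetical placeholders.
  private static void exampleMinimalImport(SparkSession spark, Table targetTable) {
    TableIdentifier source = new TableIdentifier("logs", Some.apply("db"));
    importSparkTable(spark, source, targetTable, "/tmp/iceberg-staging");
  }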

  private static void importUnpartitionedSparkTable(
      SparkSession spark,
      TableIdentifier sourceTableIdent,
      Table targetTable,
      boolean checkDuplicateFiles) {
    try {
      CatalogTable sourceTable = spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
      Option<String> format =
          sourceTable.storage().serde().nonEmpty()
              ? sourceTable.storage().serde()
              : sourceTable.provider();
      Preconditions.checkArgument(format.nonEmpty(), "Could not determine table format");

      Map<String, String> partition = Collections.emptyMap();
      PartitionSpec spec = PartitionSpec.unpartitioned();
      Configuration conf = spark.sessionState().newHadoopConf();
      MetricsConfig metricsConfig = MetricsConfig.forTable(targetTable);
      String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
      NameMapping nameMapping =
          nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

      List<DataFile> files =
          TableMigrationUtil.listPartition(
              partition,
              Util.uriToString(sourceTable.location()),
              format.get(),
              spec,
              conf,
              metricsConfig,
              nameMapping);

      if (checkDuplicateFiles) {
        Dataset<Row> importedFiles =
            spark
                .createDataset(Lists.transform(files, f -> f.path().toString()), Encoders.STRING())
                .toDF("file_path");
        Dataset<Row> existingFiles =
            loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
        Column joinCond =
            existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
        Dataset<String> duplicates =
            importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
        Preconditions.checkState(
            duplicates.isEmpty(),
            String.format(
                DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
      }

      AppendFiles append = targetTable.newAppend();
      files.forEach(append::appendFile);
      append.commit();
    } catch (NoSuchDatabaseException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unknown table: %s. Database not found in catalog.", sourceTableIdent);
    } catch (NoSuchTableException e) {
      throw SparkExceptionUtil.toUncheckedException(
          e, "Unknown table: %s. Table not found in catalog.", sourceTableIdent);
    }
  }

  /**
   * Import files from given partitions to an Iceberg table.
   *
   * @param spark a Spark session
   * @param partitions partitions to import
   * @param targetTable the Iceberg table to import the data into
   * @param spec a partition spec
   * @param stagingDir a staging directory to store temporary manifest files
   * @param checkDuplicateFiles if true, throw an exception if the import would result in a
   *     duplicate data file
   */
  public static void importSparkPartitions(
      SparkSession spark,
      List<SparkPartition> partitions,
      Table targetTable,
      PartitionSpec spec,
      String stagingDir,
      boolean checkDuplicateFiles) {
    Configuration conf = spark.sessionState().newHadoopConf();
    SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
    int parallelism =
        Math.min(
            partitions.size(), spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(targetTable.properties());
    String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    NameMapping nameMapping =
        nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    JavaRDD<SparkPartition> partitionRDD = sparkContext.parallelize(partitions, parallelism);

    Dataset<SparkPartition> partitionDS =
        spark.createDataset(partitionRDD.rdd(), Encoders.javaSerialization(SparkPartition.class));

    Dataset<DataFile> filesToImport =
        partitionDS.flatMap(
            (FlatMapFunction<SparkPartition, DataFile>)
                sparkPartition ->
                    listPartition(
                            sparkPartition, spec, serializableConf, metricsConfig, nameMapping)
                        .iterator(),
            Encoders.javaSerialization(DataFile.class));

    if (checkDuplicateFiles) {
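      // Compare the paths of the files being imported against the live entries of the target
      // table's ENTRIES metadata table (status != 2 filters out DELETED entries) and fail if any
      // imported file is already referenced by the table.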
      Dataset<Row> importedFiles =
          filesToImport
              .map((MapFunction<DataFile, String>) f -> f.path().toString(), Encoders.STRING())
              .toDF("file_path");
      Dataset<Row> existingFiles =
          loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
      Column joinCond =
          existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
      Dataset<String> duplicates =
          importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
      Preconditions.checkState(
          duplicates.isEmpty(),
          String.format(
              DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
    }

    List<ManifestFile> manifests =
        filesToImport
            .repartition(numShufflePartitions)
            .map(
                (MapFunction<DataFile, Tuple2<String, DataFile>>)
                    file -> Tuple2.apply(file.path().toString(), file),
                Encoders.tuple(Encoders.STRING(), Encoders.javaSerialization(DataFile.class)))
            .orderBy(col("_1"))
            .mapPartitions(
                (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
                Encoders.javaSerialization(ManifestFile.class))
            .collectAsList();

    try {
      boolean snapshotIdInheritanceEnabled =
          PropertyUtil.propertyAsBoolean(
              targetTable.properties(),
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

      AppendFiles append = targetTable.newAppend();
      manifests.forEach(append::appendManifest);
      append.commit();

      if (!snapshotIdInheritanceEnabled) {
        // delete original manifests as they were rewritten before the commit
        deleteManifests(targetTable.io(), manifests);
      }
    } catch (Throwable e) {
      deleteManifests(targetTable.io(), manifests);
      throw e;
    }
  }
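
  // Illustrative sketch, not part of the original source: importing files from a location that is
  // not registered in the session catalog by constructing a SparkPartition directly. The partition
  // values, URI, file format, and staging path below are hypothetical placeholders.
  private static void exampleManualPartitionImport(
      SparkSession spark, Table targetTable, PartitionSpec spec) {
    SparkPartition partition =
        new SparkPartition(
            Collections.singletonMap("dt", "2023-01-01"),
            "hdfs://namenode:8020/warehouse/logs/dt=2023-01-01",
            "parquet");
    importSparkPartitions(
        spark, ImmutableList.of(partition), targetTable, spec, "/tmp/iceberg-staging", true);
  }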

  /**
   * Import files from given partitions to an Iceberg table.
   *
   * @param spark a Spark session
   * @param partitions partitions to import
   * @param targetTable the Iceberg table to import the data into
   * @param spec a partition spec
   * @param stagingDir a staging directory to store temporary manifest files
   */
  public static void importSparkPartitions(
      SparkSession spark,
      List<SparkPartition> partitions,
      Table targetTable,
      PartitionSpec spec,
      String stagingDir) {
    importSparkPartitions(spark, partitions, targetTable, spec, stagingDir, false);
  }

  public static List<SparkPartition> filterPartitions(
      List<SparkPartition> partitions, Map<String, String> partitionFilter) {
    if (partitionFilter.isEmpty()) {
      return partitions;
    } else {
      return partitions.stream()
          .filter(p -> p.getValues().entrySet().containsAll(partitionFilter.entrySet()))
          .collect(Collectors.toList());
    }
  }

  private static void deleteManifests(FileIO io, List<ManifestFile> manifests) {
    Tasks.foreach(manifests)
        .executeWith(ThreadPools.getWorkerPool())
        .noRetry()
        .suppressFailureWhenFinished()
        .run(item -> io.deleteFile(item.path()));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



