public static void importSparkPartitions()

in spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java [798:891]


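  /**
   * Imports the files of the given Spark partitions into an Iceberg table.
   *
   * <p>Partition locations are listed in parallel on executors, the resulting data files are
   * grouped into manifests under the staging directory, and the manifests are appended to the
   * target table in a single commit. When {@code checkDuplicateFiles} is true, the import fails
   * if any incoming file is already referenced by the table; {@code ignoreMissingFiles} controls
   * whether missing partition locations fail the listing step.
   */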
  public static void importSparkPartitions(
      SparkSession spark,
      List<SparkPartition> partitions,
      Table targetTable,
      PartitionSpec spec,
      String stagingDir,
      boolean checkDuplicateFiles,
      boolean ignoreMissingFiles,
      ExecutorService service) {
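    // Capture the Hadoop configuration in a serializable wrapper so it can be shipped to
    // executors for file listing and manifest writing.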
    Configuration conf = spark.sessionState().newHadoopConf();
    SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
    int listingParallelism =
        Math.min(
            partitions.size(), spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(targetTable.properties());
    String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    NameMapping nameMapping =
        nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

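    // Distribute the partition list across the cluster, capping the listing parallelism at
    // Spark's parallel partition discovery setting.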
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    JavaRDD<SparkPartition> partitionRDD = sparkContext.parallelize(partitions, listingParallelism);

    Dataset<SparkPartition> partitionDS =
        spark.createDataset(partitionRDD.rdd(), Encoders.javaSerialization(SparkPartition.class));

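    // List each partition's data files on executors, producing Iceberg DataFiles with metrics
    // derived from the table's metrics config and optional name mapping.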
    Dataset<DataFile> filesToImport =
        partitionDS.flatMap(
            (FlatMapFunction<SparkPartition, DataFile>)
                sparkPartition ->
                    listPartition(
                            sparkPartition,
                            spec,
                            serializableConf,
                            metricsConfig,
                            nameMapping,
                            ignoreMissingFiles,
                            service)
                        .iterator(),
            Encoders.javaSerialization(DataFile.class));

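    // Optionally fail fast when an incoming file is already referenced by the table: entries
    // with status 2 (DELETED) are filtered out, so the join only matches files that are still
    // live in the table metadata.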
    if (checkDuplicateFiles) {
      Dataset<Row> importedFiles =
          filesToImport
              .map((MapFunction<DataFile, String>) ContentFile::location, Encoders.STRING())
              .toDF("file_path");
      Dataset<Row> existingFiles =
          loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
      Column joinCond =
          existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
      Dataset<String> duplicates =
          importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
      Preconditions.checkState(
          duplicates.isEmpty(),
          String.format(
              DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
    }

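    // Cluster the files by path across the shuffle partitions and write one manifest per
    // partition into the staging directory.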
    List<ManifestFile> manifests =
        filesToImport
            .repartition(numShufflePartitions)
            .map(
                (MapFunction<DataFile, Tuple2<String, DataFile>>)
                    file -> Tuple2.apply(file.location(), file),
                Encoders.tuple(Encoders.STRING(), Encoders.javaSerialization(DataFile.class)))
            .orderBy(col("_1"))
            .mapPartitions(
                (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
                Encoders.javaSerialization(ManifestFile.class))
            .collectAsList();

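    // Append the staged manifests in a single commit. For format v1 tables without snapshot ID
    // inheritance, appendManifest rewrites the manifests, so the staged originals are deleted
    // after a successful commit; on any failure, all staged manifests are cleaned up.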
    try {
      TableOperations ops = ((HasTableOperations) targetTable).operations();
      int formatVersion = ops.current().formatVersion();
      boolean snapshotIdInheritanceEnabled =
          PropertyUtil.propertyAsBoolean(
              targetTable.properties(),
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

      AppendFiles append = targetTable.newAppend();
      manifests.forEach(append::appendManifest);
      append.commit();

      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
        // delete original manifests as they were rewritten before the commit
        deleteManifests(targetTable.io(), manifests);
      }
    } catch (Throwable e) {
      deleteManifests(targetTable.io(), manifests);
      throw e;
    }
  }
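
A minimal usage sketch (not part of the source file), assuming an existing Iceberg table at a
HadoopTables location and a Hive-style source table db.src registered in the session catalog.
The identifiers, paths, and thread count are illustrative only; partition discovery goes through
the SparkTableUtil.getPartitions overload that takes a partition filter map.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import scala.Option;

public class ImportSparkPartitionsExample {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Load the target Iceberg table (path-based for simplicity).
    Table targetTable =
        new HadoopTables(spark.sessionState().newHadoopConf()).load("/tmp/warehouse/target");

    // Discover the source table's partitions; no partition filter is applied here.
    List<SparkPartition> partitions =
        SparkTableUtil.getPartitions(
            spark, new TableIdentifier("src", Option.apply("db")), Collections.emptyMap());

    ExecutorService service = Executors.newFixedThreadPool(4);
    try {
      SparkTableUtil.importSparkPartitions(
          spark,
          partitions,
          targetTable,
          targetTable.spec(),
          "/tmp/iceberg-staging", // staging dir for the generated manifests
          true, // checkDuplicateFiles: fail if a file is already referenced
          false, // ignoreMissingFiles: fail on missing partition locations
          service);
    } finally {
      service.shutdown();
    }
  }
}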