in spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java [798:891]
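/**
 * Imports data files from the given Spark partitions into an Iceberg table.
 *
 * <p>Files are listed in parallel, written into temporary manifests under the staging directory,
 * and appended to the table in a single commit.
 *
 * @param spark a Spark session
 * @param partitions the Spark partitions to import
 * @param targetTable the Iceberg table to import into
 * @param spec the partition spec used for the imported files
 * @param stagingDir a staging location for the temporary manifests written before the commit
 * @param checkDuplicateFiles if true, fail when a file to import is already referenced by the table
 * @param ignoreMissingFiles if true, ignore missing files while listing partitions
 * @param service an executor service used when listing data files in each partition
 */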
public static void importSparkPartitions(
    SparkSession spark,
    List<SparkPartition> partitions,
    Table targetTable,
    PartitionSpec spec,
    String stagingDir,
    boolean checkDuplicateFiles,
    boolean ignoreMissingFiles,
    ExecutorService service) {
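  // Capture a serializable Hadoop configuration for executors and read the listing parallelism,
  // shuffle partition count, metrics config, and the table's default name mapping.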
  Configuration conf = spark.sessionState().newHadoopConf();
  SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
  int listingParallelism =
      Math.min(
          partitions.size(), spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
  int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
  MetricsConfig metricsConfig = MetricsConfig.fromProperties(targetTable.properties());
  String nameMappingString = targetTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
  NameMapping nameMapping =
      nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;

  JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
  JavaRDD<SparkPartition> partitionRDD = sparkContext.parallelize(partitions, listingParallelism);
  Dataset<SparkPartition> partitionDS =
      spark.createDataset(partitionRDD.rdd(), Encoders.javaSerialization(SparkPartition.class));
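
  // List the data files of each partition in parallel on the cluster; listPartition turns every
  // file into a DataFile using the partition spec, metrics config, and name mapping.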
  Dataset<DataFile> filesToImport =
      partitionDS.flatMap(
          (FlatMapFunction<SparkPartition, DataFile>)
              sparkPartition ->
                  listPartition(
                          sparkPartition,
                          spec,
                          serializableConf,
                          metricsConfig,
                          nameMapping,
                          ignoreMissingFiles,
                          service)
                      .iterator(),
          Encoders.javaSerialization(DataFile.class));
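
  // When duplicate checking is enabled, join the files to import against the table's ENTRIES
  // metadata (filtering out deleted entries) and fail before writing anything if any path matches.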
  if (checkDuplicateFiles) {
    Dataset<Row> importedFiles =
        filesToImport
            .map((MapFunction<DataFile, String>) ContentFile::location, Encoders.STRING())
            .toDF("file_path");
    Dataset<Row> existingFiles =
        loadMetadataTable(spark, targetTable, MetadataTableType.ENTRIES).filter("status != 2");
    Column joinCond =
        existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
    Dataset<String> duplicates =
        importedFiles.join(existingFiles, joinCond).select("file_path").as(Encoders.STRING());
    Preconditions.checkState(
        duplicates.isEmpty(),
        String.format(
            DUPLICATE_FILE_MESSAGE, Joiner.on(",").join((String[]) duplicates.take(10))));
  }
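
  // Repartition and sort the data files by path, then write them into temporary manifests under
  // the staging directory with buildManifest.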
  List<ManifestFile> manifests =
      filesToImport
          .repartition(numShufflePartitions)
          .map(
              (MapFunction<DataFile, Tuple2<String, DataFile>>)
                  file -> Tuple2.apply(file.location(), file),
              Encoders.tuple(Encoders.STRING(), Encoders.javaSerialization(DataFile.class)))
          .orderBy(col("_1"))
          .mapPartitions(
              (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
                  fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
              Encoders.javaSerialization(ManifestFile.class))
          .collectAsList();
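
  // Append the staged manifests in a single commit. For format v1 tables without snapshot ID
  // inheritance the manifests are rewritten during the append, so the staged copies can be
  // removed after the commit; on any failure, clean up the staged manifests and rethrow.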
  try {
    TableOperations ops = ((HasTableOperations) targetTable).operations();
    int formatVersion = ops.current().formatVersion();
    boolean snapshotIdInheritanceEnabled =
        PropertyUtil.propertyAsBoolean(
            targetTable.properties(),
            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);

    AppendFiles append = targetTable.newAppend();
    manifests.forEach(append::appendManifest);
    append.commit();

    if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
      // delete original manifests as they were rewritten before the commit
      deleteManifests(targetTable.io(), manifests);
    }
  } catch (Throwable e) {
    deleteManifests(targetTable.io(), manifests);
    throw e;
  }
}