in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [189:287]
    private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Object[]> resultRDD,
                                                       String pathPattern, long tableId,
                                                       EtlJobConfig.EtlIndex indexMeta,
                                                       SparkRDDAggregator<?>[] sparkRDDAggregators) {
        // TODO(wb) should deal largeint as BigInteger instead of string when using biginteger as key,
        // data type may affect sorting logic
        StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
        resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
                    // write the data to dst file
                    Configuration conf = new Configuration(serializableHadoopConf.value());
                    FileSystem fs = FileSystem.get(new Path(etlJobConfig.outputPath).toUri(), conf);
                    String lastBucketKey = null;
                    ParquetWriter<InternalRow> parquetWriter = null;
                    TaskContext taskContext = TaskContext.get();
                    long taskAttemptId = taskContext.taskAttemptId();
                    String dstPath = "";
                    String tmpPath = "";
                    while (t.hasNext()) {
                        Tuple2<List<Object>, Object[]> pair = t.next();
                        List<Object> keyColumns = pair._1();
                        Object[] valueColumns = pair._2();
                        if ((keyColumns.size() + valueColumns.length) <= 1) {
                            LOG.warn("invalid row:" + pair);
                            continue;
                        }
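                        // the first key column is the synthetic bucket key "partitionId_bucketId";
                        // the remaining key columns plus the aggregated value columns form the output row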
                        String curBucketKey = keyColumns.get(0).toString();
                        List<Object> columnObjects = new ArrayList<>();
                        for (int i = 1; i < keyColumns.size(); ++i) {
                            columnObjects.add(keyColumns.get(i));
                        }
                        for (int i = 0; i < valueColumns.length; ++i) {
                            columnObjects.add(sparkRDDAggregators[i].finalize(valueColumns[i]));
                        }
                        // if the bucket key is new, it will belong to a new tablet
                        if (!curBucketKey.equals(lastBucketKey)) {
                            if (parquetWriter != null) {
                                parquetWriter.close();
                                // rename tmpPath to dstPath
                                try {
                                    fs.rename(new Path(tmpPath), new Path(dstPath));
                                } catch (IOException ioe) {
                                    LOG.warn("rename from tmpPath: " + tmpPath + " to dstPath: " + dstPath
                                            + " failed. exception: " + ioe);
                                    throw ioe;
                                }
                            }
                            // parse the new bucket key and create a writer for its tablet file
                            String[] bucketKey = curBucketKey.split("_");
                            if (bucketKey.length != 2) {
                                LOG.warn("invalid bucket key:" + curBucketKey);
                                continue;
                            }
                            long partitionId = Long.parseLong(bucketKey[0]);
                            int bucketId = Integer.parseInt(bucketKey[1]);
                            dstPath = String.format(pathPattern, tableId, partitionId, indexMeta.indexId, bucketId,
                                    indexMeta.schemaHash);
                            tmpPath = dstPath + "." + taskAttemptId;
conf.setBoolean("spark.sql.parquet.writeLegacyFormat", false);
conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false);
conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true);
conf.setBoolean("spark.sql.parquet.binaryAsString", false);
conf.setBoolean("spark.sql.parquet.fieldId.write.enabled", true);
conf.set("spark.sql.parquet.outputTimestampType", "INT96");
ParquetWriteSupport.setSchema(dstSchema, conf);
ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
WriterVersion.PARQUET_1_0, conf);
LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
lastBucketKey = curBucketKey;
}
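                        // convert the row to Spark's internal Catalyst representation before writing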
                        Object[] array = columnObjects.toArray();
                        Object[] catalystArr = new Object[array.length];
                        for (int i = 0; i < array.length; i++) {
                            catalystArr[i] = CatalystTypeConverters
                                    .createToCatalystConverter(dstSchema.apply(i).dataType()).apply(array[i]);
                        }
                        InternalRow internalRow = InternalRow.apply(
                                JavaConverters.asScalaBufferConverter(Arrays.asList(catalystArr)).asScala()
                                        .toSeq());
                        parquetWriter.write(internalRow);
                    }
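                    // flush and publish the file of the last bucket handled by this partition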
                    if (parquetWriter != null) {
                        parquetWriter.close();
                        try {
                            fs.rename(new Path(tmpPath), new Path(dstPath));
                        } catch (IOException ioe) {
                            LOG.warn("rename from tmpPath: " + tmpPath + " to dstPath: " + dstPath
                                    + " failed. exception: " + ioe);
                            throw ioe;
                        }
                    }
                });
    }