private void writeRepartitionAndSortedRDDToParquet()

in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [189:287]


    private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Object[]> resultRDD,
                                                       String pathPattern, long tableId,
                                                       EtlJobConfig.EtlIndex indexMeta,
                                                       SparkRDDAggregator<?>[] sparkRDDAggregators) {
        // TODO(wb): handle largeint as BigInteger instead of String when it is used as a key,
        // since the data type may affect the sorting order
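        // build the Spark schema of the destination index; it drives both the Catalyst
        // row conversion and the Parquet write schema below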
        StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);

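        // repartition by bucket key and sort within each partition so that rows for the
        // same tablet arrive contiguously and in key order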
        resultRDD.repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
                .foreachPartition((VoidFunction<Iterator<Tuple2<List<Object>, Object[]>>>) t -> {
                    // write this partition's sorted rows to the destination Parquet files
                    Configuration conf = new Configuration(serializableHadoopConf.value());
                    FileSystem fs = FileSystem.get(new Path(etlJobConfig.outputPath).toUri(), conf);
                    String lastBucketKey = null;
                    ParquetWriter<InternalRow> parquetWriter = null;
                    TaskContext taskContext = TaskContext.get();
                    long taskAttemptId = taskContext.taskAttemptId();
                    String dstPath = "";
                    String tmpPath = "";

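                    // rows arrive grouped by bucket key; keep one writer open per bucket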
                    while (t.hasNext()) {
                        Tuple2<List<Object>, Object[]> pair = t.next();
                        List<Object> keyColumns = pair._1();
                        Object[] valueColumns = pair._2();
                        if ((keyColumns.size() + valueColumns.length) <= 1) {
                            LOG.warn("invalid row:" + pair);
                            continue;
                        }

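                        // keyColumns.get(0) is the bucket key "partitionId_bucketId"; the remaining
                        // key columns plus the finalized aggregated values form the output row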
                        String curBucketKey = keyColumns.get(0).toString();
                        List<Object> columnObjects = new ArrayList<>();
                        for (int i = 1; i < keyColumns.size(); ++i) {
                            columnObjects.add(keyColumns.get(i));
                        }
                        for (int i = 0; i < valueColumns.length; ++i) {
                            columnObjects.add(sparkRDDAggregators[i].finalize(valueColumns[i]));
                        }

                        // a new bucket key means the following rows belong to a new tablet
                        if (!curBucketKey.equals(lastBucketKey)) {
                            if (parquetWriter != null) {
                                parquetWriter.close();
                                // commit the finished file by renaming tmpPath to dstPath
                                try {
                                    fs.rename(new Path(tmpPath), new Path(dstPath));
                                } catch (IOException ioe) {
                                    LOG.warn("rename from tmpPath:" + tmpPath + " to dstPath:" + dstPath
                                            + " failed. exception:" + ioe);
                                    throw ioe;
                                }
                                // drop the reference so an invalid bucket key below cannot
                                // trigger a second close/rename of the same writer
                                parquetWriter = null;
                            }
                            // parse the new bucket key and open a writer for the new tablet's file
                            String[] bucketKey = curBucketKey.split("_");
                            if (bucketKey.length != 2) {
                                LOG.warn("invalid bucket key:" + curBucketKey);
                                continue;
                            }
                            long partitionId = Long.parseLong(bucketKey[0]);
                            int bucketId = Integer.parseInt(bucketKey[1]);
                            dstPath = String.format(pathPattern, tableId, partitionId, indexMeta.indexId, bucketId,
                                    indexMeta.schemaHash);
                            tmpPath = dstPath + "." + taskAttemptId;
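                            // Parquet output options: non-legacy format, INT96 timestamps,
                            // binary kept distinct from string, and field IDs written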
                            conf.setBoolean("spark.sql.parquet.writeLegacyFormat", false);
                            conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false);
                            conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true);
                            conf.setBoolean("spark.sql.parquet.binaryAsString", false);
                            conf.setBoolean("spark.sql.parquet.fieldId.write.enabled", true);
                            conf.set("spark.sql.parquet.outputTimestampType", "INT96");
                            ParquetWriteSupport.setSchema(dstSchema, conf);
                            ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
                            parquetWriter = new ParquetWriter<>(new Path(tmpPath), parquetWriteSupport,
                                    CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024, true, false,
                                    WriterVersion.PARQUET_1_0, conf);
                            LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                            lastBucketKey = curBucketKey;
                        }
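                        // convert each column to its Catalyst representation per dstSchema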
                        Object[] array = columnObjects.toArray();
                        Object[] catalystArr = new Object[array.length];
                        for (int i = 0; i < array.length; i++) {
                            catalystArr[i] = CatalystTypeConverters
                                    .createToCatalystConverter(dstSchema.apply(i).dataType())
                                    .apply(array[i]);
                        }
                        InternalRow internalRow = InternalRow.apply(
                                JavaConverters.asScalaBufferConverter(Arrays.asList(catalystArr)).asScala()
                                        .toSeq());
                        parquetWriter.write(internalRow);
                    }
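                    // commit the writer for the last bucket in this partition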
                    if (parquetWriter != null) {
                        parquetWriter.close();
                        try {
                            fs.rename(new Path(tmpPath), new Path(dstPath));
                        } catch (IOException ioe) {
                            LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:"
                                    + ioe);
                            throw ioe;
                        }
                    }

                });
    }
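
For context, a minimal sketch of the partitioner contract this method relies on:
repartitionAndSortWithinPartitions needs a Partitioner that maps each row's key to the
Spark partition registered for its bucket key (the first key column, formatted as
"partitionId_bucketId") in bucketKeyMap. The class name BucketPartitionerSketch and its
body are illustrative assumptions; the actual BucketPartitioner in this package may differ.

    import java.util.List;
    import java.util.Map;

    import org.apache.spark.Partitioner;

    // Hypothetical sketch, not the Doris implementation: route each row to the
    // Spark partition pre-registered for its bucket key in bucketKeyMap.
    public class BucketPartitionerSketch extends Partitioner {
        private final Map<String, Integer> bucketKeyMap;

        public BucketPartitionerSketch(Map<String, Integer> bucketKeyMap) {
            this.bucketKeyMap = bucketKeyMap;
        }

        @Override
        public int numPartitions() {
            return bucketKeyMap.size();
        }

        @Override
        @SuppressWarnings("unchecked")
        public int getPartition(Object key) {
            // the first key column carries the bucket key, as consumed in the writer loop above
            List<Object> keyColumns = (List<Object>) key;
            return bucketKeyMap.get(keyColumns.get(0).toString());
        }
    }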