private Dataset loadDataFromFilePaths()

in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [901:941]

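For each input path in the file group, this method expands globs, records file count and total size in Spark accumulators, loads the file into a DataFrame, converts it to the destination table schema, and unions the per-file frames into a single result.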

    private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                                               EtlJobConfig.EtlIndex baseIndex,
                                               List<String> filePaths,
                                               EtlJobConfig.EtlFileGroup fileGroup,
                                               StructType dstTableSchema)
            throws SparkDppException, IOException {
        // Accumulates the union of the per-file DataFrames across all file paths.
        Dataset<Row> fileGroupDataframe = null;
        for (String filePath : filePaths) {
            try {
                FileSystem fs = FileSystem.get(new Path(filePath).toUri(), serializableHadoopConf.value());
                // Expand the (possibly glob) path into concrete file statuses.
                FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
                if (fileStatuses == null) {
                    throw new SparkDppException("fs list status failed: " + filePath);
                }
                for (FileStatus fileStatus : fileStatuses) {
                    if (fileStatus.isDirectory()) {
                        continue;
                    }
                    // Track the number and total size of input files in accumulators.
                    fileNumberAcc.add(1);
                    fileSizeAcc.add(fileStatus.getLen());
                }
            } catch (Exception e) {
                LOG.warn("parse path failed: " + filePath);
                throw e;
            }
            if (fileGroup.columnSeparator == null) {
                LOG.warn("invalid null column separator!");
                throw new SparkDppException("Reason: invalid null column separator!");
            }
            // Load the raw file into a DataFrame, then convert it to the destination table schema.
            Dataset<Row> dataframe = loadDataFromPath(spark, fileGroup, filePath, baseIndex, baseIndex.columns);
            dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
            if (fileGroupDataframe == null) {
                fileGroupDataframe = dataframe;
            } else {
                // union() returns a new Dataset; reassign so the rows from this file are not lost.
                fileGroupDataframe = fileGroupDataframe.union(dataframe);
            }
        }
        return fileGroupDataframe;
    }
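
Dataset.union returns a new Dataset rather than mutating its receiver, which is why the loop above reassigns fileGroupDataframe after each file. Below is a minimal, self-contained sketch of the same fold pattern on in-memory frames (the class name, schema, and sample rows are hypothetical and not part of SparkDpp):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class UnionFoldSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("union-fold-sketch")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical schema and rows standing in for two converted per-file frames.
            StructType schema = new StructType()
                    .add("k", DataTypes.StringType)
                    .add("v", DataTypes.IntegerType);
            List<Dataset<Row>> perFileFrames = Arrays.asList(
                    spark.createDataFrame(Arrays.asList(RowFactory.create("a", 1)), schema),
                    spark.createDataFrame(Arrays.asList(RowFactory.create("b", 2)), schema));

            Dataset<Row> merged = null;
            for (Dataset<Row> df : perFileFrames) {
                // union() matches columns by position and returns a new Dataset;
                // the result must be reassigned or the new rows are silently dropped.
                merged = (merged == null) ? df : merged.union(df);
            }
            merged.show(); // prints both rows: ("a", 1) and ("b", 2)
            spark.stop();
        }
    }

Because union matches columns by position, the per-file frames must already share the destination schema, which is the role of convertSrcDataframeToDstDataframe in the method above.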