in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [901:941]
/**
 * Loads all files matched by {@code filePaths} into a single dataframe converted to
 * the destination table schema.
 * <p>
 * For each path glob, file count and total size are recorded into the
 * {@code fileNumberAcc}/{@code fileSizeAcc} accumulators (directories are skipped),
 * then the data is loaded via {@link #loadDataFromPath} and converted via
 * {@link #convertSrcDataframeToDstDataframe}. The per-path dataframes are unioned.
 *
 * @param spark          active Spark session
 * @param baseIndex      base index whose columns define the source layout
 * @param filePaths      list of file path globs to load
 * @param fileGroup      file group config; must have a non-null column separator
 * @param dstTableSchema target schema each dataframe is converted to
 * @return the union of all loaded dataframes, or {@code null} if {@code filePaths} is empty
 * @throws SparkDppException if the column separator is null or a glob cannot be listed
 * @throws IOException       on filesystem access failure
 */
private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
EtlJobConfig.EtlIndex baseIndex,
List<String> filePaths,
EtlJobConfig.EtlFileGroup fileGroup,
StructType dstTableSchema)
throws SparkDppException, IOException {
// Loop-invariant precondition: validate once, even when filePaths is empty.
if (fileGroup.columnSeparator == null) {
LOG.warn("invalid null column separator!");
throw new SparkDppException("Reason: invalid null column separator!");
}
Dataset<Row> fileGroupDataframe = null;
for (String filePath : filePaths) {
try {
FileSystem fs = FileSystem.get(new Path(filePath).toUri(), serializableHadoopConf.value());
FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
// globStatus returns null when the pattern matches nothing.
if (fileStatuses == null) {
throw new SparkDppException("fs list status failed: " + filePath);
}
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
continue;
}
fileNumberAcc.add(1);
fileSizeAcc.add(fileStatus.getLen());
}
} catch (Exception e) {
// Attach the exception so the failure cause is not lost in the log.
LOG.warn("parse path failed: {}", filePath, e);
throw e;
}
Dataset<Row> dataframe = loadDataFromPath(spark, fileGroup, filePath, baseIndex, baseIndex.columns);
dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
if (fileGroupDataframe == null) {
fileGroupDataframe = dataframe;
} else {
// BUG FIX: Dataset.union is immutable and returns a new Dataset; the
// original code discarded the result, dropping all paths after the first.
fileGroupDataframe = fileGroupDataframe.union(dataframe);
}
}
return fileGroupDataframe;
}