in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [620:758]
private Dataset<Row> loadDataFromPath(SparkSession spark,
        EtlJobConfig.EtlFileGroup fileGroup,
        String fileUrl,
        EtlJobConfig.EtlIndex baseIndex,
        List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
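    // Load one file of the file group into a DataFrame.
    // Parquet/ORC are self-describing and read with their own schema; any other
    // format is treated as separator-delimited text and validated row by row.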
    List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
    List<String> dataSrcColumns = fileGroup.fileFieldNames;
    if (dataSrcColumns == null) {
        // if there is no source column info,
        // use the base index columns as the source columns
        dataSrcColumns = new ArrayList<>();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            dataSrcColumns.add(column.columnName);
        }
    }
    // map destination column name -> index, used to check source data against the schema
    Map<String, Integer> dstColumnNameToIndex = new HashMap<>();
    for (int i = 0; i < baseIndex.columns.size(); i++) {
        dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
    }
    List<String> srcColumnsWithColumnsFromPath = new ArrayList<>(dataSrcColumns);
    if (fileGroup.columnsFromPath != null) {
        srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
    }
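    // fast path for self-describing formats: no source schema is constructed;
    // values parsed from the file path are appended as literal columns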
if ("parquet".equalsIgnoreCase(fileGroup.fileFormat)) {
// parquet had its own schema, just use it; perhaps we could add some validation in future.
Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
if (!CollectionUtils.isEmpty(columnValueFromPath)) {
for (int k = 0; k < columnValueFromPath.size(); k++) {
dataFrame = dataFrame.withColumn(
fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
}
}
if (!Strings.isNullOrEmpty(fileGroup.where)) {
dataFrame = dataFrame.where(fileGroup.where);
}
return dataFrame;
}
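    // ORC is also self-describing and is handled exactly like Parquet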
if ("orc".equalsIgnoreCase(fileGroup.fileFormat)) {
Dataset<Row> dataFrame = spark.read().orc(fileUrl);
if (!CollectionUtils.isEmpty(columnValueFromPath)) {
for (int k = 0; k < columnValueFromPath.size(); k++) {
dataFrame = dataFrame.withColumn(
fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
}
}
if (!Strings.isNullOrEmpty(fileGroup.where)) {
dataFrame = dataFrame.where(fileGroup.where);
}
return dataFrame;
}
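    // text path: build the source schema ourselves and parse each line manually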
    StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
    JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
    int columnSize = dataSrcColumns.size();
    List<ColumnParser> parsers = new ArrayList<>();
    for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
        parsers.add(ColumnParser.create(column));
    }
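    // note: only the first byte of the configured column separator is used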
    char separator = (char) fileGroup.columnSeparator.getBytes(StandardCharsets.UTF_8)[0];
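    // split and validate every line: a wrong column count, a NULL_FLAG value on a
    // non-nullable column, or (in strict mode) an unparsable value marks the row as
    // invalid; invalid rows are counted in abnormalRowAcc and a small sample is kept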
    JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
            record -> {
                scannedRowsAcc.add(1);
                String[] attributes = splitLine(record, separator);
                List<Row> result = new ArrayList<>();
                boolean validRow = true;
                if (attributes.length != columnSize) {
                    LOG.warn("invalid src schema, data columns:"
                            + attributes.length + ", file group columns:"
                            + columnSize + ", row:" + record);
                    validRow = false;
                } else {
                    for (int i = 0; i < attributes.length; ++i) {
                        StructField field = srcSchema.apply(i);
                        String srcColumnName = field.name();
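                        // a NULL_FLAG value becomes a real null only if the destination
                        // column allows nulls; otherwise the row is rejected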
                        if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
                            if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
                                attributes[i] = null;
                            } else {
                                LOG.warn("column name:" + srcColumnName + ", attribute: " + i
                                        + " cannot be null. row:" + record);
                                validRow = false;
                                break;
                            }
                        }
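                        // in strict mode, verify the raw value can be parsed as the destination
                        // column type; CHAR/VARCHAR columns and columns with a mapping expression are exempt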
                        boolean isStrictMode = etlJobConfig.properties.strictMode;
                        if (isStrictMode) {
                            if (dstColumnNameToIndex.containsKey(srcColumnName)) {
                                int index = dstColumnNameToIndex.get(srcColumnName);
                                String type = columns.get(index).columnType;
                                if (type.equalsIgnoreCase("CHAR")
                                        || type.equalsIgnoreCase("VARCHAR")
                                        || fileGroup.columnMappings.containsKey(field.name())) {
                                    continue;
                                }
                                ColumnParser parser = parsers.get(index);
                                boolean valid = parser.parse(attributes[i]);
                                if (!valid) {
                                    validRow = false;
                                    LOG.warn("invalid row:" + record
                                            + ", attribute " + i + ": " + attributes[i] + " parse failed");
                                    break;
                                }
                            }
                        }
                    }
                }
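                // valid rows become Spark Rows (path-derived values appended at the tail);
                // invalid rows are only counted and sampled for error reporting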
                if (validRow) {
                    Row row = null;
                    if (fileGroup.columnsFromPath == null) {
                        row = RowFactory.create(attributes);
                    } else {
                        // columns parsed from the path are appended to the tail of the row
                        List<String> columnAttributes = new ArrayList<>();
                        columnAttributes.addAll(Arrays.asList(attributes));
                        columnAttributes.addAll(columnValueFromPath);
                        row = RowFactory.create(columnAttributes.toArray());
                    }
                    result.add(row);
                } else {
                    abnormalRowAcc.add(1);
                    // add at most 5 rows to invalidRows
                    if (abnormalRowAcc.value() <= 5) {
                        invalidRows.add(record);
                    }
                }
                return result.iterator();
            }
    );
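    // build the DataFrame with the hand-built source schema and apply the optional WHERE filter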
    Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
    if (!Strings.isNullOrEmpty(fileGroup.where)) {
        dataframe = dataframe.where(fileGroup.where);
    }
    return dataframe;
}
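
For reference, a minimal, self-contained sketch (not part of SparkDpp.java) of the same fast path used above for self-describing formats: read the file with its own schema, append path-derived values as literal columns via withColumn/functions.lit, then apply the optional WHERE filter. The file path and the column names ("city", "dt") are made up for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ParquetPathColumnsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("path-columns-sketch").getOrCreate();
        // hypothetical file laid out as .../city=beijing/dt=2024-01-01/part-0.parquet
        Dataset<Row> df = spark.read()
                .parquet("hdfs:///warehouse/city=beijing/dt=2024-01-01/part-0.parquet");
        // values parsed from the path become constant columns, as in the Parquet/ORC branches above
        df = df.withColumn("city", functions.lit("beijing"))
                .withColumn("dt", functions.lit("2024-01-01"));
        // optional row filter, the counterpart of fileGroup.where
        df = df.where("dt = '2024-01-01'");
        df.show();
        spark.stop();
    }
}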