in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [542:618]
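    /**
     * Convert the source dataframe so that it matches the destination table
     * schema: fill in columns missing from the source (from the column
     * default, with NULL, or by failing the load), cast every column to its
     * destination type, negate value columns for negative loads, and apply
     * the file group's column mapping expressions.
     */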
    private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex,
                                                           Dataset<Row> srcDataframe, StructType dstTableSchema,
                                                           EtlJobConfig.EtlFileGroup fileGroup)
            throws SparkDppException {
        Dataset<Row> dataframe = srcDataframe;
        StructType srcSchema = dataframe.schema();
        Set<String> srcColumnNames = new HashSet<>();
        for (StructField field : srcSchema.fields()) {
            srcColumnNames.add(field.name());
        }
        Map<String, EtlJobConfig.EtlColumnMapping> columnMappings = fileGroup.columnMappings;
        // 1. process simple columns
        Set<String> mappingColumns = null;
        if (columnMappings != null) {
            mappingColumns = columnMappings.keySet();
        }
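        // for each destination column: fill it if it is missing from the
        // source, then cast it to the destination type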
        List<String> dstColumnNames = new ArrayList<>();
        for (StructField dstField : dstTableSchema.fields()) {
            dstColumnNames.add(dstField.name());
            EtlJobConfig.EtlColumn column = baseIndex.getColumn(dstField.name());
            if (!srcColumnNames.contains(dstField.name())) {
                if (mappingColumns != null && mappingColumns.contains(dstField.name())) {
                    // mapping columns will be processed in the next step
                    continue;
                }
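                // the column is absent from the source: fill it from the table
                // default, with an explicit NULL, or fail the load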
                if (column.defaultValue != null) {
                    if (column.defaultValue.equals(NULL_FLAG)) {
                        dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
                    } else {
                        dataframe = dataframe.withColumn(dstField.name(), functions.lit(column.defaultValue));
                    }
                } else if (column.isAllowNull) {
                    dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
                } else {
                    throw new SparkDppException("Reason: no data for column: " + dstField.name());
                }
            }
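            // cast the column to a type matching the destination schema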
            if (column.columnType.equalsIgnoreCase("DATE") || column.columnType.equalsIgnoreCase("DATEV2")) {
                dataframe = dataframe.withColumn(dstField.name(),
                        dataframe.col(dstField.name()).cast(DataTypes.DateType));
            } else if (column.columnType.equalsIgnoreCase("DATETIME")
                    || column.columnType.equalsIgnoreCase("DATETIMEV2")) {
                dataframe = dataframe.withColumn(dstField.name(),
                        dataframe.col(dstField.name()).cast(DataTypes.TimestampType));
            } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
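                // normalize boolean text: "true" (any case) or "1" become "1", anything else "0"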
                dataframe = dataframe.withColumn(dstField.name(),
                        functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
                                .when(dataframe.col(dstField.name()).equalTo("1"), "1")
                                .otherwise("0"));
            } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE)
                    && !dstField.dataType().equals(DataTypes.StringType)) {
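                // generic cast for every other non-bitmap, non-string column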
                dataframe = dataframe.withColumn(dstField.name(),
                        dataframe.col(dstField.name()).cast(dstField.dataType()));
            } else if (column.columnType.equalsIgnoreCase(BITMAP_TYPE)
                    && dstField.dataType().equals(DataTypes.BinaryType)) {
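                // bitmap columns stored as binary get an explicit binary cast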
                dataframe = dataframe.withColumn(dstField.name(),
                        dataframe.col(dstField.name()).cast(DataTypes.BinaryType));
            }
            if (fileGroup.isNegative && !column.isKey) {
                // negative load
                // the value will be converted to -1 * value
                dataframe = dataframe.withColumn(dstField.name(), functions.expr("-1 * " + dstField.name()));
            }
        }
        // 2. process the mapping columns
        if (mappingColumns != null) {
            for (String mappingColumn : mappingColumns) {
                String mappingDescription = columnMappings.get(mappingColumn).toDescription();
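                // hll_hash mappings are left untouched in this step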
                if (mappingDescription.toLowerCase().contains("hll_hash")) {
                    continue;
                }
                // cast the mapping expression result to the destination column type
                dataframe = dataframe.withColumn(mappingColumn,
                        functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
            }
        }
        return dataframe;
    }
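
// ---------------------------------------------------------------------------
// A minimal standalone sketch (not part of SparkDpp) of the boolean
// normalization used above, run against a toy DataFrame. The class name,
// local SparkSession setup, and the "flag" column are illustrative
// assumptions, not Doris code.
// ---------------------------------------------------------------------------
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class BooleanNormalizeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("bool-normalize-sketch").getOrCreate();
        StructType schema = new StructType().add("flag", DataTypes.StringType);
        Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(RowFactory.create("TRUE"), RowFactory.create("1"), RowFactory.create("no")),
                schema);
        // "TRUE" (case-insensitive) and "1" become "1"; everything else becomes "0"
        df.withColumn("flag",
                functions.when(functions.lower(df.col("flag")).equalTo("true"), "1")
                        .when(df.col("flag").equalTo("1"), "1")
                        .otherwise("0"))
                .show();
        spark.stop();
    }
}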