in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [980:1072]
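/**
 * Checks data read from Hive against the destination Doris schema.
 * Rows that fail validation are dropped and counted; in non-strict mode, values that
 * fail to parse for a nullable column are replaced with null instead of dropping the row.
 */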
private Dataset<Row> checkDataFromHiveWithStrictMode(Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex,
Set<String> mappingColKeys, boolean isStrictMode,
StructType dstTableSchema,
Set<String> dictBitmapColumnSet,
Set<String> binaryBitmapColumnsSet) throws SparkDppException {
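// collect the columns that need validation, together with a type-specific parser for each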
List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
List<ColumnParser> columnParserArrayList = new ArrayList<>();
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
// note(wb): there are three data sources for a bitmap column
// case 1: global dict and binary data; no check is needed
// case 2: bitmap hash function; this function is not supported in spark load yet, so it is ignored here
// case 3: the origin value is an integer value; it should be checked with BigIntParser
if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
continue;
}
if (binaryBitmapColumnsSet.contains(column.columnName.toLowerCase())) {
continue;
}
columnNameNeedCheckArrayList.add(column);
columnParserArrayList.add(new BigIntParser());
} else if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar")
&& !StringUtils.equalsIgnoreCase(column.columnType, "char")
&& !mappingColKeys.contains(column.columnName)) {
columnNameNeedCheckArrayList.add(column);
columnParserArrayList.add(ColumnParser.create(column));
}
}
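// materialize the column and parser lists as arrays for use inside the flatMap closure below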
ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[0]);
EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[0]);
StructType srcSchema = dataframe.schema();
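// validate every row: invalid rows are dropped and counted, while rows with recoverable
// parse failures are re-emitted with the affected fields replaced by null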
JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
List<Row> result = new ArrayList<>();
Set<Integer> columnIndexNeedToReplaceNull = new HashSet<>();
boolean validRow = true;
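// check each column that requires validation with its corresponding parser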
for (int i = 0; i < columnNameArray.length; i++) {
EtlJobConfig.EtlColumn column = columnNameArray[i];
int fieldIndex = row.fieldIndex(column.columnName);
Object value = row.get(fieldIndex);
if (value == null && !column.isAllowNull) {
validRow = false;
LOG.warn("column:" + i + " can not be null. row:" + row.toString());
break;
}
if (value != null && !columnParserArray[i].parse(value.toString())) {
if (isStrictMode) {
validRow = false;
LOG.warn(String.format("row parsed failed in strict mode, column name %s, src row %s",
column.columnName, row.toString()));
} else if (!column.isAllowNull) {
// a column that fails to parse would normally be filled with null,
// but if the Doris column does not allow null, we should skip this row
validRow = false;
LOG.warn("column:" + i + " can not be null. row:" + row.toString());
break;
} else {
columnIndexNeedToReplaceNull.add(fieldIndex);
}
}
}
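// rows that failed validation are not emitted, so they are filtered out of the result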
if (!validRow) {
abnormalRowAcc.add(1);
// add at most 5 rows to invalidRows
if (abnormalRowAcc.value() <= 5) {
invalidRows.add(row.toString());
}
} else if (!columnIndexNeedToReplaceNull.isEmpty()) {
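// rebuild the row with null substituted for the fields that failed to parse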
scannedRowsAcc.add(1);
Object[] newRow = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
if (columnIndexNeedToReplaceNull.contains(i)) {
newRow[i] = null;
} else {
newRow[i] = row.get(i);
}
}
result.add(RowFactory.create(newRow));
} else {
scannedRowsAcc.add(1);
result.add(row);
}
return result.iterator();
}
});
// here we only check the data and do not cast it,
// so the data types should stay the same as the source schema, i.e. the Hive table schema
return spark.createDataFrame(result, srcSchema);
}