private Dataset<Row> checkDataFromHiveWithStrictMode()

in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [980:1072]
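
The checks below rely on ColumnParser implementations (for example BigIntParser) exposing a boolean parse(String) contract, as the calls inside the flatMap suggest. A minimal sketch of such a validator, purely illustrative and not the actual Doris implementation:

    // Illustrative sketch only: a BigIntParser-style validator, assuming the
    // ColumnParser contract is "boolean parse(String value)" as used in the method below.
    class IllustrativeBigIntParser {
        public boolean parse(String value) {
            try {
                Long.parseLong(value.trim());
                return true;
            } catch (NumberFormatException e) {
                return false;
            }
        }
    }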


    private Dataset<Row> checkDataFromHiveWithStrictMode(Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex,
                                                         Set<String> mappingColKeys, boolean isStrictMode,
                                                         StructType dstTableSchema,
                                                         Set<String> dictBitmapColumnSet,
                                                         Set<String> binaryBitmapColumnsSet) throws SparkDppException {
        List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
        List<ColumnParser> columnParserArrayList = new ArrayList<>();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            // note(wb): there are three data sources for a bitmap column
            // case 1: global dict and binary data; no need to check
            // case 2: bitmap hash function; this function is not supported in spark load yet, so it is ignored here
            // case 3: the origin value is an integer value; it should be checked with BigIntParser
            if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
                if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
                    continue;
                }
                if (binaryBitmapColumnsSet.contains(column.columnName.toLowerCase())) {
                    continue;
                }
                columnNameNeedCheckArrayList.add(column);
                columnParserArrayList.add(new BigIntParser());
            } else if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar")
                    && !StringUtils.equalsIgnoreCase(column.columnType, "char")
                    && !mappingColKeys.contains(column.columnName)) {
                columnNameNeedCheckArrayList.add(column);
                columnParserArrayList.add(ColumnParser.create(column));
            }
        }

        ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[0]);
        EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[0]);
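        // the arrays above are captured by the anonymous FlatMapFunction below
        // and serialized with the closure to the executors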

        StructType srcSchema = dataframe.schema();
        JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
            @Override
            public Iterator<Row> call(Row row) throws Exception {
                List<Row> result = new ArrayList<>();
                Set<Integer> columnIndexNeedToReplaceNull = new HashSet<>();
                boolean validRow = true;
                for (int i = 0; i < columnNameArray.length; i++) {
                    EtlJobConfig.EtlColumn column = columnNameArray[i];
                    int fieldIndex = row.fieldIndex(column.columnName);
                    Object value = row.get(fieldIndex);
                    if (value == null && !column.isAllowNull) {
                        validRow = false;
                        LOG.warn("column: " + column.columnName + " can not be null. row:" + row.toString());
                        break;
                    }
                    if (value != null && !columnParserArray[i].parse(value.toString())) {
                        if (isStrictMode) {
                            validRow = false;
                            LOG.warn(String.format("row parse failed in strict mode, column name %s, src row %s",
                                    column.columnName, row.toString()));
                        } else if (!column.isAllowNull) {
                            // a column that fails to parse would normally be filled with null,
                            // but the doris column does not allow null, so we should skip this row
                            validRow = false;
                            LOG.warn(String.format("column %s parse failed and can not be filled with null, src row %s",
                                    column.columnName, row.toString()));
                            break;
                        } else {
                            columnIndexNeedToReplaceNull.add(fieldIndex);
                        }
                    }
                }
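                // three possible outcomes per row: drop it (and sample a few into invalidRows),
                // emit a copy with the failed columns replaced by null, or emit it unchanged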
                if (!validRow) {
                    abnormalRowAcc.add(1);
                    // add at most 5 rows to invalidRows
                    if (abnormalRowAcc.value() <= 5) {
                        invalidRows.add(row.toString());
                    }
                } else if (!columnIndexNeedToReplaceNull.isEmpty()) {
                    scannedRowsAcc.add(1);
                    Object[] newRow = new Object[row.size()];
                    for (int i = 0; i < row.size(); i++) {
                        if (columnIndexNeedToReplaceNull.contains(i)) {
                            newRow[i] = null;
                        } else {
                            newRow[i] = row.get(i);
                        }
                    }
                    result.add(RowFactory.create(newRow));
                } else {
                    scannedRowsAcc.add(1);
                    result.add(row);
                }
                return result.iterator();
            }
        });

        // here we only check the data and do not cast it,
        // so the data types should stay the same as the src schema, which is the hive table schema
        return spark.createDataFrame(result, srcSchema);
    }
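
A hedged sketch of how this check might be wired into the DPP flow: read the source rows from Hive, run the strict-mode check, and only then continue with column mapping and casting. The helper name loadAndCheckFromHiveSketch and the hiveDbTableName parameter are assumptions for illustration, not the actual call site in SparkDpp.

    // Sketch only (assumed call site): validate a Hive-sourced dataframe before any cast;
    // the column sets and schemas would come from the EtlJobConfig.
    private Dataset<Row> loadAndCheckFromHiveSketch(String hiveDbTableName, EtlJobConfig.EtlIndex baseIndex,
                                                    Set<String> mappingColKeys, boolean isStrictMode,
                                                    StructType dstTableSchema,
                                                    Set<String> dictBitmapColumnSet,
                                                    Set<String> binaryBitmapColumnsSet) throws SparkDppException {
        // hiveDbTableName is a hypothetical "db.table" string resolved from the job config
        Dataset<Row> sourceData = spark.sql("select * from " + hiveDbTableName);
        // check (and null-fill where allowed) first; data types still match the hive schema here
        return checkDataFromHiveWithStrictMode(sourceData, baseIndex, mappingColKeys, isStrictMode,
                dstTableSchema, dictBitmapColumnSet, binaryBitmapColumnsSet);
    }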