private Dataset<Row> loadDataFromPath()

in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [620:758]


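    /**
     * Load one file group from the given file URL into a DataFrame.
     * Parquet and ORC files rely on their embedded schema; anything else is read as
     * delimited text, split on the configured column separator, and validated row by row.
     */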
    private Dataset<Row> loadDataFromPath(SparkSession spark,
                                          EtlJobConfig.EtlFileGroup fileGroup,
                                          String fileUrl,
                                          EtlJobConfig.EtlIndex baseIndex,
                                          List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
        List<String> dataSrcColumns = fileGroup.fileFieldNames;
        if (dataSrcColumns == null) {
            // if there is no source column info,
            // use the base index columns as source columns
            dataSrcColumns = new ArrayList<>();
            for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
                dataSrcColumns.add(column.columnName);
            }
        }
        // map each destination column name to its index, used to check source data against the destination schema
        Map<String, Integer> dstColumnNameToIndex = new HashMap<>();
        for (int i = 0; i < baseIndex.columns.size(); i++) {
            dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
        }
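        // the full source schema is the declared source columns followed by any columns parsed from the file path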
        List<String> srcColumnsWithColumnsFromPath = new ArrayList<>(dataSrcColumns);
        if (fileGroup.columnsFromPath != null) {
            srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
        }

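        // dispatch on file format: parquet and orc carry their own schema, anything else is treated as delimited text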
        if ("parquet".equalsIgnoreCase(fileGroup.fileFormat)) {
            // parquet has its own schema, so just use it; perhaps we could add some validation in the future
            Dataset<Row> dataFrame = spark.read().parquet(fileUrl);
            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
                for (int k = 0; k < columnValueFromPath.size(); k++) {
                    dataFrame = dataFrame.withColumn(
                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
                }
            }
            if (!Strings.isNullOrEmpty(fileGroup.where)) {
                dataFrame = dataFrame.where(fileGroup.where);
            }
            return dataFrame;
        }

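        // orc likewise carries its own schema; handle it the same way as parquet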
        if ("orc".equalsIgnoreCase(fileGroup.fileFormat)) {
            Dataset<Row> dataFrame = spark.read().orc(fileUrl);
            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
                for (int k = 0; k < columnValueFromPath.size(); k++) {
                    dataFrame = dataFrame.withColumn(
                            fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
                }
            }
            if (!Strings.isNullOrEmpty(fileGroup.where)) {
                dataFrame = dataFrame.where(fileGroup.where);
            }
            return dataFrame;
        }

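        // delimited text: build the source schema explicitly and parse each line ourselves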
        StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
        int columnSize = dataSrcColumns.size();
        List<ColumnParser> parsers = new ArrayList<>();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            parsers.add(ColumnParser.create(column));
        }
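        // only the first byte of the configured column separator is used to split each line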
        char separator = (char) fileGroup.columnSeparator.getBytes(StandardCharsets.UTF_8)[0];
        JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                record -> {
                    scannedRowsAcc.add(1);
                    String[] attributes = splitLine(record, separator);
                    List<Row> result = new ArrayList<>();
                    boolean validRow = true;
                    if (attributes.length != columnSize) {
                        LOG.warn("invalid src schema, data columns:"
                                + attributes.length + ", file group columns:"
                                + columnSize + ", row:" + record);
                        validRow = false;
                    } else {
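                        // map NULL flags to real nulls (rejecting nulls for non-nullable columns)
                        // and, in strict mode, validate values against the destination column types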
                        for (int i = 0; i < attributes.length; ++i) {
                            StructField field = srcSchema.apply(i);
                            String srcColumnName = field.name();
                            if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
                                if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
                                    attributes[i] = null;
                                } else {
                                    LOG.warn("column name:" + srcColumnName + ", attribute: " + i
                                            + " can not be null. row:" + record);
                                    validRow = false;
                                    break;
                                }
                            }
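                            // in strict mode, values of non-string columns without a column mapping must parse as their destination type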
                            boolean isStrictMode = etlJobConfig.properties.strictMode;
                            if (isStrictMode) {
                                if (dstColumnNameToIndex.containsKey(srcColumnName)) {
                                    int index = dstColumnNameToIndex.get(srcColumnName);
                                    String type = columns.get(index).columnType;
                                    if (type.equalsIgnoreCase("CHAR")
                                            || type.equalsIgnoreCase("VARCHAR")
                                            || fileGroup.columnMappings.containsKey(field.name())) {
                                        continue;
                                    }
                                    ColumnParser parser = parsers.get(index);
                                    boolean valid = parser.parse(attributes[i]);
                                    if (!valid) {
                                        validRow = false;
                                        LOG.warn("invalid row:" + record
                                                + ", attribute " + i + ": " + attributes[i] + " parsed failed");
                                        break;
                                    }
                                }
                            }
                        }
                    }
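                    // emit valid rows (with path columns appended); count invalid rows and keep a small sample for reporting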
                    if (validRow) {
                        Row row = null;
                        if (fileGroup.columnsFromPath == null) {
                            row = RowFactory.create(attributes);
                        } else {
                            // append the values parsed from the file path to the tail of the row
                            List<String> columnAttributes = new ArrayList<>(Arrays.asList(attributes));
                            columnAttributes.addAll(columnValueFromPath);
                            row = RowFactory.create(columnAttributes.toArray());
                        }
                        result.add(row);
                    } else {
                        abnormalRowAcc.add(1);
                        // at most add 5 rows to invalidRows
                        if (abnormalRowAcc.value() <= 5) {
                            invalidRows.add(record);
                        }
                    }
                    return result.iterator();
                }
        );

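        // build the DataFrame from the parsed rows and apply the optional WHERE filter from the file group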
        Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
        if (!Strings.isNullOrEmpty(fileGroup.where)) {
            dataframe = dataframe.where(fileGroup.where);
        }
        return dataframe;
    }
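
For reference, here is a minimal standalone sketch of the per-line handling used in the text branch above, assuming the NULL flag is the literal string "\N". The hypothetical parseLine helper stands in for SparkDpp's splitLine and ColumnParser logic, so it illustrates the validation flow rather than the actual implementation.

import java.util.Arrays;
import java.util.regex.Pattern;

public class TextLineSketch {
    // the literal string \N marks a SQL NULL in the source data (assumption, matching NULL_FLAG above)
    private static final String NULL_FLAG = "\\N";

    // Split one delimited line, map NULL flags to real nulls, and reject rows whose
    // column count does not match the expected source schema (returns null for invalid rows).
    static String[] parseLine(String line, String separator, int expectedColumns) {
        String[] attributes = line.split(Pattern.quote(separator), -1);
        if (attributes.length != expectedColumns) {
            return null;
        }
        for (int i = 0; i < attributes.length; i++) {
            if (NULL_FLAG.equals(attributes[i])) {
                attributes[i] = null;
            }
        }
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseLine("1\t\\N\tbeijing", "\t", 3))); // [1, null, beijing]
        System.out.println(parseLine("1\tonly-two-columns", "\t", 3));              // null (row rejected)
    }
}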