private void process()

in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [1074:1184]


    private void process() throws Exception {
        try {
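            // each table is processed independently: build its schema, partition layout and rollup
            // tree, then load every file group and write out the per-bucket data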
            for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
                Long tableId = entry.getKey();
                EtlJobConfig.EtlTable etlTable = entry.getValue();
                LOG.info("etlTable:" + etlTable);
                Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>());
                Set<String> binaryBitmapColumnSet = tableToBinaryBitmapColumns.getOrDefault(tableId, new HashSet<>());

                // get the base index meta
                EtlJobConfig.EtlIndex baseIndex = null;
                for (EtlJobConfig.EtlIndex indexMeta : etlTable.indexes) {
                    if (indexMeta.isBaseIndex) {
                        baseIndex = indexMeta;
                        break;
                    }
                }
                if (baseIndex == null) {
                    throw new RuntimeException("base index not found for table: " + tableId);
                }

                // get key and partition column names and value column names separately
                List<String> keyAndPartitionColumnNames = new ArrayList<>();
                List<String> valueColumnNames = new ArrayList<>();
                for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
                    if (etlColumn.isKey) {
                        keyAndPartitionColumnNames.add(etlColumn.columnName);
                    } else {
                        if (etlTable.partitionInfo.partitionColumnRefs.contains(etlColumn.columnName)) {
                            keyAndPartitionColumnNames.add(etlColumn.columnName);
                        }
                        valueColumnNames.add(etlColumn.columnName);
                    }
                }

                EtlJobConfig.EtlPartitionInfo partitionInfo = etlTable.partitionInfo;
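                // resolve each partition column to its position in keyAndPartitionColumnNames
                // and to the Java class used when building partition range keys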
                List<Integer> partitionKeyIndex = new ArrayList<>();
                List<Class<?>> partitionKeySchema = new ArrayList<>();
                for (String key : partitionInfo.partitionColumnRefs) {
                    for (int i = 0; i < baseIndex.columns.size(); ++i) {
                        EtlJobConfig.EtlColumn column = baseIndex.columns.get(i);
                        if (column.columnName.equals(key)) {
                            partitionKeyIndex.add(keyAndPartitionColumnNames.indexOf(key));
                            partitionKeySchema.add(DppUtils.getClassFromColumn(column));
                            break;
                        }
                    }
                }
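                // record the column type of every partition key so its range bounds can be
                // converted to the proper type below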
                Map<String, String> columnToType = baseIndex.columns.stream().collect(
                        Collectors.toMap(etlColumn -> etlColumn.columnName, etlColumn -> etlColumn.columnType));
                Map<Integer, String> partitionKeyIndexToType = new HashMap<>();
                for (int i = 0; i < partitionInfo.partitionColumnRefs.size(); i++) {
                    String partitionColumn = partitionInfo.partitionColumnRefs.get(i);
                    partitionKeyIndexToType.put(i, columnToType.get(partitionColumn));
                }
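                // pre-compute the partition range keys used by DorisRangePartitioner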
                List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys
                        = createPartitionRangeKeys(partitionInfo, partitionKeySchema, partitionKeyIndexToType);
                StructType dstTableSchema = DppUtils.createDstTableSchema(baseIndex.columns, false, false);
                dstTableSchema = DppUtils.replaceBinaryColsInSchema(binaryBitmapColumnSet, dstTableSchema);
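                // organize the table's indexes into a rollup tree (minimum coverage strategy),
                // which drives how rollup data is derived later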
                RollupTreeBuilder rollupTreeParser = new MinimumCoverageRollupTreeBuilder();
                RollupTreeNode rootNode = rollupTreeParser.build(etlTable);
                LOG.info("Start to process rollup tree:" + rootNode);

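                // load every file group of this table and union the results into one pair RDD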
                JavaPairRDD<List<Object>, Object[]> tablePairRDD = null;
                for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
                    List<String> filePaths = fileGroup.filePaths;
                    Dataset<Row> fileGroupDataframe = null;
                    EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
                    if (sourceType == EtlJobConfig.SourceType.FILE) {
                        fileGroupDataframe = loadDataFromFilePaths(
                                spark, baseIndex, filePaths, fileGroup, dstTableSchema);
                    } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
                        fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName,
                                baseIndex, fileGroup, dstTableSchema, dictBitmapColumnSet, binaryBitmapColumnSet);
                    } else {
                        throw new RuntimeException("Unknown source type: " + sourceType.name());
                    }
                    if (fileGroupDataframe == null) {
                        LOG.info("no data for file file group:" + fileGroup);
                        continue;
                    }

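                    // attach partition and bucket information to each row and convert the
                    // dataframe into a pair RDD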
                    JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(
                            fileGroupDataframe,
                            partitionInfo, partitionKeyIndex,
                            partitionRangeKeys,
                            keyAndPartitionColumnNames, valueColumnNames,
                            dstTableSchema, baseIndex, fileGroup.partitions);
                    if (tablePairRDD == null) {
                        tablePairRDD = ret;
                    } else {
                        // union() returns a new RDD, so the result must be reassigned
                        tablePairRDD = tablePairRDD.union(ret);
                    }
                }
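                // aggregate and write out the base index and all rollup indexes along the rollup tree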
                processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
            }
            LOG.info("invalid rows contents:" + invalidRows.value());
            dppResult.isSuccess = true;
            dppResult.failedReason = "";
        } catch (Exception exception) {
            LOG.warn("spark dpp failed for exception:" + exception);
            dppResult.isSuccess = false;
            dppResult.failedReason = exception.getMessage();
            throw exception;
        } finally {
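            // always stop the Spark context and fill the result counters from the accumulators,
            // whether the job succeeded or failed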
            spark.stop();
            dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
            dppResult.scannedRows = scannedRowsAcc.value();
            dppResult.fileNumber = fileNumberAcc.value();
            dppResult.fileSize = fileSizeAcc.value();
            dppResult.abnormalRows = abnormalRowAcc.value();
            dppResult.partialAbnormalRows = invalidRows.value();
        }
    }