in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [1074:1184]
private void process() throws Exception {
try {
for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
Long tableId = entry.getKey();
EtlJobConfig.EtlTable etlTable = entry.getValue();
LOG.info("etlTable:" + etlTable);
Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>());
Set<String> binaryBitmapColumnSet = tableToBinaryBitmapColumns.getOrDefault(tableId, new HashSet<>());
// get the base index meta
EtlJobConfig.EtlIndex baseIndex = null;
for (EtlJobConfig.EtlIndex indexMeta : etlTable.indexes) {
if (indexMeta.isBaseIndex) {
baseIndex = indexMeta;
break;
}
}
// defensive check: each table is expected to contain exactly one base index
if (baseIndex == null) {
    throw new RuntimeException("base index not found for table: " + tableId);
}
// get key and partition column names and value column names separately
List<String> keyAndPartitionColumnNames = new ArrayList<>();
List<String> valueColumnNames = new ArrayList<>();
for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
if (etlColumn.isKey) {
keyAndPartitionColumnNames.add(etlColumn.columnName);
} else {
if (etlTable.partitionInfo.partitionColumnRefs.contains(etlColumn.columnName)) {
keyAndPartitionColumnNames.add(etlColumn.columnName);
}
valueColumnNames.add(etlColumn.columnName);
}
}
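// for each partition column, record its position within the key+partition columns
// and its Java class; both are needed to build and compare partition range keys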
EtlJobConfig.EtlPartitionInfo partitionInfo = etlTable.partitionInfo;
List<Integer> partitionKeyIndex = new ArrayList<>();
List<Class<?>> partitionKeySchema = new ArrayList<>();
for (String key : partitionInfo.partitionColumnRefs) {
for (int i = 0; i < baseIndex.columns.size(); ++i) {
EtlJobConfig.EtlColumn column = baseIndex.columns.get(i);
if (column.columnName.equals(key)) {
partitionKeyIndex.add(keyAndPartitionColumnNames.indexOf(key));
partitionKeySchema.add(DppUtils.getClassFromColumn(column));
break;
}
}
}
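// resolve the column type of each partition key by its ordinal so that
// partition range values can be parsed with the correct type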
Map<String, String> columnToType = baseIndex.columns.stream().collect(
Collectors.toMap(etlColumn -> etlColumn.columnName, etlColumn -> etlColumn.columnType));
Map<Integer, String> partitionKeyIndexToType = new HashMap<>();
for (int i = 0; i < partitionInfo.partitionColumnRefs.size(); i++) {
String partitionColumn = partitionInfo.partitionColumnRefs.get(i);
partitionKeyIndexToType.put(i, columnToType.get(partitionColumn));
}
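// materialize the partition range keys and the destination table schema;
// binary bitmap columns are represented as binary in the intermediate schema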
List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys
= createPartitionRangeKeys(partitionInfo, partitionKeySchema, partitionKeyIndexToType);
StructType dstTableSchema = DppUtils.createDstTableSchema(baseIndex.columns, false, false);
dstTableSchema = DppUtils.replaceBinaryColsInSchema(binaryBitmapColumnSet, dstTableSchema);
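// build the rollup tree that decides which parent index each rollup is derived from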
RollupTreeBuilder rollupTreeParser = new MinimumCoverageRollupTreeBuilder();
RollupTreeNode rootNode = rollupTreeParser.build(etlTable);
LOG.info("Start to process rollup tree:" + rootNode);
JavaPairRDD<List<Object>, Object[]> tablePairRDD = null;
for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
List<String> filePaths = fileGroup.filePaths;
Dataset<Row> fileGroupDataframe = null;
EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
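// a file group is read either directly from file paths or from an intermediate Hive table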
if (sourceType == EtlJobConfig.SourceType.FILE) {
fileGroupDataframe = loadDataFromFilePaths(
spark, baseIndex, filePaths, fileGroup, dstTableSchema);
} else if (sourceType == EtlJobConfig.SourceType.HIVE) {
fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName,
baseIndex, fileGroup, dstTableSchema, dictBitmapColumnSet, binaryBitmapColumnSet);
} else {
throw new RuntimeException("Unknown source type: " + sourceType.name());
}
if (fileGroupDataframe == null) {
LOG.info("no data for file file group:" + fileGroup);
continue;
}
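// map each row to a (partition/bucket key, value columns) pair according to the range partition keys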
JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(
fileGroupDataframe,
partitionInfo, partitionKeyIndex,
partitionRangeKeys,
keyAndPartitionColumnNames, valueColumnNames,
dstTableSchema, baseIndex, fileGroup.partitions);
if (tablePairRDD == null) {
tablePairRDD = ret;
} else {
// union() returns a new RDD, so the result must be reassigned
tablePairRDD = tablePairRDD.union(ret);
}
}
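// aggregate and write data for the base index and all rollup indexes by walking the rollup tree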
processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
}
LOG.info("invalid rows contents:" + invalidRows.value());
dppResult.isSuccess = true;
dppResult.failedReason = "";
} catch (Exception exception) {
LOG.warn("spark dpp failed for exception:" + exception);
dppResult.isSuccess = false;
dppResult.failedReason = exception.getMessage();
throw exception;
} finally {
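// always stop the Spark session and fill the dpp result statistics from the accumulators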
spark.stop();
dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
dppResult.scannedRows = scannedRowsAcc.value();
dppResult.fileNumber = fileNumberAcc.value();
dppResult.fileSize = fileSizeAcc.value();
dppResult.abnormalRows = abnormalRowAcc.value();
dppResult.partialAbnormalRows = invalidRows.value();
}
}