in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java [290:341]
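    /**
     * Build and write out every index of the rollup tree, level by level.
     * A child index is aggregated from its parent index's RDD rather than
     * from the base index, so intermediate results are reused along the tree.
     */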
    private void processRollupTree(RollupTreeNode rootNode,
            JavaPairRDD<List<Object>, Object[]> rootRDD,
            long tableId, EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
        Queue<RollupTreeNode> nodeQueue = new LinkedList<>();
        nodeQueue.offer(rootNode);
        int currentLevel = 0;
        // level-order traversal of the tree: every index on a level is derived
        // from an index on the level above it
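        // parentRDDMap holds the RDD of every index on the previous level (keyed
        // by index id); childrenRDDMap collects the RDDs produced on the current
        // level and becomes the parent map once the traversal moves down a level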
        Map<Long, JavaPairRDD<List<Object>, Object[]>> parentRDDMap = new HashMap<>();
        parentRDDMap.put(baseIndex.indexId, rootRDD);
        Map<Long, JavaPairRDD<List<Object>, Object[]>> childrenRDDMap = new HashMap<>();
        String pathPattern = etlJobConfig.outputPath + "/" + etlJobConfig.outputFilePattern;
        while (!nodeQueue.isEmpty()) {
            RollupTreeNode curNode = nodeQueue.poll();
            LOG.info("start to process index:" + curNode.indexId);
            if (curNode.children != null) {
                for (RollupTreeNode child : curNode.children) {
                    nodeQueue.offer(child);
                }
            }
            JavaPairRDD<List<Object>, Object[]> curRDD = null;
            // entering a new tree level: the previous level's parent RDDs are no
            // longer needed, so release them and let the RDDs produced on the last
            // level become the parents of this one
            if (curNode.level != currentLevel) {
                for (JavaPairRDD<List<Object>, Object[]> rdd : parentRDDMap.values()) {
                    rdd.unpersist();
                }
                currentLevel = curNode.level;
                parentRDDMap.clear();
                parentRDDMap = childrenRDDMap;
                childrenRDDMap = new HashMap<>();
            }
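            // the base index is the parent unless this node hangs under another rollup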
            long parentIndexId = baseIndex.indexId;
            if (curNode.parent != null) {
                parentIndexId = curNode.parent.indexId;
            }
            JavaPairRDD<List<Object>, Object[]> parentRDD = parentRDDMap.get(parentIndexId);
            // aggregate the parent RDD with one aggregator per value column
            SparkRDDAggregator[] sparkRDDAggregators = new SparkRDDAggregator[curNode.valueColumnNames.size()];
            curRDD = processRDDAggregate(parentRDD, curNode, sparkRDDAggregators);
            childrenRDDMap.put(curNode.indexId, curRDD);
            if (curNode.children != null && curNode.children.size() > 1) {
                // more than one child will read this RDD, so persist it to avoid
                // recomputing the aggregation for every child
                curRDD.persist(StorageLevel.MEMORY_AND_DISK());
            }
            // repartition, sort and write the result to HDFS as parquet
            writeRepartitionAndSortedRDDToParquet(curRDD, pathPattern, tableId, curNode.indexMeta, sparkRDDAggregators);
        }
    }