in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java [182:237]
/**
 * Runs the {@link GlobalDictBuilder} pipeline for one table and returns the name of the
 * intermediate hive table it worked on.
 *
 * <p>The builder is fed with the dict columns registered for {@code tableId} in
 * {@code tableToBitmapDictColumns}, the column names of the table's base index, and the
 * hive source described by the <em>first</em> file group of {@code table} (other file
 * groups are not consulted here). The hive database is taken as the text before the first
 * {@code '.'} of the file group's {@code hiveDbTableName}, and the task id as the text
 * after the last {@code '/'} of {@code etlJobConfig.outputPath}.
 *
 * @param table   etl table definition supplying the indexes and file groups
 * @param tableId id used to look up the dict columns and to format the hive table names
 * @return the intermediate hive table name in the form {@code <hiveDb>.<intermediateTable>}
 * @throws RuntimeException wrapping any exception raised while constructing or running
 *                          the {@link GlobalDictBuilder}
 */
private String buildGlobalDictAndEncodeSourceTable(EtlTable table, long tableId) {
    // Register every dict column of this table as a key; no value is associated with it.
    MultiValueMap dictColumns = new MultiValueMap();
    for (String columnName : tableToBitmapDictColumns.get(tableId)) {
        dictColumns.put(columnName, null);
    }

    // Doris schema: only the columns of the base index are collected.
    List<String> baseIndexColumnNames = Lists.newArrayList();
    for (EtlIndex index : table.indexes) {
        if (!index.isBaseIndex) {
            continue;
        }
        for (EtlColumn etlColumn : index.columns) {
            baseIndexColumnNames.add(etlColumn.columnName);
        }
    }

    // Hive source: database/table name and row filter come from the first file group.
    EtlFileGroup firstFileGroup = table.fileGroups.get(0);
    String sourceTable = firstFileGroup.hiveDbTableName;
    String hiveDb = sourceTable.split("\\.")[0];
    String sourceFilter = firstFileGroup.where;

    // Derived hive table names; the task id is the last segment of the output path.
    String outputPath = etlJobConfig.outputPath;
    String taskId = outputPath.substring(outputPath.lastIndexOf("/") + 1);
    String globalDictTable = String.format(EtlJobConfig.GLOBAL_DICT_TABLE_NAME, tableId);
    String distinctKeyTable = String.format(EtlJobConfig.DISTINCT_KEY_TABLE_NAME, tableId, taskId);
    String intermediateTable = String.format(
            EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, tableId, taskId);

    String builderArgsMessage = "global dict builder args, dictColumnMap: " + dictColumns
            + ", dorisOlapTableColumnList: " + baseIndexColumnNames
            + ", sourceHiveDBTableName: " + sourceTable
            + ", sourceHiveFilter: " + sourceFilter
            + ", distinctKeyTableName: " + distinctKeyTable
            + ", globalDictTableName: " + globalDictTable
            + ", dorisIntermediateHiveTable: " + intermediateTable;
    LOG.info(builderArgsMessage);

    // Remaining builder settings are fixed here: empty column lists, concurrency and split num of 1.
    List<String> mapSideJoinColumns = Lists.newArrayList();
    List<String> veryHighCardinalityColumns = Lists.newArrayList();
    int concurrency = 1;
    int veryHighCardinalitySplitNum = 1;

    try {
        GlobalDictBuilder builder = new GlobalDictBuilder(
                dictColumns,
                baseIndexColumnNames,
                mapSideJoinColumns,
                sourceTable,
                sourceFilter,
                hiveDb,
                distinctKeyTable,
                globalDictTable,
                intermediateTable,
                concurrency,
                veryHighCardinalityColumns,
                veryHighCardinalitySplitNum,
                spark);
        // The four steps run strictly in this order.
        builder.createHiveIntermediateTable();
        builder.extractDistinctColumn();
        builder.buildGlobalDict();
        builder.encodeDorisIntermediateHiveTable();
    } catch (Exception e) {
        // Surface any failure as unchecked while keeping the original cause.
        throw new RuntimeException(e);
    }

    return hiveDb + "." + intermediateTable;
}