in spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java [48:101]
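    /**
     * Builds the {@link EtlJobConfig} for the Spark ETL job from this load meta and the given job config:
     * every table's indexes and partition info are converted to an {@link EtlJobConfig.EtlTable}, the
     * column mappings are validated, and one {@link EtlJobConfig.EtlFileGroup} is attached per load task
     * (Hive table or file source). The output file pattern and output path are derived from the job label,
     * working directory, database id and signature.
     *
     * @param jobConfig the job configuration describing the load tasks and cluster properties
     * @return the assembled ETL job configuration
     * @throws SparkLoadException if the load meta and job config cannot be turned into a valid ETL config
     */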
    public EtlJobConfig getEtlJobConfig(JobConfig jobConfig) throws SparkLoadException {
        Map<Long, EtlJobConfig.EtlTable> tables = new HashMap<>();
        for (Map.Entry<String, TableMeta> entry : getTableMeta().entrySet()) {
            String name = entry.getKey();
            TableMeta meta = entry.getValue();
            // Convert the table meta (indexes + partition info) into its ETL representation.
            EtlJobConfig.EtlTable etlTable = new EtlJobConfig.EtlTable(meta.getIndexes().stream().map(
                    TableMeta.EtlIndex::toEtlIndex).collect(Collectors.toList()),
                    meta.getPartitionInfo().toEtlPartitionInfo());
            JobConfig.TaskInfo taskInfo = jobConfig.getLoadTasks().get(name);
            EtlJobConfig.EtlFileGroup fileGroup;
            Map<String, EtlJobConfig.EtlColumnMapping> columnMappingMap = taskInfo.toEtlColumnMappingMap();
            // Validate the column mappings against the target table.
            checkMapping(etlTable, columnMappingMap);
            List<Long> partitionIds = meta.getPartitionInfo().partitions.stream()
                    .map(p -> p.partitionId).collect(Collectors.toList());
            // Build the file group for this table's load task, depending on the source type.
            switch (taskInfo.getType()) {
                case HIVE:
                    // Hive source: Hadoop properties plus the metastore URIs are passed to the file group.
                    Map<String, String> properties = new HashMap<>(jobConfig.getHadoopProperties());
                    properties.put(Constants.HIVE_METASTORE_URIS, taskInfo.getHiveMetastoreUris());
                    fileGroup =
                            new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.HIVE,
                                    taskInfo.getHiveFullTableName(), properties, false, columnMappingMap,
                                    taskInfo.getWhere(), partitionIds);
                    break;
                case FILE:
                    // File source: optional comma-separated column lists, field/line separators and format.
                    List<String> columnList = Collections.emptyList();
                    if (StringUtils.isNoneBlank(taskInfo.getColumns())) {
                        columnList = Arrays.stream(taskInfo.getColumns().split(",")).collect(Collectors.toList());
                    }
                    List<String> columnFromPathList = Collections.emptyList();
                    if (StringUtils.isNoneBlank(taskInfo.getColumnFromPath())) {
                        columnFromPathList =
                                Arrays.stream(taskInfo.getColumnFromPath().split(",")).collect(Collectors.toList());
                    }
                    fileGroup =
                            new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.FILE, taskInfo.getPaths(),
                                    columnList, columnFromPathList, taskInfo.getFieldSep(), taskInfo.getLineDelim(),
                                    false, taskInfo.getFormat(), columnMappingMap, taskInfo.getWhere(), partitionIds);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported task type: " + taskInfo.getType());
            }
            etlTable.addFileGroup(fileGroup);
            tables.put(meta.getId(), etlTable);
        }
        // Assemble the job-level config: output file pattern, label, job properties and output path.
        String outputFilePattern = EtlJobConfig.getOutputFilePattern(jobConfig.getLabel(),
                EtlJobConfig.FilePatternVersion.V1);
        String label = jobConfig.getLabel();
        EtlJobConfig.EtlJobProperty properties = new EtlJobConfig.EtlJobProperty();
        EtlJobConfig etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label, properties);
        etlJobConfig.outputPath =
                EtlJobConfig.getOutputPath(jobConfig.getWorkingDir(), getDbId(), label, getSignature());
        return etlJobConfig;
    }
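
    // Illustrative usage sketch (not part of the original source; the surrounding driver code and the
    // way LoadMeta is obtained are assumptions): once the load meta is available and the user's JobConfig
    // has been parsed, the ETL plan consumed by the Spark job is derived like this:
    //
    //     LoadMeta loadMeta = ...;                                          // e.g. from the FE create-load response
    //     EtlJobConfig etlJobConfig = loadMeta.getEtlJobConfig(jobConfig);
    //     String outputPath = etlJobConfig.outputPath;                      // where the Spark ETL job writes its output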