public EtlJobConfig getEtlJobConfig(JobConfig jobConfig)

in spark-load/spark-load-core/src/main/java/org/apache/doris/common/meta/LoadMeta.java [48:101]


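    /**
     * Builds the {@link EtlJobConfig} consumed by the Spark ETL job from this load meta
     * and the user-supplied {@link JobConfig}.
     */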
    public EtlJobConfig getEtlJobConfig(JobConfig jobConfig) throws SparkLoadException {
        Map<Long, EtlJobConfig.EtlTable> tables = new HashMap<>();
        for (Map.Entry<String, TableMeta> entry : getTableMeta().entrySet()) {
            String name = entry.getKey();
            TableMeta meta = entry.getValue();
            EtlJobConfig.EtlTable etlTable = new EtlJobConfig.EtlTable(meta.getIndexes().stream().map(
                    TableMeta.EtlIndex::toEtlIndex).collect(Collectors.toList()),
                    meta.getPartitionInfo().toEtlPartitionInfo());
            JobConfig.TaskInfo taskInfo = jobConfig.getLoadTasks().get(name);
            EtlJobConfig.EtlFileGroup fileGroup;
            Map<String, EtlJobConfig.EtlColumnMapping> columnMappingMap = taskInfo.toEtlColumnMappingMap();
            checkMapping(etlTable, columnMappingMap);
            List<Long> partitionIds = meta.getPartitionInfo().partitions.stream()
                    .map(p -> p.partitionId).collect(Collectors.toList());
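            // Build the source file group according to the task type declared in the job config.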
            switch (taskInfo.getType()) {
                case HIVE:
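                    // Hive source: pass the Hadoop properties plus the Hive metastore URI to the file group.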
                    Map<String, String> properties = new HashMap<>(jobConfig.getHadoopProperties());
                    properties.put(Constants.HIVE_METASTORE_URIS, taskInfo.getHiveMetastoreUris());
                    fileGroup =
                            new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.HIVE, taskInfo.getHiveFullTableName(),
                                    properties, false, columnMappingMap, taskInfo.getWhere(),
                                    partitionIds);
                    break;
                case FILE:
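                    // File source: split the optional comma-separated column and column-from-path lists.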
                    List<String> columnList = Collections.emptyList();
                    if (StringUtils.isNotBlank(taskInfo.getColumns())) {
                        columnList = Arrays.stream(taskInfo.getColumns().split(",")).collect(Collectors.toList());
                    }
                    List<String> columnFromPathList = Collections.emptyList();
                    if (StringUtils.isNotBlank(taskInfo.getColumnFromPath())) {
                        columnFromPathList =
                                Arrays.stream(taskInfo.getColumnFromPath().split(",")).collect(Collectors.toList());
                    }
                    fileGroup =
                            new EtlJobConfig.EtlFileGroup(EtlJobConfig.SourceType.FILE, taskInfo.getPaths(), columnList,
                                    columnFromPathList, taskInfo.getFieldSep(), taskInfo.getLineDelim(), false,
                                    taskInfo.getFormat(), columnMappingMap, taskInfo.getWhere(), partitionIds);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported task type: " + taskInfo.getType());
            }
            etlTable.addFileGroup(fileGroup);
            tables.put(meta.getId(), etlTable);
        }
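        // Assemble the job-level config: V1 output file pattern, label, job properties,
        // and the output path under the configured working directory.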
        String outputFilePattern = EtlJobConfig.getOutputFilePattern(jobConfig.getLabel(),
                EtlJobConfig.FilePatternVersion.V1);
        String label = jobConfig.getLabel();
        EtlJobConfig.EtlJobProperty properties = new EtlJobConfig.EtlJobProperty();
        EtlJobConfig etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label, properties);
        etlJobConfig.outputPath =
                EtlJobConfig.getOutputPath(jobConfig.getWorkingDir(), getDbId(), label,
                        getSignature());
        return etlJobConfig;
    }