private String getEmrCode()

in spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/nodemodel/DataWorksNodeCodeAdapter.java [233:327]


    /**
     * Builds the EMR code content for the given script.
     *
     * <p>The code model is obtained from {@code CodeModelFactory} using the runtime command and the script
     * content. It is then populated with the source code, the EMR job type matching the command, a random
     * name, a launcher whose allocation spec merges the runtime's spark conf and EMR job config, and the
     * data studio / flow spec tags.
     *
     * @param script the script to convert; its runtime is dereferenced unconditionally
     * @return the content produced by the populated code model
     * @throws NullPointerException if {@code script} or its runtime is {@code null}
     */
    private String getEmrCode(SpecScript script) {
        String command = script.getRuntime().getCommand();
        CodeModel<EmrCode> code = CodeModelFactory.getCodeModel(command, script.getContent());
        EmrCode codeModel = code.getCodeModel();
        codeModel.setSourceCode(script.getContent());
        // Commands that are unknown or not listed below leave the job type untouched.
        Optional.ofNullable(CodeProgramType.getNodeTypeByName(command)).ifPresent(type -> {
            switch (type) {
                case EMR_SHELL: {
                    codeModel.setType(EmrJobType.SHELL);
                    break;
                }
                case EMR_STREAMING_SQL: {
                    codeModel.setType(EmrJobType.STREAMING_SQL);
                    break;
                }
                case EMR_HIVE: {
                    codeModel.setType(EmrJobType.HIVE_SQL);
                    break;
                }
                case EMR_HIVE_CLI: {
                    codeModel.setType(EmrJobType.HIVE);
                    break;
                }
                case EMR_MR: {
                    codeModel.setType(EmrJobType.MR);
                    break;
                }
                case EMR_IMPALA: {
                    codeModel.setType(EmrJobType.IMPALA_SQL);
                    break;
                }
                case EMR_PRESTO: {
                    codeModel.setType(EmrJobType.PRESTO_SQL);
                    break;
                }
                case EMR_SPARK_SQL: {
                    codeModel.setType(EmrJobType.SPARK_SQL);
                    break;
                }
                case EMR_SPARK: {
                    codeModel.setType(EmrJobType.SPARK);
                    break;
                }
                case EMR_SPARK_SHELL: {
                    codeModel.setType(EmrJobType.SPARK_SHELL);
                    break;
                }
                case EMR_SPARK_STREAMING: {
                    codeModel.setType(EmrJobType.SPARK_STREAMING);
                    break;
                }
            }
        });
        codeModel.setName(UUID.randomUUID().toString());

        EmrLauncher launcher = new EmrLauncher();
        // The runtime was already dereferenced above, so this guard is effectively always satisfied here.
        Optional.ofNullable(script.getRuntime()).ifPresent(rt -> {
            Map<String, Object> allocationSpecProps = new HashMap<>();
            launcher.setAllocationSpec(allocationSpecProps);
            // Spark conf goes in first; EMR job config entries added below may overwrite the same keys.
            Optional.ofNullable(rt.getSparkConf()).filter(MapUtils::isNotEmpty).ifPresent(allocationSpecProps::putAll);

            Optional.ofNullable(rt.getEmrJobConfig()).filter(MapUtils::isNotEmpty).ifPresent(emrJobConfig -> {
                EmrAllocationSpec allocationSpec = new EmrAllocationSpec();
                // Values are converted with String.valueOf instead of a hard cast so that non-String config
                // values (e.g. numbers from a parsed document) do not raise ClassCastException.
                allocationSpec.setUserName(Optional.ofNullable(emrJobConfig.get("submitter")).map(String::valueOf).orElse(null));
                allocationSpec.setQueue(Optional.ofNullable(emrJobConfig.get("queue")).map(String::valueOf)
                    .filter(StringUtils::isNotBlank).orElse("default"));
                allocationSpec.setMemory(Optional.ofNullable(emrJobConfig.get("memory")).map(String::valueOf).orElse("2048"));
                allocationSpec.setVcores(Optional.ofNullable(emrJobConfig.get("cores")).map(String::valueOf).orElse("1"));
                allocationSpec.setPriority(Optional.ofNullable(emrJobConfig.get("priority")).map(String::valueOf).orElse("1"));
                allocationSpec.setUseGateway(Optional.ofNullable(emrJobConfig.get("submitMode")).map(String::valueOf)
                    .map(mode -> LabelEnum.getByLabel(EmrJobSubmitMode.class, mode))
                    .map(mode -> Objects.equals(mode, EmrJobSubmitMode.LOCAL))
                    .orElse(null));
                allocationSpec.setReuseSession(Optional.ofNullable(emrJobConfig.get("sessionEnabled"))
                    .map(String::valueOf).map(BooleanUtils::toBoolean)
                    .orElse(null));
                allocationSpec.setBatchMode(Optional.ofNullable(emrJobConfig.get("executeMode")).map(String::valueOf)
                    .map(mode -> LabelEnum.getByLabel(EmrJobExecuteMode.class, mode))
                    .map(mode -> Objects.equals(mode, EmrJobExecuteMode.BATCH))
                    .orElse(false));
                allocationSpec.setEnableJdbcSql(Optional.ofNullable(emrJobConfig.get("enableJdbcSql"))
                    .map(String::valueOf).map(BooleanUtils::toBoolean)
                    .orElse(null));
                codeModel.getProperties().getEnvs().put(EmrCode.ENVS_KEY_FLOW_SKIP_SQL_ANALYZE, String.valueOf(allocationSpec.getBatchMode()));

                // Round-trip the spec through JSON and copy every non-null property into the launcher map.
                // NOTE(review): getAsJsonPrimitive assumes every serialized property is a primitive - confirm.
                Optional.ofNullable((JsonObject)GsonUtils.fromJsonString(GsonUtils.toJsonString(allocationSpec), JsonObject.class)).ifPresent(json ->
                    json.entrySet().stream()
                        .filter(ent -> ent.getValue() != null && !ent.getValue().isJsonNull())
                        .forEach(entry -> allocationSpecProps.put(entry.getKey(), entry.getValue().getAsJsonPrimitive())));

                // Resolve the spec's field names once rather than reflecting again for every config key.
                java.util.Set<String> specFieldNames = ListUtils.emptyIfNull(ReflectUtils.getPropertyFields(allocationSpec)).stream()
                    .map(f -> f.getName())
                    .collect(java.util.stream.Collectors.toSet());
                // Config keys that are not fields of the allocation spec are passed through as-is.
                emrJobConfig.keySet().stream()
                    .filter(key -> !specFieldNames.contains(key))
                    .forEach(key -> allocationSpecProps.put(key, emrJobConfig.get(key)));
            });
        });
        codeModel.setLauncher(launcher);
        codeModel.getProperties().setTags(Arrays.asList(ProductModule.DATA_STUDIO.getName(), SpecConstants.FLOW_SPEC + "/" + SpecVersion.V_1_1_0));
        return code.getContent();
    }