in spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/nodemodel/DataWorksNodeCodeAdapter.java [233:327]
private String getEmrCode(SpecScript script) {
String command = script.getRuntime().getCommand();
CodeModel<EmrCode> code = CodeModelFactory.getCodeModel(script.getRuntime().getCommand(), script.getContent());
EmrCode codeModel = code.getCodeModel();
codeModel.setSourceCode(script.getContent());
Optional.ofNullable(CodeProgramType.getNodeTypeByName(command)).ifPresent(type -> {
switch (type) {
case EMR_SHELL: {
codeModel.setType(EmrJobType.SHELL);
break;
}
case EMR_STREAMING_SQL: {
codeModel.setType(EmrJobType.STREAMING_SQL);
break;
}
case EMR_HIVE: {
codeModel.setType(EmrJobType.HIVE_SQL);
break;
}
case EMR_HIVE_CLI: {
codeModel.setType(EmrJobType.HIVE);
break;
}
case EMR_MR: {
codeModel.setType(EmrJobType.MR);
break;
}
case EMR_IMPALA: {
codeModel.setType(EmrJobType.IMPALA_SQL);
break;
}
case EMR_PRESTO: {
codeModel.setType(EmrJobType.PRESTO_SQL);
break;
}
case EMR_SPARK_SQL: {
codeModel.setType(EmrJobType.SPARK_SQL);
break;
}
case EMR_SPARK: {
codeModel.setType(EmrJobType.SPARK);
break;
}
case EMR_SPARK_SHELL: {
codeModel.setType(EmrJobType.SPARK_SHELL);
break;
}
case EMR_SPARK_STREAMING: {
codeModel.setType(EmrJobType.SPARK_STREAMING);
break;
}
}
});
codeModel.setName(UUID.randomUUID().toString());
EmrLauncher launcher = new EmrLauncher();
Optional.ofNullable(script.getRuntime()).ifPresent(rt -> {
Map<String, Object> allocationSpecProps = new HashMap<>();
launcher.setAllocationSpec(allocationSpecProps);
Optional.ofNullable(rt.getSparkConf()).filter(MapUtils::isNotEmpty).ifPresent(allocationSpecProps::putAll);
Optional.ofNullable(rt.getEmrJobConfig()).filter(MapUtils::isNotEmpty).ifPresent(emrJobConfig -> {
EmrAllocationSpec allocationSpec = new EmrAllocationSpec();
allocationSpec.setUserName((String)emrJobConfig.get("submitter"));
allocationSpec.setQueue(Optional.ofNullable((String)emrJobConfig.get("queue")).filter(StringUtils::isNotBlank).orElse("default"));
allocationSpec.setMemory(Optional.ofNullable(emrJobConfig.get("memory")).map(String::valueOf).orElse("2048"));
allocationSpec.setVcores(Optional.ofNullable(emrJobConfig.get("cores")).map(String::valueOf).orElse("1"));
allocationSpec.setPriority(Optional.ofNullable(emrJobConfig.get("priority")).map(String::valueOf).orElse("1"));
allocationSpec.setUseGateway(Optional.ofNullable((String)emrJobConfig.get("submitMode"))
.map(mode -> LabelEnum.getByLabel(EmrJobSubmitMode.class, mode))
.map(mode -> Objects.equals(mode, EmrJobSubmitMode.LOCAL))
.orElse(null));
allocationSpec.setReuseSession(Optional.ofNullable(emrJobConfig.get("sessionEnabled"))
.map(String::valueOf).map(BooleanUtils::toBoolean)
.orElse(null));
allocationSpec.setBatchMode(Optional.ofNullable((String)emrJobConfig.get("executeMode"))
.map(mode -> LabelEnum.getByLabel(EmrJobExecuteMode.class, mode))
.map(mode -> Objects.equals(mode, EmrJobExecuteMode.BATCH))
.orElse(false));
allocationSpec.setEnableJdbcSql(Optional.ofNullable(emrJobConfig.get("enableJdbcSql"))
.map(String::valueOf).map(BooleanUtils::toBoolean)
.orElse(null));
codeModel.getProperties().getEnvs().put(EmrCode.ENVS_KEY_FLOW_SKIP_SQL_ANALYZE, String.valueOf(allocationSpec.getBatchMode()));
Optional.ofNullable((JsonObject)GsonUtils.fromJsonString(GsonUtils.toJsonString(allocationSpec), JsonObject.class)).ifPresent(json ->
json.entrySet().stream()
.filter(ent -> ent.getValue() != null && !ent.getValue().isJsonNull())
.forEach(entry -> allocationSpecProps.put(entry.getKey(), entry.getValue().getAsJsonPrimitive())));
emrJobConfig.keySet().stream().filter(key -> ListUtils.emptyIfNull(ReflectUtils.getPropertyFields(allocationSpec)).stream()
.noneMatch(f -> StringUtils.equals(f.getName(), key))).forEach(key -> allocationSpecProps.put(key, emrJobConfig.get(key)));
});
});
codeModel.setLauncher(launcher);
codeModel.getProperties().setTags(Arrays.asList(ProductModule.DATA_STUDIO.getName(), SpecConstants.FLOW_SPEC + "/" + SpecVersion.V_1_1_0));
return code.getContent();
}