in dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java [95:199]
public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId());
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
String image = k8STaskMainParameters.getImage();
String pullSecret = k8STaskMainParameters.getPullSecret();
String namespaceName = k8STaskMainParameters.getNamespaceName();
String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy();
Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
Double podMem = k8STaskMainParameters.getMinMemorySpace();
Double podCpu = k8STaskMainParameters.getMinCpuCores();
Double limitPodMem = podMem * 2;
Double limitPodCpu = podCpu * 2;
int retryNum = 0;
String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);
Map<String, Quantity> reqRes = new HashMap<>();
reqRes.put(MEMORY, new Quantity(String.format("%s%s", podMem, MI)));
reqRes.put(CPU, new Quantity(String.valueOf(podCpu)));
Map<String, Quantity> limitRes = new HashMap<>();
limitRes.put(MEMORY, new Quantity(String.format("%s%s", limitPodMem, MI)));
limitRes.put(CPU, new Quantity(String.valueOf(limitPodCpu)));
Map<String, String> labelMap = k8STaskMainParameters.getLabelMap();
Map<String, String> jobLabelMap = new HashMap<>();
jobLabelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
jobLabelMap.put(NAME_LABEL, k8sJobName);
jobLabelMap.putAll(labelMap);
Map<String, String> podLabelMap = new HashMap<>();
podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId());
podLabelMap.putAll(labelMap);
EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null);
List<EnvVar> envVars = new ArrayList<>();
envVars.add(taskInstanceIdVar);
if (MapUtils.isNotEmpty(otherParams)) {
for (Map.Entry<String, String> entry : otherParams.entrySet()) {
String param = entry.getKey();
String paramValue = entry.getValue();
EnvVar envVar = new EnvVar(param, paramValue, null);
envVars.add(envVar);
}
}
String commandString = k8STaskMainParameters.getCommand();
String argsString = k8STaskMainParameters.getArgs();
List<String> commands = new ArrayList<>();
List<String> args = new ArrayList<>();
try {
if (!StringUtils.isEmpty(commandString)) {
commands = yaml.load(commandString.trim());
}
if (!StringUtils.isEmpty(argsString)) {
args = yaml.load(argsString.trim());
}
} catch (Exception e) {
throw new TaskException("Parse yaml-like commands and args failed", e);
}
NodeSelectorTerm nodeSelectorTerm = new NodeSelectorTerm();
nodeSelectorTerm.setMatchExpressions(k8STaskMainParameters.getNodeSelectorRequirements());
Affinity affinity = k8STaskMainParameters.getNodeSelectorRequirements().size() == 0 ? null
: new AffinityBuilder()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTermLike(nodeSelectorTerm)
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity().build();
job = new JobBuilder()
.withApiVersion(API_VERSION)
.withNewMetadata()
.withName(k8sJobName)
.withLabels(jobLabelMap)
.withNamespace(namespaceName)
.endMetadata()
.withNewSpec()
.withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
.withNewTemplate()
.withNewMetadata()
.withLabels(podLabelMap)
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(k8sJobName)
.withImage(image)
.withCommand(commands.size() == 0 ? null : commands)
.withArgs(args.size() == 0 ? null : args)
.withImagePullPolicy(imagePullPolicy)
.withResources(new ResourceRequirements(limitRes, reqRes))
.withEnv(envVars)
.endContainer()
.withImagePullSecrets(
StringUtils.isEmpty(pullSecret) ? null : singletonList(new LocalObjectReference(pullSecret)))
.withRestartPolicy(RESTART_POLICY)
.withAffinity(affinity)
.endSpec()
.endTemplate()
.withBackoffLimit(retryNum)
.endSpec()
.build();
}