public void buildK8sJob()

in dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java [95:199]


    /**
     * Builds the Kubernetes job description for the current task request and stores it in the
     * {@code job} field. Nothing is submitted to the cluster here.
     *
     * <p>The job is named {@code <lower-cased task name>-<task instance id>} and runs a single
     * container with the same name. Resource requests come from the minimum memory / CPU values of
     * the parameters; resource limits are twice those values. The back-off limit is fixed at 0.
     *
     * <p>Optional inputs are tolerated when absent: a {@code null} label map, a {@code null} or
     * empty node selector list (no affinity is set), an empty pull secret, and empty or
     * whitespace-only command / args (the corresponding container field is left unset).
     *
     * <p>NOTE(review): {@code getMinMemorySpace()} and {@code getMinCpuCores()} are assumed to be
     * non-null; a {@code null} there still fails on unboxing - confirm upstream validation.
     *
     * @param k8STaskMainParameters image, namespace, resources, labels, environment parameters,
     *                              command/args and node selectors describing the job to build
     * @throws TaskException if the command or args text cannot be parsed as a YAML list
     */
    public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
        String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId());
        String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
        String image = k8STaskMainParameters.getImage();
        String pullSecret = k8STaskMainParameters.getPullSecret();
        String namespaceName = k8STaskMainParameters.getNamespaceName();
        String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy();
        Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
        Double podMem = k8STaskMainParameters.getMinMemorySpace();
        Double podCpu = k8STaskMainParameters.getMinCpuCores();
        // Limits are derived from the requests: twice the requested memory and CPU.
        Double limitPodMem = podMem * 2;
        Double limitPodCpu = podCpu * 2;
        int retryNum = 0;
        String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);

        Map<String, Quantity> reqRes = new HashMap<>();
        reqRes.put(MEMORY, new Quantity(String.format("%s%s", podMem, MI)));
        reqRes.put(CPU, new Quantity(String.valueOf(podCpu)));
        Map<String, Quantity> limitRes = new HashMap<>();
        limitRes.put(MEMORY, new Quantity(String.format("%s%s", limitPodMem, MI)));
        limitRes.put(CPU, new Quantity(String.valueOf(limitPodCpu)));

        // User-supplied labels are applied last, so they override the built-in labels on key clash.
        // A missing label map is treated as "no extra labels" instead of failing in putAll.
        Map<String, String> labelMap = k8STaskMainParameters.getLabelMap();
        Map<String, String> jobLabelMap = new HashMap<>();
        jobLabelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
        jobLabelMap.put(NAME_LABEL, k8sJobName);
        Map<String, String> podLabelMap = new HashMap<>();
        podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId());
        if (labelMap != null) {
            jobLabelMap.putAll(labelMap);
            podLabelMap.putAll(labelMap);
        }

        // The task instance id is always exported; every extra parameter becomes an env var too.
        List<EnvVar> envVars = new ArrayList<>();
        envVars.add(new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null));
        if (MapUtils.isNotEmpty(otherParams)) {
            for (Map.Entry<String, String> entry : otherParams.entrySet()) {
                envVars.add(new EnvVar(entry.getKey(), entry.getValue(), null));
            }
        }

        List<String> commands;
        List<String> args;
        try {
            commands = loadYamlList(k8STaskMainParameters.getCommand());
            args = loadYamlList(k8STaskMainParameters.getArgs());
        } catch (Exception e) {
            throw new TaskException("Parse yaml-like commands and args failed", e);
        }

        // Only build a node affinity when at least one selector requirement is configured.
        List<?> nodeSelectorRequirements = k8STaskMainParameters.getNodeSelectorRequirements();
        Affinity affinity = null;
        if (nodeSelectorRequirements != null && !nodeSelectorRequirements.isEmpty()) {
            NodeSelectorTerm nodeSelectorTerm = new NodeSelectorTerm();
            nodeSelectorTerm.setMatchExpressions(k8STaskMainParameters.getNodeSelectorRequirements());
            affinity = new AffinityBuilder()
                    .withNewNodeAffinity()
                    .withNewRequiredDuringSchedulingIgnoredDuringExecution()
                    .addNewNodeSelectorTermLike(nodeSelectorTerm)
                    .endNodeSelectorTerm()
                    .endRequiredDuringSchedulingIgnoredDuringExecution()
                    .endNodeAffinity().build();
        }

        job = new JobBuilder()
                .withApiVersion(API_VERSION)
                .withNewMetadata()
                .withName(k8sJobName)
                .withLabels(jobLabelMap)
                .withNamespace(namespaceName)
                .endMetadata()
                .withNewSpec()
                .withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
                .withNewTemplate()
                .withNewMetadata()
                .withLabels(podLabelMap)
                .endMetadata()
                .withNewSpec()
                .addNewContainer()
                .withName(k8sJobName)
                .withImage(image)
                // null leaves the field unset so the image's own entrypoint / cmd is used
                .withCommand(commands.isEmpty() ? null : commands)
                .withArgs(args.isEmpty() ? null : args)
                .withImagePullPolicy(imagePullPolicy)
                .withResources(new ResourceRequirements(limitRes, reqRes))
                .withEnv(envVars)
                .endContainer()
                .withImagePullSecrets(
                        StringUtils.isEmpty(pullSecret) ? null : singletonList(new LocalObjectReference(pullSecret)))
                .withRestartPolicy(RESTART_POLICY)
                .withAffinity(affinity)
                .endSpec()
                .endTemplate()
                .withBackoffLimit(retryNum)
                .endSpec()
                .build();

    }

    /**
     * Parses a YAML list (used for the container command and args) into a list of strings.
     *
     * <p>NOTE(review): the text comes from task parameters; confirm the {@code yaml} instance is
     * configured so that loading it cannot instantiate arbitrary types.
     *
     * @param yamlText raw YAML text, may be {@code null}, empty or whitespace-only
     * @return the parsed list, or an empty list when there is nothing to parse
     */
    private List<String> loadYamlList(String yamlText) {
        if (StringUtils.isEmpty(yamlText)) {
            return new ArrayList<>();
        }
        // Loading an empty document (e.g. whitespace-only input after trim) yields null.
        List<String> parsed = yaml.load(yamlText.trim());
        return parsed == null ? new ArrayList<>() : parsed;
    }