public TaskResponse run()

in dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java [104:208]


    /**
     * Builds the shell interceptor from the current task request, launches the process, waits for it to finish
     * (bounded by the remaining task time) and reports the outcome.
     *
     * <p>The exit status code of the returned response is the process exit value when the process finished within
     * the wait time and the kubernetes application status is successful; otherwise it is
     * {@code EXIT_CODE_FAILURE} and the application is cancelled.
     *
     * @param iShellInterceptorBuilder builder that is configured from the task request and then built into the
     *                                 interceptor which starts the process
     * @param taskCallBack             callback notified once the process id is known; may be {@code null}
     * @return the response carrying the process id and the exit status code
     * @throws Exception if building/executing the interceptor fails, or if waiting for the process or for the
     *                   log futures is interrupted (only {@link ExecutionException} is handled locally)
     */
    public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
                            TaskCallBack taskCallBack) throws Exception {
        TaskResponse result = new TaskResponse();
        int taskInstanceId = taskRequest.getTaskInstanceId();
        // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
        iShellInterceptorBuilder = iShellInterceptorBuilder
                .shellDirectory(taskRequest.getExecutePath())
                .shellName(taskRequest.getTaskAppId());
        // Set system env
        if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
            ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
        }
        // Set custom env
        if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
            iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
        }
        // Set k8s config (this only works on Linux)
        if (taskRequest.getK8sTaskExecutionContext() != null) {
            iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
        }
        // Set sudo (this only works on Linux)
        iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
        // Set tenant (this only works on Linux)
        iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
        // Set CPU quota (this only works on Linux)
        if (taskRequest.getCpuQuota() != null) {
            iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
        }
        // Set memory quota (this only works on Linux)
        if (taskRequest.getMemoryMax() != null) {
            iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
        }

        IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
        process = iShellInterceptor.execute();

        // start parsing the process output
        parseProcessOutput(this.process);

        // collect pod log
        collectPodLogIfNeeded();

        int processId = getProcessId(this.process);

        result.setProcessId(processId);

        // cache processId
        taskRequest.setProcessId(processId);

        // print process id
        log.info("process start, process id is: {}", processId);

        // remaining time, used below as the waitFor timeout in seconds
        long remainTime = getRemainTime();

        // update pid before waiting for the run to finish
        if (null != taskCallBack) {
            taskCallBack.updateTaskInstanceInfo(taskInstanceId);
        }

        // wait for the run to finish; false means the wait timed out while the process was still running
        boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

        TaskExecutionStatus kubernetesStatus =
                ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());

        if (taskOutputFuture != null) {
            try {
                // Wait until the task log processing has finished.
                taskOutputFuture.get();
            } catch (ExecutionException e) {
                log.error("Handle task log error", e);
            }
        }

        if (podLogOutputFuture != null) {
            try {
                // Wait until the kubernetes pod log collection has finished
                podLogOutputFuture.get();
                // delete pod after successful log collection
                ProcessUtils.cancelApplication(taskRequest);
            } catch (ExecutionException e) {
                log.error("Handle pod log error", e);
            }
        }

        // the process finished in time and the kubernetes application succeeded
        if (status && kubernetesStatus.isSuccess()) {

            // SHELL task state
            result.setExitStatusCode(this.process.exitValue());

        } else {
            log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                    taskRequest.getTaskTimeout());
            result.setExitStatusCode(EXIT_CODE_FAILURE);
            cancelApplication();
        }

        // Process#exitValue() throws IllegalThreadStateException while the process is still running. That can
        // happen here when waitFor timed out and the process has not terminated yet after cancelApplication(),
        // so only read the exit value once the process is known to have exited.
        if (this.process.isAlive()) {
            log.warn(
                    "process is still alive after cancel. execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{}",
                    taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status);
            return result;
        }

        int exitCode = this.process.exitValue();
        String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
        log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
                exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
        return result;

    }