in dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java [104:208]
/**
 * Configures the given shell interceptor builder from the current task request, launches the
 * process, and waits for it to finish within the remaining time.
 *
 * <p>While the process runs, its output is parsed and (if needed) pod logs are collected. After the
 * wait returns, the method blocks on the log futures, then records the exit status: the real process
 * exit value when the wait succeeded and the kubernetes application status is successful, otherwise
 * {@code EXIT_CODE_FAILURE} followed by {@link #cancelApplication()}.
 *
 * @param iShellInterceptorBuilder builder used to assemble the shell interceptor that is executed
 * @param taskCallBack             optional callback; when non-null its
 *                                 {@code updateTaskInstanceInfo} is invoked with the task instance id
 *                                 after the process id is known and before waiting
 * @return the task response carrying the process id and the exit status code
 * @throws Exception propagated from building/executing the interceptor, from waiting on the process
 *                   or on the log futures (e.g. interruption), or from cancelling the application
 */
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
                        TaskCallBack taskCallBack) throws Exception {
    TaskResponse result = new TaskResponse();
    int taskInstanceId = taskRequest.getTaskInstanceId();
    // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
    iShellInterceptorBuilder = iShellInterceptorBuilder
            .shellDirectory(taskRequest.getExecutePath())
            .shellName(taskRequest.getTaskAppId());
    // Set system env
    if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
        ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
    }
    // Set custom env
    if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
        iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
    }
    // Set k8s config (this only works on Linux)
    if (taskRequest.getK8sTaskExecutionContext() != null) {
        iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
    }
    // Set sudo (this only works on Linux)
    iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
    // Set tenant (this only works on Linux)
    iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
    // Set CPU quota (this only works on Linux)
    if (taskRequest.getCpuQuota() != null) {
        iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
    }
    // Set memory quota (this only works on Linux)
    if (taskRequest.getMemoryMax() != null) {
        iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
    }

    IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
    process = iShellInterceptor.execute();

    // parse process output
    parseProcessOutput(this.process);

    // collect pod log
    collectPodLogIfNeeded();

    int processId = getProcessId(this.process);
    result.setProcessId(processId);

    // cache processId
    taskRequest.setProcessId(processId);

    // print process id
    log.info("process start, process id is: {}", processId);

    // remaining time budget (in seconds) used as the upper bound of the wait below
    long remainTime = getRemainTime();

    // update pid before waiting for the run to finish
    if (null != taskCallBack) {
        taskCallBack.updateTaskInstanceInfo(taskInstanceId);
    }

    // waiting for the run to finish; false means the wait elapsed before the process exited
    boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

    TaskExecutionStatus kubernetesStatus =
            ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());

    if (taskOutputFuture != null) {
        try {
            // Wait the task log process finished.
            taskOutputFuture.get();
        } catch (ExecutionException e) {
            log.error("Handle task log error", e);
        }
    }

    if (podLogOutputFuture != null) {
        try {
            // Wait kubernetes pod log collection finished
            podLogOutputFuture.get();
            // delete pod after successful execution and log collection
            ProcessUtils.cancelApplication(taskRequest);
        } catch (ExecutionException e) {
            log.error("Handle pod log error", e);
        }
    }

    // if SHELL task exit
    if (status && kubernetesStatus.isSuccess()) {
        // SHELL task state
        result.setExitStatusCode(this.process.exitValue());
    } else {
        log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                taskRequest.getTaskTimeout());
        result.setExitStatusCode(EXIT_CODE_FAILURE);
        cancelApplication();
    }

    // Process#exitValue() throws IllegalThreadStateException while the process is still running.
    // On the timeout path the process may not have terminated yet, so guard the call instead of
    // losing the already-populated result to an unrelated runtime exception.
    // NOTE(review): whether cancelApplication() waits for termination is not visible from here.
    int exitCode;
    if (this.process.isAlive()) {
        log.warn("process is still alive after cancel, processId:{}, use {} as processExitValue",
                processId, EXIT_CODE_FAILURE);
        exitCode = EXIT_CODE_FAILURE;
    } else {
        exitCode = this.process.exitValue();
    }
    String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
    log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
            exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
    return result;
}