in dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java [124:231]
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null;
timedOut = new AtomicBoolean(false);
completed = new AtomicBoolean(false);
if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}
process = builder.start();
ProcessContainer.putProcess(process);
if (timeOutInterval > 0) {
timeOutTimer = new Timer();
timeoutTimerTask = new ShellTimeoutTimerTask(this);
// One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(
new InputStreamReader(process.getErrorStream()));
BufferedReader inReader =
new BufferedReader(
new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder();
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while ((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch (IOException ioe) {
log.warn("Error reading the error stream", ioe);
}
}
};
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
log.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try {
errThread.start();
inThread.start();
} catch (IllegalStateException ise) {
log.warn("Illegal while starting the error and in thread", ise);
}
try {
// parse the output
exitCode = process.waitFor();
try {
// make sure that the error and in thread exits
errThread.join();
inThread.join();
} catch (InterruptedException ie) {
log.warn("Interrupted while reading the error and in stream", ie);
}
completed.compareAndSet(false, true);
// the timeout thread handling
// taken care in finally block
if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
if ((timeOutTimer != null) && !timedOut.get()) {
timeOutTimer.cancel();
}
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
log.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
}
try {
errReader.close();
} catch (IOException ioe) {
log.warn("Error while closing the error stream", ioe);
}
ProcessContainer.removeProcess(process);
process.destroy();
lastTime = System.currentTimeMillis();
}
}