in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java [354:518]
public void drainProcessStreams(java.lang.Process process, DuccLogger logger, PrintStream pStream,
boolean isKillCmd) {
// Create dedicated Thread Group for the process stream consumer threads
ThreadGroup group = new ThreadGroup("AgentDeployer" + tgCounter++);
// Fetch stdin from the deployed process
InputStream stdin = process.getInputStream();
// Fetch stderr from the deployed process
InputStream stderr = process.getErrorStream();
// Create dedicated thread to consume std output stream from the process
stdOutReader = new ProcessStreamConsumer(logger, group, "StdOutputReader", stdin, pStream,
workDuccId);
// Create dedicated thread to consume std error stream from the process
stdErrReader = new ProcessStreamConsumer(logger, group, "StdErrorReader", stderr, pStream,
workDuccId);
// Start both stream consumer threads
stdOutReader.start();
stdErrReader.start();
// block until the process is terminated or the agent terminates
boolean finished = false;
int exitcode = 0;
while (!finished) {
try {
process.waitFor();
if (getMetricsProcessor() != null) {
// close open fds (stat and statm files
getMetricsProcessor().close();
}
} catch (InterruptedException e) {
}
try {
exitcode = process.exitValue();
finished = true;
} catch (IllegalThreadStateException e) {
}
}
// determine if this process is an Arbitrary Process (AP)
boolean isAP = isJd() && getDuccProcess().getProcessType()
.equals(org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType.Pop);
try {
// wait for stdout and stderr reader threads to finish. Join for max
// of 2 secs The process has exited and in theory the join should
// return quickly. We do the join to make sure that the streams are
// drained so that we can get a reason for failure if there was a
// problem launching the process.
stdOutReader.join(2000);
stdErrReader.join(2000);
} catch (InterruptedException ie) {
log("ManagedProcess.drainProcessStreams",
"Interrupted While Awaiting Termination of StdOutReader and StdErrReader Threads");
}
String reason = getDuccProcess().getReasonForStoppingProcess();
ProcessState pstate = getDuccProcess().getProcessState();
boolean initError = (reason != null
&& (reason.equals(ReasonForStoppingProcess.FailedInitialization.toString())
|| reason.equals(ReasonForStoppingProcess.InitializationTimeout.toString())));
if (isKillCmd ||
// if the process is to be killed due to init problems, set the
// state to Stopped
(reason != null && initError)) {
// getDuccProcess().setProcessState(ProcessState.Stopped);
if (!initError && (exitcode - 128 == 9 || exitcode - 128 == 15)) {
getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
}
} else {
// Process has terminated. Determine why the process terminated.
log("ManagedProcess.drainProcessStreams",
"Ducc Process with PID:" + getPid() + " Terminated while in " + pstate + " State");
// true if agent killed the process. Process either exceeded memory
// use or the PM state notifications stopped coming in.
if (doKill()) {
if (isAP) {
// Agent killed the AP process
pstate = ProcessState.Stopped;
if (exitcode - 128 == 9 || exitcode - 128 == 15) { // kill -9 or -15?
getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
}
} else {
// Agent killed the process due to timeout waiting for OR state
pstate = ProcessState.Killed;
}
} else {
if (!isstopping) {
// check if process exited while in Initializing state
if (ProcessState.Initializing.equals(pstate) || ProcessState.Starting.equals(pstate)
|| ProcessState.Started.equals(pstate)) {
getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.FailedInitialization.toString());
log("ManagedProcess.drainProcessStreams",
"Process Failed while in initializing state - setting reason to "
+ getDuccProcess().getReasonForStoppingProcess());
}
}
// default state to Stopped. If the process died unexpectadly the state
// will be changed to Failed
pstate = ProcessState.Stopped;
// check if the process died due to an external cause. If that
// was the case isstopping = false. The isstopping=true iff the Agent
// initiated process stop because the process was deallocated
if (!isstopping) {
// fetch errors from stderr stream. If the process failed to
// start due to misconfiguration
// the reason for failure would be provided by the OS (wrong
// user id, bad directory,etc)
String errors = stdErrReader.getDataFromStream();
if (errors != null && errors.trim().length() > 0) {
getDuccProcess().setExtendedReasonForStoppingProcess(errors.trim());
log("ManagedProcess.drainProcessStreams", "Process Failed - stderr stream:"
+ getDuccProcess().getReasonForStoppingProcess());
}
// APs can stop for any reason. There is
// no way to determine why the AP terminated.
if (!isAP) {
// Unexpected process termination
pstate = ProcessState.Failed;
// fetch errors from stderr stream. If the process failed to
// start due to misconfiguration
// the reason for failure would be provided by the OS (wrong
// user id, bad directory,etc)
if (errors != null && errors.trim().length() > 0) {
// JP should not be marked as CROAKED if it terminates
// due to a process error, failed initialization or initialization
// timeout. On such errors, a JP sends an event to its agent where
// the reason for failure is set
if (getDuccProcess().getProcessState().equals(ProcessState.Stopping)) {
// the reason was already set while handling JPs Stopping event
getDuccProcess().setProcessState(ProcessState.Stopped); // now the JP is dead
} else {
// Process terminated unexpectedly. It stopped on its own due to Ducc framework
// error or due to some external event not initiated by an agent
getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
}
} else if (exitcode - 128 == 9 || exitcode - 128 == 15) {
// Process terminated unexpectedly. It stopped on its own due to Ducc framework
// error or due to some external event not initiated by an agent
getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
}
}
} else {
if (exitcode - 128 == 9 || exitcode - 128 == 15) { // check if the process was killed with
// -9 or -15
addReasonForStopping(getDuccProcess(),
ReasonForStoppingProcess.KilledByDucc.toString());
} else {
addReasonForStopping(getDuccProcess(), ReasonForStoppingProcess.Deallocated.toString());
}
}
notifyProcessObserver(pstate);
log("ManagedProcess.drainProcessStreams",
"************ Remote Process PID:" + getPid() + " Terminated *************** State:"
+ getDuccProcess().getProcessState().toString());
}
}
}