public void drainProcessStreams()

in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java [354:518]


  /**
   * Consumes the stdout and stderr of the given process, blocks until the process has exited, and
   * then decides which reason-for-stopping and final state to record for it.
   * <p>
   * Two {@code ProcessStreamConsumer} threads are started in a dedicated thread group, one per
   * stream. Once the process has exited, each consumer thread is joined for at most 2 seconds so
   * that any text written to stderr is available when the cause of termination is determined.
   * <p>
   * Exit codes 137 and 143 (128 + 9 and 128 + 15) are treated as "terminated by kill -9 / -15".
   *
   * @param process
   *          the launched process whose output streams are drained and whose exit is awaited
   * @param logger
   *          logger handed to both stream consumer threads
   * @param pStream
   *          print stream handed to both stream consumer threads
   * @param isKillCmd
   *          when {@code true} the termination analysis and the observer notification are skipped;
   *          only the reason-for-stopping may be set to {@code KilledByDucc}
   */
  public void drainProcessStreams(java.lang.Process process, DuccLogger logger, PrintStream pStream,
          boolean isKillCmd) {
    // Create dedicated Thread Group for the process stream consumer threads
    ThreadGroup group = new ThreadGroup("AgentDeployer" + tgCounter++);
    // Process.getInputStream() is the stream connected to the child's standard OUTPUT
    InputStream stdout = process.getInputStream();
    // Process.getErrorStream() is the stream connected to the child's standard ERROR
    InputStream stderr = process.getErrorStream();
    // Create dedicated thread to consume std output stream from the process
    stdOutReader = new ProcessStreamConsumer(logger, group, "StdOutputReader", stdout, pStream,
            workDuccId);
    // Create dedicated thread to consume std error stream from the process
    stdErrReader = new ProcessStreamConsumer(logger, group, "StdErrorReader", stderr, pStream,
            workDuccId);

    // Start both stream consumer threads
    stdOutReader.start();
    stdErrReader.start();

    // Block until the process has terminated and its exit code is available
    boolean finished = false;
    int exitcode = 0;
    while (!finished) {
      try {
        process.waitFor();
        if (getMetricsProcessor() != null) {
          // close open fds (stat and statm files)
          getMetricsProcessor().close();
        }
      } catch (InterruptedException ignored) {
        // Deliberately not propagated: the exitValue() probe below decides whether the process
        // is really gone; if it is not, the loop goes back to waiting.
      }
      try {
        exitcode = process.exitValue();
        finished = true;
      } catch (IllegalThreadStateException ignored) {
        // Process has not exited yet (the wait above was interrupted) - keep waiting.
      }
    }
    // true if the exit code corresponds to kill -9 or kill -15
    final boolean killedBySignal = exitcode - 128 == 9 || exitcode - 128 == 15;

    // determine if this process is an Arbitrary Process (AP)
    boolean isAP = isJd() && getDuccProcess().getProcessType()
            .equals(org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType.Pop);
    try {
      // Wait for the stdout and stderr reader threads to finish, at most 2 secs each. The
      // process has exited, so in theory the joins return quickly. They are done to make sure
      // the streams are drained so that a reason for failure is available if there was a
      // problem launching the process.
      stdOutReader.join(2000);
      stdErrReader.join(2000);
    } catch (InterruptedException ie) {
      log("ManagedProcess.drainProcessStreams",
              "Interrupted While Awaiting Termination of StdOutReader and StdErrReader Threads");
    }

    String reason = getDuccProcess().getReasonForStoppingProcess();

    ProcessState pstate = getDuccProcess().getProcessState();
    // true if the process is already flagged as stopped due to an initialization problem
    boolean initError = (reason != null
            && (reason.equals(ReasonForStoppingProcess.FailedInitialization.toString())
                    || reason.equals(ReasonForStoppingProcess.InitializationTimeout.toString())));
    if (isKillCmd || initError) {
      // The process state is not modified and the observer is not notified on this path. An
      // existing initialization-failure reason is kept; otherwise a kill -9/-15 exit code is
      // recorded as KilledByDucc.
      if (!initError && killedBySignal) {
        getDuccProcess()
                .setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
      }

    } else {
      // Process has terminated. Determine why the process terminated.
      log("ManagedProcess.drainProcessStreams",
              "Ducc Process with PID:" + getPid() + " Terminated while in " + pstate + " State");

      // true if agent killed the process. Process either exceeded memory
      // use or the PM state notifications stopped coming in.
      if (doKill()) {
        // NOTE(review): notifyProcessObserver(pstate) is only reached from the else branch
        // below, so the pstate values assigned in this branch are never observed - confirm
        // whether the notification was meant to cover this branch as well.
        if (isAP) {
          // Agent killed the AP process
          pstate = ProcessState.Stopped;
          if (killedBySignal) { // kill -9 or -15?
            getDuccProcess()
                    .setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
          }
        } else {
          // Agent killed the process due to timeout waiting for OR state
          pstate = ProcessState.Killed;
        }
      } else {
        if (!isstopping) {
          // check if process exited before it finished initializing
          if (ProcessState.Initializing.equals(pstate) || ProcessState.Starting.equals(pstate)
                  || ProcessState.Started.equals(pstate)) {
            getDuccProcess().setReasonForStoppingProcess(
                    ReasonForStoppingProcess.FailedInitialization.toString());
            log("ManagedProcess.drainProcessStreams",
                    "Process Failed while in initializing state - setting reason to "
                            + getDuccProcess().getReasonForStoppingProcess());
          }
        }
        // default state to Stopped. If the process died unexpectedly the state
        // will be changed to Failed
        pstate = ProcessState.Stopped;
        // check if the process died due to an external cause. If that
        // was the case isstopping = false. The isstopping=true iff the Agent
        // initiated process stop because the process was deallocated
        if (!isstopping) {
          // fetch errors from stderr stream. If the process failed to start due to
          // misconfiguration the reason for failure would be provided by the OS (wrong
          // user id, bad directory, etc)
          String errors = stdErrReader.getDataFromStream();
          final String stderrText = (errors == null) ? "" : errors.trim();
          final boolean hasStderrText = stderrText.length() > 0;
          if (hasStderrText) {
            getDuccProcess().setExtendedReasonForStoppingProcess(stderrText);

            // log what was actually captured from stderr, as the message label states
            log("ManagedProcess.drainProcessStreams",
                    "Process Failed - stderr stream:" + stderrText);
          }

          // APs can stop for any reason. There is
          // no way to determine why the AP terminated.
          if (!isAP) {
            // Unexpected process termination
            pstate = ProcessState.Failed;
            if (hasStderrText) {
              // JP should not be marked as CROAKED if it terminates
              // due to a process error, failed initialization or initialization
              // timeout. On such errors, a JP sends an event to its agent where
              // the reason for failure is set
              if (getDuccProcess().getProcessState().equals(ProcessState.Stopping)) {
                // the reason was already set while handling JPs Stopping event
                getDuccProcess().setProcessState(ProcessState.Stopped); // now the JP is dead
              } else {
                // Process terminated unexpectedly. It stopped on its own due to Ducc framework
                // error or due to some external event not initiated by an agent
                getDuccProcess()
                        .setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
              }

            } else if (killedBySignal) {
              // Nothing on stderr, but the exit code shows kill -9/-15 that this agent did not
              // initiate (doKill() and isstopping are both false here).
              getDuccProcess()
                      .setReasonForStoppingProcess(ReasonForStoppingProcess.Croaked.toString());
            }
          }
        } else {
          // Agent-initiated stop: distinguish a kill -9/-15 from a plain deallocation
          if (killedBySignal) {
            addReasonForStopping(getDuccProcess(),
                    ReasonForStoppingProcess.KilledByDucc.toString());
          } else {
            addReasonForStopping(getDuccProcess(), ReasonForStoppingProcess.Deallocated.toString());
          }
        }
        notifyProcessObserver(pstate);
        log("ManagedProcess.drainProcessStreams",
                "************ Remote Process PID:" + getPid() + " Terminated *************** State:"
                        + getDuccProcess().getProcessState().toString());
      }
    }
  }