public void updateProcessStatus()

in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java [1320:1521]


  /**
   * Applies a process state update event to the matching entry of the agent's inventory.
   * <p>
   * The inventory is searched for the entry whose {@code DuccId} unique value equals
   * {@code duccEvent.getDuccProcessId()}. When one is found this method:
   * <ul>
   * <li>fills in the PID and JMX URL if the inventory entry does not have them yet,</li>
   * <li>closes the initialization time window and, on {@code Running}, opens the run time
   * window,</li>
   * <li>looks up the associated {@code ManagedProcess} (if any) and stops its initialization
   * timer on the transition to {@code Running},</li>
   * <li>either undeploys a process previously tagged for kill, ignores an update from a process
   * that has already been killed/failed, or records the new state and dispatches an inventory
   * update,</li>
   * <li>performs state specific handling (metrics cleanup, kill on failed/timed-out
   * initialization, marking the process as stopping),</li>
   * <li>stores the UIMA pipeline component states carried by the event.</li>
   * </ul>
   * Only the first matching entry is processed. If no entry matches, nothing is changed.
   * <p>
   * The whole operation runs while holding a permit of {@code inventorySemaphore}. The permit is
   * released only if it was actually acquired. If the thread is interrupted, the interrupt status
   * is restored and the update is dropped.
   *
   * @param duccEvent
   *          the state update reported for a process; its state is dereferenced without a null
   *          check, so it is expected to be non-null
   * @throws Exception
   *           any exception, other than {@code InterruptedException}, raised by the calls made
   *           while handling the update (failures while dispatching the inventory update are
   *           logged instead of propagated)
   */
  public void updateProcessStatus(ProcessStateUpdateDuccEvent duccEvent) throws Exception {
    String methodName = "updateProcessStatus";

    // Tracks whether the permit was really obtained, so that the finally block never
    // releases a permit this thread does not hold (acquire() may be interrupted).
    boolean permitAcquired = false;
    try {
      inventorySemaphore.acquire();
      permitAcquired = true;
      for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
        // Check if process with a given unique DuccId exist in the local map
        if (!processEntry.getKey().getUnique().equals(duccEvent.getDuccProcessId())) {
          continue;
        }
        // found it. Update pid and state of the process
        IDuccProcess process = processEntry.getValue();

        if (duccEvent.getPid() != null && process.getPID() == null) {
          process.setPID(duccEvent.getPid());
        }

        if (duccEvent.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
          process.setProcessJmxUrl(duccEvent.getProcessJmxUrl());
        }

        ITimeWindow tw = process.getTimeWindowInit();
        if (tw != null) {
          if (!duccEvent.getState().equals(ProcessState.Initializing)) {
            // Mark the time the process ended initialization. It also
            // covers a case when the process terminates while initializing
            tw.setEnd(TimeStamp.getCurrentMillis());
            if (duccEvent.getState().equals(ProcessState.Running)) {
              ITimeWindow twr = new TimeWindow();
              String millis = TimeStamp.getCurrentMillis();
              // Mark the time the process started running
              process.setTimeWindowRun(twr);
              twr.setStart(millis);
            }
          }
        } else {
          logger.info(methodName, null,
                  "++++++++++++ Agent Init TimeWindow not available - tw==null");
        }

        // Find ManagedProcess instance the DuccProcess instance is associated with.
        // It stays null when no deployed process matches the event's process id.
        ManagedProcess deployedProcess = null;
        synchronized (monitor) {
          Iterator<ManagedProcess> it = deployedProcesses.iterator();
          while (it.hasNext()) {
            ManagedProcess dProcess = it.next();
            if (dProcess.getDuccProcess().getDuccId().getUnique()
                    .equals(duccEvent.getDuccProcessId())) {
              deployedProcess = dProcess;
              break;
            }
          }
        }

        if (process.getProcessState() != ProcessState.Running
                && duccEvent.getState().equals(ProcessState.Running) && deployedProcess != null) {
          // cancel process initialization timer.
          deployedProcess.stopInitializationTimer();
        }

        logger.info(methodName, null,
                ">>>> Agent Handling Process Update - Ducc Id: " + process.getDuccId() + " PID:"
                        + process.getPID() + " Status:" + duccEvent.getState()
                        + " Deallocated:" + process.isDeallocated());

        if (deployedProcess != null && deployedProcess.getSocketEndpoint() == null
                && duccEvent.getServiceEdnpoint() != null) {
          deployedProcess.setSocketEndpoint(duccEvent.getServiceEdnpoint());
        }

        // This is a delayed stop. Previously a request to stop the process was received
        // but the PID was not available yet. Instead a flag was set to initiate a
        // stop after the process reports the PID.
        if (deployedProcess != null && deployedProcess.killAfterLaunch()) {
          logger.info(methodName, null, ">>>> Process Ducc Id:" + process.getDuccId()
                  + " Was Previously Tagged for Kill While It Was Starting");
          undeployProcess(process);
        } else if (deployedProcess != null && deployedProcess.doKill() && deployedProcess
                .getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
          deployedProcess.getDuccProcess()
                  .setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
        } else if (deployedProcess != null && (deployedProcess.doKill()
                || deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Failed)
                || deployedProcess.getDuccProcess().getProcessState()
                        .equals(ProcessState.Killed))) {
          // The process has already stopped, but managed to send
          // the last update before dying. Ignore the update
          return;
        } else if (changeState(process.getProcessState())) {
          logger.info(methodName, null, "=============== PID:" + process.getPID()
                  + " Changing State - current state:" + process.getProcessState()
                  + " New State:" + duccEvent.getState());
          process.setProcessState(duccEvent.getState());
          DuccEventDispatcher dispatcher = configurationFactory
                  .getORDispatcher(super.getContext());
          try {
            DefaultNodeInventoryProcessor processor = configurationFactory
                    .nodeInventoryProcessor(this);
            // Dispatch a clone so the update is built from a snapshot of the inventory
            Map<DuccId, IDuccProcess> inventoryCopy = (Map<DuccId, IDuccProcess>) SerializationUtils
                    .clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);

            processor.dispatchInventoryUpdate(dispatcher,
                    configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy);
            logger.info(methodName, null,
                    "Sent Node Inventory Update to the OR - process PID:" + process.getPID());

          } catch (Exception e) {
            // Best effort: a failed dispatch must not abort handling of the update
            logger.warn(methodName, null, e);
          }

          // if the process is Stopping, it must have hit an error threshold
        }

        // Check if MemoryCollector should be created for this process. It collects
        // resident memory of the process at regular intervals. Should only be added
        // once for each process. This route will have its id set to process PID.
        if (addProcessMemoryCollector(duccEvent.getPid())
                && (duccEvent.getState().equals(ProcessState.Initializing)
                        || duccEvent.getState().equals(ProcessState.Running))) {
          if (duccEvent.getState().equals(ProcessState.Running)) {
            if (process.getUimaPipelineComponents() != null
                    && process.getUimaPipelineComponents().size() > 0) {
              process.getUimaPipelineComponents().clear();
              if (duccEvent.getUimaPipeline() != null) {
                duccEvent.getUimaPipeline().clear();
              }
            }
          }

        } else if (duccEvent.getState().equals(ProcessState.Stopped)
                || duccEvent.getState().equals(ProcessState.Failed)
                || duccEvent.getState().equals(ProcessState.Killed)) {
          // deployedProcess is null when no ManagedProcess matched the event
          if (deployedProcess != null && deployedProcess.getMetricsProcessor() != null) {
            // close open fds (stat and statm files)
            deployedProcess.getMetricsProcessor().close();
          }
          logger.info(methodName, null,
                  "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
                          + duccEvent.getPid());
        } else if (duccEvent.getState().equals(ProcessState.FailedInitialization)) {
          logger.info(methodName, null,
                  ">>>> Agent Handling Process FailedInitialization. PID:" + duccEvent.getPid());
          if (deployedProcess == null) {
            // Nothing to kill: no ManagedProcess is associated with this inventory entry
            logger.info(methodName, null,
                    ">>>> No ManagedProcess found for Ducc Id:" + process.getDuccId()
                            + " - skipping kill after FailedInitialization");
          } else {
            deployedProcess.getDuccProcess().setReasonForStoppingProcess(
                    ReasonForStoppingProcess.FailedInitialization.toString());
            deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
            deployedProcess.setStopping();

            deployedProcess.kill();
            logger.info(methodName, null,
                    ">>>> Agent Handling Process FailedInitialization. PID:" + duccEvent.getPid()
                            + " Killing Process");

            undeployProcess(process);
          }

        } else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) {
          if (deployedProcess == null) {
            // Nothing to kill: no ManagedProcess is associated with this inventory entry
            logger.info(methodName, null,
                    ">>>> No ManagedProcess found for Ducc Id:" + process.getDuccId()
                            + " - skipping kill after InitializationTimeout");
          } else {
            deployedProcess.getDuccProcess().setReasonForStoppingProcess(
                    ReasonForStoppingProcess.InitializationTimeout.toString());
            deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
            deployedProcess.setStopping();

            // Mark process for death. Doesn't actually kill the process
            deployedProcess.kill();
            logger.info(methodName, null,
                    ">>>> Agent Handling Process InitializationTimeout. PID:" + duccEvent.getPid()
                            + " Killing Process");

            undeployProcess(process);
          }

        } else if (duccEvent.getState().equals(ProcessState.Stopping)) {
          if (duccEvent.getMessage() != null && duccEvent.getMessage()
                  .equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
            process.setReasonForStoppingProcess(
                    ReasonForStoppingProcess.ExceededErrorThreshold.toString());
          }
          if (deployedProcess != null
                  && !deployedProcess.getDuccProcess().getProcessState()
                          .equals(ProcessState.Stopped)
                  && !deployedProcess.getDuccProcess().getProcessState()
                          .equals(ProcessState.Stopping)) {
            deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
            deployedProcess.setStopping();
          }
        }

        if (duccEvent.getUimaPipeline() != null) {
          // Log the state of every pipeline component and store them in the inventory entry
          StringBuilder buffer = new StringBuilder("\t\tUima Pipeline -");
          for (IUimaPipelineAEComponent uimaAeState : duccEvent.getUimaPipeline()) {
            buffer.append("\n\t\tAE:").append(uimaAeState.getAeName()).append(" state:")
                    .append(uimaAeState.getAeState()).append(" InitTime:")
                    .append(uimaAeState.getInitializationTime() / 1000).append(" secs. Thread:")
                    .append(uimaAeState.getAeThreadId());
          }
          logger.info(methodName, null, buffer.toString());
          ((DuccProcess) process).setUimaPipelineComponents(duccEvent.getUimaPipeline());
        }
        return; // found it. Done
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack; the update is dropped
      Thread.currentThread().interrupt();
      logger.warn(methodName, null, e);
    } finally {
      if (permitAcquired) {
        inventorySemaphore.release();
      }
    }
  }