public void process()

in uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java [83:261]


  /**
   * Decides whether the agent's process inventory should be published and prepares the outgoing
   * message accordingly.
   * <p>
   * When publishing, the message body is set to a {@code NodeInventoryUpdateDuccEvent} built from
   * the current inventory; otherwise the body is set to {@code null}. The inventory is published
   * when it is non-empty and at least one of the following holds:
   * <ul>
   * <li>a change was detected against the previous snapshot, or the agent's event listener
   * requested a forced update;</li>
   * <li>{@code forceInventoryUpdateMaxThreshold} is zero;</li>
   * <li>{@code counter} is positive and a multiple of {@code forceInventoryUpdateMaxThreshold}
   * (property ducc.agent.node.inventory.publish.rate.skip in ducc.properties).</li>
   * </ul>
   * After every call {@code counter} is reset to zero if a change was flagged, or incremented
   * otherwise, and {@code inventoryChanged} is cleared.
   *
   * @param outgoingMessage
   *          exchange whose "in" message body receives the inventory event or {@code null}
   * @throws Exception
   *           if fetching or comparing the inventory fails; failures while building or logging the
   *           outgoing event are logged and not rethrown
   */
  public void process(Exchange outgoingMessage) throws Exception {
    String methodName = "process";
    // Get a deep copy of agent's inventory
    Map<DuccId, IDuccProcess> inventory = getInventory();

    // Change detection needs a previous snapshot; without one the decision below relies on
    // the current value of inventoryChanged and on the skip counter.
    if (previousInventory != null) {
      if (agent.getEventListener().forceInvotoryUpdate()) {
        inventoryChanged = true;
        agent.getEventListener().resetForceInventoryUpdateFlag();
      }
      if (hasInventoryChanged(inventory, previousInventory)) {
        inventoryChanged = true;
      }
    }

    // Remember this snapshot for the next comparison.
    // NOTE(review): this is the same map whose processes get their pipeline component lists
    // pruned below when publishing, so the next comparison sees the pruned lists.
    previousInventory = inventory;

    try {
      // Skip rate of zero means "always publish"; the zero check also guards the modulo.
      boolean skipRateReached = forceInventoryUpdateMaxThreshold == 0
              || (counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0);

      if (inventory.size() > 0 && (inventoryChanged || skipRateReached)) {
        StringBuilder report = new StringBuilder("Node Inventory (" + inventory.size() + ")");
        for (IDuccProcess process : inventory.values()) {
          int pipelineInitStats = prunePipelineComponents(process);
          appendProcessSummary(report, process, pipelineInitStats);
        }
        logger.info(methodName, null, "Agent " + agent.getIdentity().getCanonicalName()
                + " Posting Inventory:" + report.toString());
        outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,
                agent.getLastORSequence(), agent.getIdentity()));
      } else {
        // Add null to the body of the message. A filter
        // defined in the Camel route (AgentConfiguration.java)
        // has a predicate to check for null body and throws
        // away such a message.
        outgoingMessage.getIn().setBody(null);
      }
    } catch (Exception e) {
      logger.error(methodName, null, e);
    } finally {
      if (inventoryChanged) {
        counter = 0;
      } else {
        counter++;
      }
      inventoryChanged = false;
    }

  }

  /**
   * Tells whether the current inventory differs from the previous snapshot. A difference is
   * reported when the maps differ in size, when a current process is missing from the snapshot,
   * when a process acquired a PID it did not have before, when its process state changed, or when
   * its UIMA pipeline component breakdown changed. Returns as soon as a difference is found.
   */
  private boolean hasInventoryChanged(Map<DuccId, IDuccProcess> current,
          Map<DuccId, IDuccProcess> previous) {
    if (current.size() != previous.size()) {
      return true;
    }
    // Same size: a process may still have been replaced by another one, so check each key.
    for (Map.Entry<DuccId, IDuccProcess> entry : current.entrySet()) {
      if (!previous.containsKey(entry.getKey())) {
        // New inventory contains a process not in the previous snapshot
        return true;
      }
      IDuccProcess currentProcess = entry.getValue();
      IDuccProcess previousProcess = previous.get(entry.getKey());
      // Only the transition from "no PID" to "has PID" counts as a PID change.
      if (currentProcess.getPID() != null && previousProcess.getPID() == null) {
        return true;
      }
      if (!currentProcess.getProcessState().equals(previousProcess.getProcessState())) {
        return true;
      }
      if (hasPipelineChanged(currentProcess, previousProcess)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Tells whether the UIMA pipeline component breakdown of a process differs from the one in the
   * previous snapshot. An empty or missing current breakdown is never reported as a change.
   * Components are matched by AE name; a change is reported when the list sizes differ, a current
   * component has no match, or a match differs in AE state or initialization time.
   */
  private boolean hasPipelineChanged(IDuccProcess currentProcess, IDuccProcess previousProcess) {
    List<IUimaPipelineAEComponent> breakdown = currentProcess.getUimaPipelineComponents();
    if (breakdown == null || breakdown.size() == 0) {
      return false;
    }
    List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
            .getUimaPipelineComponents();
    if (previousBreakdown == null || previousBreakdown.size() == 0
            || breakdown.size() != previousBreakdown.size()) {
      return true;
    }
    for (IUimaPipelineAEComponent component : breakdown) {
      boolean found = false;
      for (IUimaPipelineAEComponent previousComponent : previousBreakdown) {
        if (component.getAeName().equals(previousComponent.getAeName())) {
          found = true;
          if (!component.getAeState().equals(previousComponent.getAeState())
                  || component.getInitializationTime() != previousComponent
                          .getInitializationTime()) {
            return true;
          }
        }
      }
      if (!found) {
        return true;
      }
    }
    return false;
  }

  /**
   * Ensures the process has a non-null pipeline component list and empties that list unless the
   * process is in the Initializing state. Returns the number of components left in the list.
   */
  private int prunePipelineComponents(IDuccProcess process) {
    List<IUimaPipelineAEComponent> components = process.getUimaPipelineComponents();
    if (components == null) {
      components = new ArrayList<IUimaPipelineAEComponent>();
      process.setUimaPipelineComponents(components);
    }
    if (!process.getProcessState().equals(ProcessState.Initializing)) {
      components.clear();
    }
    return components.size();
  }

  /**
   * Appends a one-line description of the given process to the inventory log report. The text
   * layout is kept exactly as it has always been logged, including the repeated "Reason" entry
   * for stopped, failed or killed processes.
   */
  private void appendProcessSummary(StringBuilder report, IDuccProcess process,
          int pipelineInitStats) {
    report.append("\n\t[Process Type=").append(process.getProcessType())
            .append(" DUCC ID=").append(process.getDuccId())
            .append(" PID=").append(process.getPID())
            .append(" State=").append(process.getProcessState())
            .append(" Resident Memory=").append(process.getResidentMemory());
    // GC figures are optional and only logged when the process reported them.
    if (process.getGarbageCollectionStats() != null) {
      report.append(" GC Total=")
              .append(process.getGarbageCollectionStats().getCollectionCount())
              .append(" GC Time=")
              .append(process.getGarbageCollectionStats().getCollectionTime())
              .append(" ");
    }
    report.append(" Init Stats List Size:").append(pipelineInitStats)
            .append(" Reason: ").append(process.getReasonForStoppingProcess())
            .append("] ");

    if (process.getProcessState().equals(ProcessState.Stopped)
            || process.getProcessState().equals(ProcessState.Failed)
            || process.getProcessState().equals(ProcessState.Killed)) {
      report.append(" Reason:").append(process.getReasonForStoppingProcess());
      report.append(" Extended Reason:").append(process.getExtendedReasonForStoppingProcess());
    }

    // An exit code is logged for every state other than Running and Initializing.
    if (!process.getProcessState().equals(ProcessState.Running)
            && !process.getProcessState().equals(ProcessState.Initializing)) {
      report.append(" Exit Code=").append(process.getProcessExitCode());
    }
  }