public void process()

in uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java [1057:1236]


  public void process(CAS aCAS, String aCasReferenceId) {
    boolean handlingDelayedStep = false;
    // First check if there are outstanding steps to be called before consulting the Flow
    // Controller.
    // This could be the case if a previous step was a parallel step and it contained collocated
    // delegates.
    if (!isStopped()) {
      if (abortGeneratingCASes(aCasReferenceId)) {
        // Force delegate Cas Multipliers to Stop generating new CASes
        super.stopCasMultipliers();
      }
      try {
    	CacheEntry entry = null;
    	if ( getInProcessCache().entryExists(aCasReferenceId) ) {
    	     entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
    	}
        CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
        // Check if this CAS should be aborted due to previous error on this CAS or its
        // parent. If this is the case the method will move the CAS to the final state
        // where it will be dropped. If the CAS is an input CAS, it will be returned to
        // the client with an exception
        if (abortProcessingCas(casStateEntry, entry)) {
          // This CAS was aborted, we are done here
          return;
        }
        if ( entry == null ) {
        	throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"
                    + aCasReferenceId + " is Invalid");
        }
        // Check if this is an input CAS from the client. If not, check if last
        // delegate handling this CAS was a Cas Multiplier configured to process
        // parent CAS last
        if (casStateEntry.getLastDelegate() != null) {
          // Fetch the endpoint corresponding to the last Delegate handling the CAS
          Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint();
          // Check if this delegate is a Cas Multiplier and the parent CAS is to be processed last
          casStateEntry.setReplyReceived();
          if (lastDelegateEndpoint.isCasMultiplier()){
            //  The following blocks until all child CASes acquire their Flow objects from the Flow
            //  Controller. Release the semaphore immediately after acquiring it. This semaphore is 
            //  no longer needed. This synchronization is only necessary for blocking the parent
            //  CAS until all child CASes acquire their Flow objects.
            casStateEntry.acquireFlowSemaphore();
            casStateEntry.releaseFlowSemaphore();
            if ( lastDelegateEndpoint.processParentLast()) {
              synchronized (super.finalStepMux) {
                // Determine if the CAS should be held until all its children leave this aggregate.
                if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
                  // This input CAS has child CASes still in play. It will remain in the cache
                  // until the last of the child CASes is released. Only than, the input CAS is
                  // is allowed to continue into the next step in the flow.
                  // The CAS has to be in final state
                  casStateEntry.setState(CacheEntry.FINAL_STATE);
                  // The input CAS will be interned until all children leave this aggregate
                  return;
                }
              }
              
            }
          }
        }
        // if we are here entry is not null. The above throws an exception if an entry is not
        // found in the cache. First check if there is a delayedSingleStepList in the cache.
        // If there is one, it means that a parallel step contained collocated delegate(s)
        // The parallel step may only contain remote delegates. All collocated delegates
        // were removed from the parallel step and added to the delayedSingleStepList in
        // parallelStep() method.
        List delayedSingleStepList = entry.getDelayedSingleStepList();
        if (delayedSingleStepList != null && delayedSingleStepList.size() > 0) {
          handlingDelayedStep = true;
          // Reset number of parallel delegates back to one. This is done only if the previous step
          // was a parallel step.
          synchronized (parallelStepMux) {
            if (casStateEntry.getNumberOfParallelDelegates() > 1) {
              casStateEntry.setNumberOfParallelDelegates(1);
            }
          }
          // Remove a delegate endpoint from the single step list cached in the CAS entry
          Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
          // send the CAS to a collocated delegate from the delayed single step list.
          dispatchProcessRequest(entry, endpoint, true);
        }
      } catch (Exception e) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_exception_WARNING", getComponentName());

          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process",
                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                  e);
        }
      } finally {
        // If just handled the delayed step, return as there is nothing else to do
        if (handlingDelayedStep) {
          return;
        }
      }
    }

    FlowContainer flow = null;
    try {
      if (aCasReferenceId != null) {
        try {
          // Check if a Flow object has been previously generated for the Cas.
          if (flowMap.containsKey(aCasReferenceId)) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                      "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_retrieve_flow_object__FINEST",
                      new Object[] { getComponentName(), aCasReferenceId });
            }
            flow = (FlowContainer) flowMap.get(aCasReferenceId);
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                      "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_retrieved_flow_object_ok__FINEST",
                      new Object[] { getComponentName(), aCasReferenceId });
            }
          } else {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                      "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
            }
            synchronized (flowControllerContainer) {
              flow = flowControllerContainer.computeFlow(aCAS);
            }
            // Save the Flow Object in a cache. Flow exists in the cache
            // until the CAS is fully processed or it is
            // explicitly deleted when processing of this CAS cannot
            // continue
            flowMap.put(aCasReferenceId, flow);
            // Check if the local cache already contains an entry for the Cas id.
            // A colocated Cas Multiplier may have already registered this CAS
            // in the parent's controller
            if (localCache.lookupEntry(aCasReferenceId) == null) {
              // Add this Cas Id to the local cache. Every input CAS goes through here
              localCache.createCasStateEntry(aCasReferenceId);
            }
          }
        } catch (Exception ex) {
          // Any error here is automatic termination
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_service_exception_WARNING", getComponentName());

            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_exception__WARNING", ex);
          }
          sendReplyWithShutdownException(aCasReferenceId);

          handleAction(ErrorHandler.TERMINATE, null, null);
          return;
        }
        if (!isStopped()) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_executing_step_input_cas__FINEST",
                    new Object[] { getComponentName(), aCasReferenceId });
          }
          // Execute a step in the flow. false means that this CAS has not
          // been produced by CAS Multiplier
          executeFlowStep(flow, aCasReferenceId, false);
        } else {
          synchronized (flowControllerContainer) {
            flow.aborted();
          }
        }
      }
    } catch (Exception e) {
      HashMap map = new HashMap();
      map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
      map.put(AsynchAEMessage.CasReference, aCasReferenceId);
      handleError(map, e);
    }
  }