public void finalStep()

in uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java [1666:1951]


  public void finalStep(FinalStep aStep, String aCasReferenceId) {
    Endpoint endpoint = null;
    boolean replySentToClient = false;
    boolean isSubordinate = false;
    CacheEntry cacheEntry = null;
    CasStateEntry casStateEntry = null;
    CasStateEntry parentCasStateEntry = null;
    Endpoint freeCasEndpoint = null;
    CacheEntry parentCASCacheEntry = null;
    Endpoint cEndpoint = null;
    boolean casDropped = false;
    boolean doDecrementChildCount = false;
    localCache.dumpContents();

    // First locate entries in the global and local cache for a given CAS
    // If not found, log a message and return
    try {
      // Get entry from the cache for a given CAS Id. This throws an exception if
      // an entry doesnt exist in the cache
      cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
      casStateEntry = localCache.lookupEntry(aCasReferenceId);
      if (casStateEntry.getState() != CacheEntry.FINAL_STATE) {
        // Mark the entry to indicate that the CAS reached a final step. This CAS
        // may still have children and will not be returned to the client until
        // all of them are fully processed. This state info will aid in the
        // internal bookkeeping when the final child is processed.
        casStateEntry.setFinalStep(aStep);
        casStateEntry.setState(CacheEntry.FINAL_STATE);
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "finalStep",
                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_cas_in_finalstep__FINE",
                  new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                      casStateEntry.getSubordinateCasInPlayCount() });
        }
      }
    } catch (Exception e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                e);
      }
      return;
    }
    // Found entries in caches for a given CAS id
    try {
      endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);

      synchronized (super.finalStepMux) {
        // Check if the global cache still contains the CAS. It may have been deleted by another
        // thread already
        if (!getInProcessCache().entryExists(aCasReferenceId)) {
          return;
        }
        
        // Check if this CAS has children that are still being processed in this aggregate
        if (casHasChildrenInPlay(casStateEntry)) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "finalStep",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_cas_has_children__FINE",
                    new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),
                        casStateEntry.getSubordinateCasInPlayCount() });
          }

          replySentToClient = false;
          casStateEntry.waitingForChildrenToFinish(true);
          return;
        }
      
        // check if this is a warm up CAS. Such CAS is internally created with
        // a main purpose of warming up analytics.
        if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
        	if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
            	cacheEntry.getThreadCompletionSemaphore().release();
        	}
        	// we are in the warm up state which means the pipelines have been initialized
        	// and we are sending CASes to warm up/prime the analytics. Since we
        	// reached the end of the flow here, just return now. There is nothing
        	// else to do with the CAS.
        	return;
        	
        }

        
        casStateEntry.waitingForChildrenToFinish(false);
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_final_step_parent_cas_no_children__FINEST",
                  new Object[] { getComponentName(), aCasReferenceId });
        }
        // Determine if this CAS is a child of some CAS
        isSubordinate = casStateEntry.getInputCasReferenceId() != null;

        if (isSubordinate) {
          // fetch the destination of a CM that produced this CAS, so that we know where to send
          // Free Cas Notification
          freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
          parentCasStateEntry = fetchParentCasFromLocalCache(casStateEntry);
          parentCASCacheEntry = fetchParentCasFromGlobalCache(casStateEntry);
          doDecrementChildCount = true;
        }
        // If the CAS was generated by this component but the Flow Controller wants to drop it OR
        // this component
        // is not a Cas Multiplier
        
        
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_drop_cas_debug_FINEST",
                    new Object[] { getComponentName(), aStep.getForceCasToBeDropped(), aCasReferenceId, casStateEntry.isReplyReceived() });
        
        if (forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
        	
        	
            
   
        	
        	if (casStateEntry.isReplyReceived()) {
            if (isSubordinate) {
              // drop the flow since we no longer need it
              dropFlow(aCasReferenceId, true);
              // Drop the CAS and remove cache entry for it
              dropCAS(aCasReferenceId, true);
              casDropped = true;
              // If debug level=FINEST dump the entire cache
              localCache.dumpContents();
              // Set this state as if we sent the reply to the client. This triggers a cleanup of
              // origin map and stats
              // for the current cas
              if (isTopLevelComponent()) {
                replySentToClient = true;
              }
            }
          } else {
            doDecrementChildCount = false;
          }
        } else if (!casStateEntry.isDropped()) {
          casStateEntry.setWaitingForRelease(true);
          // Send a reply to the Client. If the CAS is an input CAS it will be dropped
          cEndpoint = replyToClient(cacheEntry, casStateEntry);
          replySentToClient = true;
          
          if (cEndpoint != null && cEndpoint.isRemote()) {
            // if this service is a Cas Multiplier don't remove the CAS. It will be removed
            // when a remote client sends explicit Release CAS Request
            if (!isCasMultiplier()) {
              // Drop the CAS and remove cache entry for it
              dropCAS(aCasReferenceId, true);
            }
            casDropped = true;
          } else {
            // Remove entry from the local cache for this CAS. If the client
            // is remote the entry was removed in replyToClient()
            try {
              localCache.lookupEntry(aCasReferenceId).setDropped(true);
            } catch (Exception e) {
            }
            localCache.remove(aCasReferenceId);
          }
          // If debug level=FINEST dump the entire cache
          localCache.dumpContents();
        }

        if (parentCasStateEntry == null && isSubordinate) {
          parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
        }
        if (doDecrementChildCount) {
          // Child CAS has been fully processed, decrement its parent count of active child CASes
          if (parentCasStateEntry != null) {
            parentCasStateEntry.decrementSubordinateCasInPlayCount();
            // If debug level=FINEST dump the entire cache
            localCache.dumpContents();
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(
                      Level.FINE,
                      CLASS_NAME.getName(),
                      "finalStep",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_cas_decremented_child_count__FINE",
                      new Object[] { getComponentName(), parentCasStateEntry.getCasReferenceId(),
                    	  parentCasStateEntry.getSubordinateCasInPlayCount() });
            }
          }
        }

        boolean clientIsCollocated = (cEndpoint == null || !cEndpoint.isRemote());

        if (parentCasStateEntry != null && parentCasStateEntry.getSubordinateCasInPlayCount() == 0
                && parentCasStateEntry.isFailed()) {
          parentCasStateEntry.setReplyReceived();
        }
        // For subordinate CAS, check if its parent needs to be put in play. This should happen if
        // this CAS was the last of the children in play
        if (isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry)  ) {
//        	parentCasStateEntry.waitingForChildrenToFinish(false);
//        if (isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry) ) {
          // Put the parent CAS in play. The parent CAS can be in one of two places now depending
          // on the configuration. The parent CAS could have been suspended in the final step, or it
          // could have
          // been suspended in the process method. If the configuration indicates that the parent
          // should follow only when the last of its children leaves this aggregate, call the
          // process method.
          // Otherwise, the CAS is in a final state and simply needs to resume there.
          Endpoint lastDelegateEndpoint = parentCasStateEntry.getLastDelegate().getEndpoint();
          if (lastDelegateEndpoint.processParentLast()) {
        	  
            // The parent was suspended in the process call. Resume processing the CAS
            process(parentCASCacheEntry.getCas(), parentCasStateEntry.getCasReferenceId());
          } else if ( parentCasStateEntry.waitingForChildrenToFinish()){
        	  
            // The parent was suspended in the final step. Resume processing the CAS
            finalStep(parentCasStateEntry.getFinalStep(), parentCasStateEntry.getCasReferenceId());
          }
        }
      } // synchronized
      if (endpoint != null) {
        // remove stats associated with this Cas and a given endpoint
        dropStats(aCasReferenceId, endpoint.getEndpoint());
      }
    } catch (Exception e) {
      HashMap map = new HashMap();
      map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
      map.put(AsynchAEMessage.CasReference, aCasReferenceId);
      if (endpoint != null) {
        map.put(AsynchAEMessage.Endpoint, endpoint);
      }
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", e);
      }
      handleError(map, e);
    } finally {
      if (replySentToClient) {
        removeMessageOrigin(aCasReferenceId);
        dropStats(aCasReferenceId, super.getName());
      }
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINEST,
                CLASS_NAME.getName(),
                "finalStep",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_final_step_show_internal_stats__FINEST",
                new Object[] { getName(), flowMap.size(), getInProcessCache().getSize(),
                    originMap.size(), super.statsMap.size() });
      }
      // freeCasEndpoint is a special endpoint for sending Free CAS Notification.
      if (casDropped && freeCasEndpoint != null) {
        try {
          freeCasEndpoint.setReplyEndpoint(true);
          freeCasEndpoint.setIsCasMultiplier(true);
          freeCasEndpoint.setFreeCasEndpoint(true);
          // send Free CAS Notification to a Cas Multiplier
          getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
                  freeCasEndpoint);
        } catch (Exception e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_service_exception_WARNING", getComponentName());

            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_exception__WARNING", e);
          }
        }
      }
    }
  }