private void handleProcessRequestWithCASReference()

in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java [634:831]


  private void handleProcessRequestWithCASReference(MessageContext aMessageContext)
          throws AsynchAEException {
    boolean isNewCAS = false;
    String newCASProducedBy = null;

    try {
      // This is only used when handling CASes produced by CAS Multiplier
      String inputCasReferenceId = null;
      CAS cas = null;
      CasStateEntry cse = null;
      String casReferenceId = getCasReferenceId(aMessageContext);
      if ((cse = getController().getLocalCache().lookupEntry(casReferenceId)) == null) {
        // Create a new entry in the local cache for the CAS received from the remote
        cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
      }

      // Check if this Cas has been sent from a Cas Multiplier. If so, its sequence will be > 0
      if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
        isNewCAS = true;

        Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();

        if (casMultiplierEndpoint == null) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                    "handleProcessRequestWithCASReference",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
                    new Object[] { casReferenceId });
          }
          return;
        }
        // Get the id of the parent Cas
        inputCasReferenceId = aMessageContext
                .getMessageStringProperty(AsynchAEMessage.InputCasReference);
        if (cse.getInputCasReferenceId() == null) {
          cse.setInputCasReferenceId(inputCasReferenceId);
        }

        if (getController() instanceof AggregateAnalysisEngineController) {
          CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry(
                  inputCasReferenceId);
          // Check if the parent CAS is in a failed state first
          if (parentCasEntry != null && parentCasEntry.isFailed()) {
            // handle CAS release
            getController().process(null, casReferenceId);
            return;
          }

          String delegateKey = ((AggregateAnalysisEngineController) getController())
                  .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
          Delegate delegate = ((AggregateAnalysisEngineController) getController())
                  .lookupDelegate(delegateKey);
          cse.setLastDelegate(delegate);
          newCASProducedBy = delegate.getKey();
          casMultiplierEndpoint.setIsCasMultiplier(true);
          try {
            // Save the endpoint of the CM which produced the Cas
            getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy);
          } catch (Exception e) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              if ( getController() != null ) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                        "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_service_exception_WARNING", getController().getComponentName());
              }

              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "handleProcessRequestWithCASReference",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                      e);
            }
            return;
          }
          // Safety check. The input Cas should not be null here
          if (inputCasReferenceId != null) {
            try {
              Endpoint endp = null;

              // Located the origin of the parent Cas. The produced Cas will inherit the origin from
              // its parent.
              // Once the origin is identified, save the origin using the produced Cas id as a key.
              if (endp == null) {
                boolean gotTheEndpoint = false;
                String parentCasId = inputCasReferenceId;
                // Loop through the parent tree until an origin is found
                while (!gotTheEndpoint) {
                  // Check if the current parent has an associated origin
                  endp = ((AggregateAnalysisEngineController) getController())
                          .getMessageOrigin(parentCasId);
                  // Check if there is an origin. If so, we are done
                  if (endp != null) {
                    break;
                  }
                  // The current parent has no origin, get its parent and try again
                  CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(
                          parentCasId);
                  parentCasId = entry.getInputCasReferenceId();
                  // Check if we reached the top of the hierarchy tree. If so, we have no origin.
                  // This should
                  // never be the case. Every Cas must have an origin
                  if (parentCasId == null) {
                    break;
                  }
                }
              }
              // If origin not found log it as this indicates an error
              if (endp == null) {
                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                          "handleProcessRequestWithCASReference",
                          UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                          "UIMAEE_msg_origin_not_found__INFO",
                          new Object[] { getController().getComponentName(), inputCasReferenceId });
                }
              } else {
                ((AggregateAnalysisEngineController) getController()).addMessageOrigin(
                        casReferenceId, endp);
                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
                  UIMAFramework.getLogger(CLASS_NAME).logrb(
                          Level.FINEST,
                          CLASS_NAME.getName(),
                          "handleProcessRequestWithCASReference",
                          UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                          "UIMAEE_msg_origin_added__FINEST",
                          new Object[] { getController().getComponentName(), casReferenceId,
                              newCASProducedBy });
                }
              }
            } catch (Exception e) {
             
              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
                if ( getController() != null ) {
                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                          "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                          "UIMAEE_service_exception_WARNING", getController().getComponentName());
                }

                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                        "handleProcessRequestWithCASReference",
                        UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
              }
            }
          } else {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(
                      Level.INFO,
                      CLASS_NAME.getName(),
                      "handleProcessRequestWithCASReference",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_input_cas_invalid__INFO",
                      new Object[] { getController().getComponentName(), newCASProducedBy,
                          casReferenceId });
            }
          }
        }
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                  "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_new_cas__FINE", new Object[] { casReferenceId, newCASProducedBy });
        }
        aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
        aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
      } else {
        if (getController() instanceof AggregateAnalysisEngineController) {
          ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
                  aMessageContext.getEndpoint());
        }

      }
      cas = getController().getInProcessCache().getCasByReference(casReferenceId);

      long arrivalTime = System.nanoTime();
      getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName());

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_analyzing_cas__FINE", new Object[] { casReferenceId });
      }
      // Save Process command in the client endpoint.
      cacheProcessCommandInClientEndpoint();

      if (getController().isStopped()) {
        return;
      }

      if (isNewCAS) {
        invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
      } else {
        invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
      }
    } catch (AsynchAEException e) {
      throw e;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }

  }