private void handleProcessRequestFromRemoteClient()

in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java [406:632]


  private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext)
          throws AsynchAEException {
    CacheEntry entry = null;
    String casReferenceId = null;
    // Check if there is a cargo in the message
    if (aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
            && aMessageContext.getStringMessage() == null) {
      return; // No XMI just return
    }

    CasStateEntry inputCasStateEntry = null;

    try {

      String newCASProducedBy = null;
      // Get the CAS Reference Id of the input CAS
      // Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in
      // the log file and return null
      casReferenceId = getCasReferenceId(aMessageContext);
      if (casReferenceId == null) {
        return; // Invalid message. Nothing to do
      }
      // Initially make both equal
      String inputCasReferenceId = casReferenceId;
      // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
      Endpoint freeCasEndpoint = null;


      // CASes generated by a Cas Multiplier will have a CasSequence property set.
      if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
        // Fetch the name of the Cas Multiplier's input queue
        // String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
        String cmEndpointName = aMessageContext
                .getMessageStringProperty(AsynchAEMessage.MessageFrom);
        newCASProducedBy = ((AggregateAnalysisEngineController) getController())
                .lookUpDelegateKey(cmEndpointName);
        // Fetch an ID of the parent CAS
        inputCasReferenceId = aMessageContext
                .getMessageStringProperty(AsynchAEMessage.InputCasReference);
        // Fetch Cache entry for the parent CAS
        CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
                inputCasReferenceId);
        // Fetch an endpoint where Free CAS Notification must be sent.
        // This endpoint is unique per CM instance. Meaning, each
        // instance of CM will have an endpoint where it expects Free CAS
        // notifications.
        freeCasEndpoint = aMessageContext.getEndpoint();
        // Clone an endpoint where Free Cas Request will be sent
        freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();

        if (getController() instanceof AggregateAnalysisEngineController) {
          inputCasStateEntry = ((AggregateAnalysisEngineController) getController())
                  .getLocalCache().lookupEntry(inputCasReferenceId);

          // Associate Free Cas Notification Endpoint with an input Cas
          inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
        }

        computeStats(aMessageContext, inputCasReferenceId);
        // Reset the destination
        aMessageContext.getEndpoint().setDestination(null);
        // This CAS came in from a CAS Multiplier. Treat it differently than the
        // input CAS. In case the Aggregate needs to send this CAS to the
        // client, retrieve the client destination by looking up the client endpoint
        // using input CAS reference id. CASes generated by the CAS multiplier will have
        // the same Cas Reference id.
        Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
        // The message context contains a Cas Multiplier endpoint. Since
        // we dont want to send a generated CAS back to the CM, override
        // with an endpoint provided by the client of
        // this service. Client endpoint is attached to an input Cas cache entry.
        if ( replyToEndpoint != null ) {
            aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
            aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
        } 
        // Before sending a CAS to Cas Multiplier, the aggregate has
        // saved the CM key in the CAS cache entry. Fetch the key
        // of the CM so that we can ask the right Shadow Cas Pool for
        // a new CAS. Every Shadow Cas Pool has a unique id which
        // corresponds to a Cas Multiplier key.
        // newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
        if (getController() instanceof AggregateAnalysisEngineController) {
          Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) getController())
                  .lookUpEndpoint(newCASProducedBy, false);
          if (casMultiplierEndpoint != null) {
            // Save the URL of the broker managing the Free Cas Notification queue.
            // This is needed when we try to establish a connection to the broker.
            freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
          }
        }
      } else if (getController().isTopLevelComponent()) {
        if (getController() instanceof AggregateAnalysisEngineController) {
          ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
                  aMessageContext.getEndpoint());
        }

      }
      // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
      // to see if the message with a given CasReferenceId is already being processed. It is, the
      // message contains the same request possibly issued by the caller due to timeout. Also this
      // mechanism helps with dealing with scenario when this service is not up when the client
      // sends
      // request. The client can keep re-sending the same request until its timeout thresholds are
      // exceeded. By that time, there may be multiple messages in this service queue with the same
      // CasReferenceId. When the service finally comes back up, it will have multiple messages in
      // its queue possibly from the same client. Only the first message for any given
      // CasReferenceId
      // should be processed.
      CasStateEntry cse = null;
      if (!getController().getInProcessCache().entryExists(casReferenceId)) {
        
        if (getController().getLocalCache().lookupEntry(casReferenceId) == null) {
          // Create a new entry in the local cache for the CAS received from the remote
          cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
          // Check if this CAS is a child
          if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
            cse.setInputCasReferenceId(inputCasReferenceId);
          }
        } else {
          cse = getController().getLocalCache().lookupEntry(casReferenceId);
        }

        if (getController() instanceof AggregateAnalysisEngineController
                && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
          String delegateInputQueueName = aMessageContext
                  .getMessageStringProperty(AsynchAEMessage.MessageFrom);
          String delegateKey = ((AggregateAnalysisEngineController) getController())
                  .lookUpDelegateKey(delegateInputQueueName); // aMessageContext.getEndpoint().getEndpoint());
          if (delegateKey != null) {
            Delegate delegate = ((AggregateAnalysisEngineController) getController())
                    .lookupDelegate(delegateKey);
            // Save the last delegate handling this CAS
            cse.setLastDelegate(delegate);
            // If there is one thread receiving messages from Cas Multiplier increment number of
            // child Cases
            // of the parent CAS. If there are more threads (consumers) a special object
            // ConcurrentMessageListener
            // has already incremented the count. This special object enforces order of processing
            // for CASes
            // coming in from the Cas Multiplier.
            if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
              inputCasStateEntry.incrementSubordinateCasInPlayCount();
            }
          }
        }
        try {
            entry = deserializeCASandRegisterWithCache(casReferenceId, freeCasEndpoint,
                    newCASProducedBy, aMessageContext);
        } catch( Exception ex) {
            // Mark this CAS as failed.
        	if ( cse != null ) {
        		cse.setFailed();
        	}
        	throw ex;
        }
        if (getController().isStopped() || entry == null || entry.getCas() == null) {
          if (entry != null) {
            // The Controller is in shutdown state, release the CAS
            getController().dropCAS(entry.getCasReferenceId(), true);
            entry = null;
          }
          return;
        }
        // *****************************************************************
        // Process the CAS
        // *****************************************************************
        invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
                newCASProducedBy);
        
        /**
         * Below comments apply to UIMA AS aggregate only.
         * CAS has been handed off to a delegate. Now block the receiving thread until
         * the CAS is processed or there is a timeout or error. Fetch this thread's ThreadLocal
         * semaphore to block the thread. It will be unblocked when the aggregate is done with
         * the CAS.
         */
        if (!getController().isPrimitive() && 
        		cse != null && !cse.isSubordinate() ) {
        		
          try {
        	  synchronized(lock) {
        	    while( !getController().isStopped()) {
                if ( entry.getThreadCompletionSemaphore() != null) {
                  boolean gotIt = entry.getThreadCompletionSemaphore().tryAcquire(500, TimeUnit.MILLISECONDS);
                  if ( gotIt ) {
                    break;
                  }
                } else {
                  break;
                }
        	      
        	    }
        	  }
          } catch( InterruptedException ex) {
          } 
        }
        
      } else {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                  "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
        }
      }
    } catch (Exception e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        if ( getController() != null ) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_exception_WARNING", getController().getComponentName());
        }

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_exception__WARNING", e);
      }
      ErrorContext errorContext = new ErrorContext();
      errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
      errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
      if (entry != null) {
        getController().dropCAS(entry.getCas());
      }
      getController().getErrorHandlerChain().handle(e, errorContext, getController());
    }

  }