private void handleProcessResponseFromRemote()

in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java [151:422]


  private void handleProcessResponseFromRemote(MessageContext aMessageContext, String aDelegateKey) {
    CAS cas = null;
    String casReferenceId = null;
    Endpoint endpointWithTimer = null;
    try {
      final int payload = aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload);
      casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
      endpointWithTimer = lookupEndpoint(aMessageContext.getEndpoint().getEndpoint(),
              casReferenceId);

      if (endpointWithTimer == null) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_invalid_endpoint__WARNING",
                  new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId });
        }
        return;
      }
      String delegateKey = ((AggregateAnalysisEngineController) getController())
              .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
      Delegate delegate = ((AggregateAnalysisEngineController) getController())
              .lookupDelegate(delegateKey);
      boolean casRemovedFromOutstandingList = delegate.removeCasFromOutstandingList(casReferenceId);

      // Check if this process reply message is expected. A message is expected if the Cas Id
      // in the message matches an entry in the delegate's outstanding list. This list stores
      // ids of CASes sent to the remote delegate pending reply.
      if (!casRemovedFromOutstandingList) {
        //handleUnexpectedMessage(casReferenceId, aMessageContext.getEndpoint());
        return;   // out of band reply. Most likely the CAS previously timedout
      }

      // Increment number of CASes processed by this delegate
      if (aDelegateKey != null) {
        ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) getController())
                .getServicePerformance(aDelegateKey);
        if (delegateServicePerformance != null) {
          delegateServicePerformance.incrementNumberOfCASesProcessed();
        }
      }

      String xmi = aMessageContext.getStringMessage();

      // Fetch entry from the cache for a given Cas Id. The entry contains a CAS that will be used
      // during deserialization
      CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
              casReferenceId);
      // check if the client requested Performance Metrics for the CAS
      if ( aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
        try {
          // find top ancestor of this CAS. All metrics are accumulated there since
          // this is what will be returned to the client
          CacheEntry ancestor = 
                  getController().
                    getInProcessCache().
                      getTopAncestorCasEntry(cacheEntry);
          if ( ancestor != null ) {
        	// fetch Performance Metrics from remote delegate reply
            List<AnalysisEnginePerformanceMetrics> metrics = 
                    UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
            List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
                    new ArrayList<AnalysisEnginePerformanceMetrics>();
            for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
              String adjustedUniqueName = ((AggregateAnalysisEngineController) getController()).getJmxContext();

              if ( adjustedUniqueName.startsWith("p0=")) {
            	  adjustedUniqueName = adjustedUniqueName.substring(3);  // skip p0=
              }
              adjustedUniqueName = adjustedUniqueName.replaceAll(" Components", "");
              if (!adjustedUniqueName.startsWith("/")) {
            	  adjustedUniqueName = "/"+adjustedUniqueName;
              }
              adjustedUniqueName += delegateMetric.getUniqueName();
              
              boolean found = false;
              AnalysisEnginePerformanceMetrics metric = null;
              for( AnalysisEnginePerformanceMetrics met : ancestor.getDelegateMetrics() ) {
            	  if ( met.getUniqueName().equals(adjustedUniqueName)) {
            		  long at = delegateMetric.getAnalysisTime();
            		  long count = delegateMetric.getNumProcessed();
            		  metric = new AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,at,count);
            		  found = true;
            		  ancestor.getDelegateMetrics().remove(met);
            		  break;
            	  }
              }
              if ( !found ) {
                  metric = new AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
              }
              adjustedMetrics.add(metric);
            }
            
            ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true);  // true=remote
          }
        } catch (Exception e) {
          // An exception be be thrown here if the service is being stopped.
          // The top level controller may have already cleaned up the cache
          // and the getCacheEntryForCAS() will throw an exception. Ignore it
          // here, we are shutting down.
        }
        
      }
      CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) getController())
              .getLocalCache().lookupEntry(casReferenceId);
      if (casStateEntry != null) {
        casStateEntry.setReplyReceived();
        // Set the key of the delegate that returned the CAS
        casStateEntry.setLastDelegate(delegate);
      } else {
        return; // Cache Entry Not found
      }

      cas = cacheEntry.getCas();
      int totalNumberOfParallelDelegatesProcessingCas = casStateEntry
              .getNumberOfParallelDelegates();
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_number_parallel_delegates_FINE",
                new Object[] { totalNumberOfParallelDelegatesProcessingCas, Thread.currentThread().getId(), Thread.currentThread().getName() });
      }
      if (totalNumberOfParallelDelegatesProcessingCas > 1) {
    	  // Block this thread until CAS is dispatched to all delegates in parallel step. Fixes race condition where
    	  // a reply comes from one of delegates in parallel step before dispatch sequence completes. Without
    	  // this blocking the result of analysis are merged into a CAS.
    	  casStateEntry.blockIfParallelDispatchNotComplete();
      }
      
      if (cas == null) {
        throw new AsynchAEException(Thread.currentThread().getName()
                + "-Cache Does not contain a CAS. Cas Reference Id::" + casReferenceId);
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_rcvd_reply_FINEST",
                new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId, xmi });
      }
      long t1 = getController().getCpuTime();
      /* --------------------- */
      /** DESERIALIZE THE CAS. */
      /* --------------------- */
      //all subsequent serialization must be complete CAS.
      if ( !aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas))  {
    	cacheEntry.setAcceptsDeltaCas(false);
      }

      SerialFormat serialFormat = endpointWithTimer.getSerialFormat();
      // check if the CAS is part of the Parallel Step
      if (totalNumberOfParallelDelegatesProcessingCas > 1) {
        // Synchronized because replies are merged into the same CAS.
        synchronized (cas) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_delegate_responded_count_FINEST",
                    new Object[] { casStateEntry.howManyDelegatesResponded(), casReferenceId });
          }
          // If a delta CAS, merge it while checking that no pre-existing FSs are modified.
          if (aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
            switch (serialFormat) {
            case XMI:
              int highWaterMark = cacheEntry.getHighWaterMark();
              deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.disallow);
              break;
            case COMPRESSED_FILTERED:
              deserialize(aMessageContext.getByteMessage(), cas, cacheEntry, endpointWithTimer.getTypeSystemImpl(), AllowPreexistingFS.disallow);
              break;
            default:
              throw new UIMARuntimeException(new Exception("Internal error"));
            }
          } else {
            // If not a delta CAS (old service), take all of first reply, and merge in the new
            // entries in the later replies. Ignoring pre-existing FS for 2.2.2 compatibility
            // Note: can't be a compressed binary - that would have returned a delta
            if (casStateEntry.howManyDelegatesResponded() == 0) {
              deserialize(xmi, cas, casReferenceId);
            } else { // process secondary reply from a parallel step
              int highWaterMark = cacheEntry.getHighWaterMark();
              deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.ignore);
            }
          }
          casStateEntry.incrementHowManyDelegatesResponded();
        }
      } else { // Processing a reply from a non-parallel delegate (binary or delta xmi or xmi)
        byte[] binaryData = aMessageContext.getByteMessage();
        ByteArrayInputStream istream = new ByteArrayInputStream(binaryData);
        switch (serialFormat) {
        case BINARY:
          ((CASImpl)cas).reinit(istream);
          break;
        case COMPRESSED_FILTERED:
          BinaryCasSerDes6 bcs = new BinaryCasSerDes6(cas, (MarkerImpl) cacheEntry.getMarker(), endpointWithTimer.getTypeSystemImpl(), cacheEntry.getCompress6ReuseInfo());          
          bcs.deserialize(istream, AllowPreexistingFS.allow);
          break;
        case XMI:
          if (aMessageContext.getMessageBooleanProperty(AsynchAEMessage.SentDeltaCas)) {
            int highWaterMark = cacheEntry.getHighWaterMark();
            deserialize(xmi, cas, casReferenceId, highWaterMark, AllowPreexistingFS.allow);
          } else {
            deserialize(xmi, cas, casReferenceId);
          }
          break;
        default:
          throw new UIMARuntimeException(new Exception("Internal error"));
        }
      }
      long timeToDeserializeCAS = getController().getCpuTime() - t1;

      getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);

      ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
      casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
      LongNumericStatistic statistic;
      if ((statistic = getController().getMonitor().getLongNumericStatistic("",
              Monitor.TotalDeserializeTime)) != null) {
        statistic.increment(timeToDeserializeCAS);
      }

      computeStats(aMessageContext, casReferenceId);

      // Send CAS for processing when all delegates reply
      // totalNumberOfParallelDelegatesProcessingCas indicates how many delegates are processing CAS
      // in parallel. Default is 1, meaning only one delegate processes the CAS at the same.
      // Otherwise, check if all delegates responded before passing CAS on to the Flow Controller.
      // The idea is that all delegates processing one CAS concurrently must respond, before the CAS
      // is allowed to move on to the next step.
      // HowManyDelegatesResponded is incremented every time a parallel delegate sends response.
      if (totalNumberOfParallelDelegatesProcessingCas == 1
              || receivedAllResponsesFromParallelDelegates(casStateEntry,
                      totalNumberOfParallelDelegatesProcessingCas)) {
        super.invokeProcess(cas, casReferenceId, null, aMessageContext, null);
      }

    } catch (Exception e) {
      // Check if the exception was thrown by the Cache while looking up
      // the CAS. It may be the case if in the parallel step one thread
      // drops the CAS in the Error Handling while another thread processes
      // reply from another delegate in the Parallel Step. A race condition
      // may occur here. If one thread drops the CAS due to excessive exceptions
      // and Flow Controller is configured to drop the CAS, the other thread
      // should not be allowed to move the CAS to process()method. The second
      // thread will find the CAS missing in the cache and the logic below
      // just logs the stale CAS and returns and doesnt attempt to handle
      // the missing CAS exception.
      if (e instanceof AsynchAEException && e.getMessage() != null
              && e.getMessage().startsWith("Cas Not Found")) {
        String key = "N/A";
        if (endpointWithTimer != null) {
          key = ((AggregateAnalysisEngineController) getController())
                  .lookUpDelegateKey(endpointWithTimer.getEndpoint());
        }
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "handleProcessResponseFromRemote", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_stale_reply__INFO",
                  new Object[] { getController().getComponentName(), key, casReferenceId });
        }
        // The reply came late. The CAS was removed from the cache.
        return;
      }
      ErrorContext errorContext = new ErrorContext();
      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
      errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
      errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
      getController().getErrorHandlerChain().handle(e, errorContext, getController());
    } finally {
      incrementDelegateProcessCount(aMessageContext);
    }

  }