protected synchronized void aggregateDelegateStats()

in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java [208:314]


  protected synchronized void aggregateDelegateStats(MessageContext aMessageContext,
          String aCasReferenceId) throws AsynchAEException {
    String delegateKey = "";
    try {

      delegateKey = ((AggregateAnalysisEngineController) getController())
              .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
      CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
      if (entry == null) {
        throw new AsynchAEException("CasReferenceId:" + aCasReferenceId
                + " Not Found in the Cache.");
      }
      CacheEntry inputCasEntry = null;
      String inputCasReferenceId = entry.getInputCasReferenceId();
      ServicePerformance casStats = ((AggregateAnalysisEngineController) getController())
              .getCasStatistics(aCasReferenceId);
      if (inputCasReferenceId != null
              && getController().getInProcessCache().entryExists(inputCasReferenceId)) {
        String casProducerKey = entry.getCasProducerKey();
        if (casProducerKey != null
                && ((AggregateAnalysisEngineController) getController())
                        .isDelegateKeyValid(casProducerKey)) {
          // Get entry for the input CAS
          inputCasEntry = getController().getInProcessCache().getCacheEntryForCAS(
                  inputCasReferenceId);
        }

      }
      ServicePerformance delegateServicePerformance = ((AggregateAnalysisEngineController) getController())
              .getServicePerformance(delegateKey);

      if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS)) {
        long timeToSerializeCAS = ((Long) aMessageContext
                .getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
        if (timeToSerializeCAS > 0) {
          if (delegateServicePerformance != null) {
            delegateServicePerformance.incrementCasSerializationTime(timeToSerializeCAS);
          }
        }
      }
      if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS)) {
        long timeToDeserializeCAS = ((Long) aMessageContext
                .getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
        if (timeToDeserializeCAS > 0) {
          if (delegateServicePerformance != null) {
            delegateServicePerformance.incrementCasDeserializationTime(timeToDeserializeCAS);
          }
        }
      }

      if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime)) {
        long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime))
                .longValue();
        if (idleTime > 0 && delegateServicePerformance != null) {
          Endpoint endp = aMessageContext.getEndpoint();
          if (endp != null && endp.isRemote()) {
            delegateServicePerformance.incrementIdleTime(idleTime);
          }
        }
      }

      if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS)) {
        long timeWaitingForCAS = ((Long) aMessageContext
                .getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
        if (timeWaitingForCAS > 0 && aMessageContext.getEndpoint().isRemote()) {
          entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
          delegateServicePerformance.incrementCasPoolWaitTime(timeWaitingForCAS
                  - delegateServicePerformance.getRawCasPoolWaitTime());
          if (inputCasEntry != null) {
            inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
          }
        }
      }
      if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS)) {
        long timeInProcessCAS = ((Long) aMessageContext
                .getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
        Endpoint endp = aMessageContext.getEndpoint();
        if (endp != null && endp.isRemote()) {
          if (delegateServicePerformance != null) {
            // calculate the time spent in analysis. The remote service returns total time
            // spent in the analysis. Compute the delta.
            long dt = timeInProcessCAS - delegateServicePerformance.getRawAnalysisTime();
            // increment total time in analysis
            delegateServicePerformance.incrementAnalysisTime(dt);
            getController().getServicePerformance().incrementAnalysisTime(dt);
          }
        } else {
          getController().getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
        }
        casStats.incrementAnalysisTime(timeInProcessCAS);

        if (inputCasReferenceId != null) {
          ServicePerformance inputCasStats = ((AggregateAnalysisEngineController) getController())
                  .getCasStatistics(inputCasReferenceId);
          // Update processing time for this CAS
          if (inputCasStats != null) {
            inputCasStats.incrementAnalysisTime(timeInProcessCAS);
          }
        }

      }
    } catch (AsynchAEException e) {
      throw e;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }