private CacheEntry deserializeCASandRegisterWithCache()

in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java [177:381]


  private CacheEntry deserializeCASandRegisterWithCache(String casReferenceId,
          Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
          throws Exception {
    long inTime = System.nanoTime();
    boolean casRegistered = false;

    // Time how long we wait on Cas Pool to fetch a new CAS
    long t1 = getController().getCpuTime();
    // *************************************************************************
    // Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
    // fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
    // from the service CAS Pool.
    // *************************************************************************
    Endpoint endpoint = aMessageContext.getEndpoint();
    CAS cas = null;
    CacheEntry entry = null;
    
    try {
      cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey,
              endpoint.getEndpoint());
      long timeWaitingForCAS = getController().getCpuTime() - t1;
      // Check if we are still running
      if (getController().isStopped()) {
        // The Controller is in shutdown state.
        getController().dropCAS(cas);
        return null;
      }
      // *************************************************************************
      // Deserialize CAS from the message
      // *************************************************************************
      t1 = getController().getCpuTime();
      SerialFormat serialFormat = endpoint.getSerialFormat();
      XmiSerializationSharedData deserSharedData = null;
      ReuseInfo reuseInfo = null;
      
      UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId();
      byte[] binarySource = aMessageContext.getByteMessage();
      boolean failed = false;
      Exception cachedException = null;
      try {
          switch (serialFormat) {
          case XMI:
            // Fetch serialized CAS from the message
            String xmi = aMessageContext.getStringMessage();
            deserSharedData = new XmiSerializationSharedData();
            uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
            break;
          case BINARY:
            // *************************************************************************
            // Register the CAS with a local cache
            // *************************************************************************
            // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
            // deserSharedData, casReferenceId);
            // BINARY format may be COMPRESSED etc, so update it upon reading
            serialFormat = uimaSerializer.deserializeCasFromBinary(binarySource, cas);
            // BINARY format may be COMPRESSED etc, so update it upon reading
            endpoint.setSerialFormat(serialFormat);
            break;
          case COMPRESSED_FILTERED:
            ByteArrayInputStream bais = new ByteArrayInputStream(binarySource);
            reuseInfo = Serialization.deserializeCAS(cas, bais, endpoint.getTypeSystemImpl(), null).getReuseInfo();
            break;
          default:
            throw new RuntimeException("Never Happen");
          }
    	  
      } catch( Exception ex) {
          // When de-serialization fails we still need to create a cache entry for the CAS.
    	  // Code downstream tries to lookup an entry even for a CAS that failed. Cache the
    	  // fact that there was a failure and the cause. Once the entry is properly setup
    	  // rethrow the exception to  handle the failure.
    	  failed = true;
    	  cachedException = ex;
      }

      // *************************************************************************
      // Check and set up for Delta CAS reply
      // *************************************************************************
      boolean acceptsDeltaCas = false;
      Marker marker = null;
      if (!failed && aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
        acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
        if (acceptsDeltaCas) {
          marker = cas.createMarker();
        }
      }
      // *************************************************************************
      // Register the CAS with a local cache
      // *************************************************************************
      // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
      // deserSharedData, casReferenceId);
      entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, reuseInfo,
              casReferenceId, marker, acceptsDeltaCas);
     
      if ( failed ) {
    	  entry.setFailed(true);
      }
      /*
		Lazily create a Semaphore that will be used to throttle CAS ingestion from service
		input queue. This only applies to async aggregates. The semaphore is initialized
		with the number of permits equal to the service CasPool size. The idea is that this
		service should only ingest as many CASes as it is capable of processing without
		waiting for a free instance of CAS from the service CasPool.
      */
      boolean inputCAS = aMessageContext
           .getMessageStringProperty(AsynchAEMessage.InputCasReference) == null ? true : false;
      if ( !getController().isPrimitive() && inputCAS) {
    	 
    	  try {
    		  synchronized(lock) {
    			  // lazily create a Semaphore on the first Process request. This semaphore
    			  // will throttle ingestion of CASes from service input queue.
    			  if (((AggregateAnalysisEngineController_impl) getController()).semaphore == null) {
    				  ((AggregateAnalysisEngineController_impl) getController()).semaphore = 
    						  new Semaphore(
    						  ((AggregateAnalysisEngineController) getController())
    						  .getServiceCasPoolSize()-1);
    				 // semaphore.acquire();
    			  }
    		  }
    	  } catch( Exception e) {
    		  throw e;
    	  }
        entry.setThreadCompletionSemaphore(((AggregateAnalysisEngineController_impl) getController()).semaphore);
      }
      long timeToDeserializeCAS = getController().getCpuTime() - t1;
      getController().incrementDeserializationTime(timeToDeserializeCAS);
      LongNumericStatistic statistic;
      if ((statistic = getController().getMonitor().getLongNumericStatistic("",
              Monitor.TotalDeserializeTime)) != null) {
        statistic.increment(timeToDeserializeCAS);
      }
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_deserialize_cas_time_FINE",
                new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
      }

      // Update Stats
      ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
      casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
      if (getController().isTopLevelComponent()) {
        synchronized (mux) {
          getController().getServicePerformance().incrementCasDeserializationTime(
                  timeToDeserializeCAS);
        }
      }
      getController().saveTime(inTime, casReferenceId, getController().getName());

      if (getController() instanceof AggregateAnalysisEngineController) {
        // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
        if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
          // Fetch parent CAS id
          String inputCasReferenceId = aMessageContext
                  .getMessageStringProperty(AsynchAEMessage.InputCasReference);
          if (shadowCasPoolKey != null) {
            // Save the key of the Cas Multiplier in the cache. It will be now known which Cas
            // Multiplier produced this CAS
            entry.setCasProducerKey(shadowCasPoolKey);
          }
          // associate this subordinate CAS with the parent CAS
          entry.setInputCasReferenceId(inputCasReferenceId);
          // Save a Cas Multiplier endpoint where a Free CAS notification will be sent
          entry.setFreeCasEndpoint(freeCasEndpoint);
          cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
        } else {
          cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
        }
        DelegateStats stats = new DelegateStats();
        if (entry.getStat() == null) {
          entry.setStat(stats);
          // Add entry for self (this aggregate). MessageContext.getEndpointName()
          // returns the name of the queue receiving the message.
          stats.put(getController().getServiceEndpointName(), new TimerStats());
        } else {
          if (!stats.containsKey(getController().getServiceEndpointName())) {
            stats.put(getController().getServiceEndpointName(), new DelegateStats());
          }
        }
      } else {
        cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
      }
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_deserialized_cas_ready_to_process_FINE",
                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
      }
      cacheProcessCommandInClientEndpoint();
      
      if ( failed ) {
    	  throw cachedException;
      }
    } catch( Exception e) {
      if ( cas != null ) {
        cas.release();
        if ( entry != null ) {
            entry.releasedCAS();
        }
      }
      throw e;
    }
    return entry;
  }