private String sendCAS()

in uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java [879:1014]


  /**
   * Serializes the given CAS, registers the request in the client cache and either queues a
   * Process message for dispatch or, when the service delegate is in the timed-out state, parks
   * the CAS on the delegate's pending dispatch list (optionally sending a ping first).
   *
   * <p>The entire body runs while holding {@code sendMux}.
   *
   * @param aCAS the CAS to serialize and send
   * @param requestToCache the cache entry describing this request; must not be {@code null}
   * @param targetServiceId id of a specific service instance that should process the CAS, or
   *        {@code null} when no specific target is requested
   * @return the CAS reference id taken from {@code requestToCache}, or {@code null} when the
   *         client is not running and nothing was sent
   * @throws ResourceProcessException if {@code requestToCache} is {@code null}, if the broker
   *         connection is found closed on the normal dispatch path, or if any other exception is
   *         raised while preparing the message (the cache entry is removed before rethrowing)
   */
  private String sendCAS(CAS aCAS, ClientRequest requestToCache, String targetServiceId) throws ResourceProcessException {
    synchronized (sendMux) {
      if (requestToCache == null) {
        throw new ResourceProcessException(new Exception("Invalid Process Request. Cache Entry is Null"));
      }
      String casReferenceId = requestToCache.getCasReferenceId();
      // check if application wants to target a specific service instance to process the CAS
      if (targetServiceId != null) {
        // the dispatcher will fetch the service target id before sending the msg
        requestToCache.setTargetServiceId(targetServiceId);
      }
      try {
        if (!running) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_sending_cas_INFO",
                    new Object[] { "Asynchronous Client is Stopping" });
          }
          return null;
        }

        // Register the request first; both catch blocks below undo this on failure.
        clientCache.put(casReferenceId, requestToCache);
        PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
        long t1 = System.nanoTime();
        switch (serialFormat) {
        case XMI:
          XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
          String serializedCAS = serializeCAS(aCAS, serSharedData);
          msg.put(AsynchAEMessage.CAS, serializedCAS);
          if (remoteService) {  // always true 5/2013
            // Store the serialized CAS in case the timeout occurs and need to send the
            // the offending CAS to listeners for reporting
            requestToCache.setCAS(serializedCAS);
            requestToCache.setXmiSerializationSharedData(serSharedData);
          }
          break;
        case BINARY:
          byte[] serializedBinaryCAS = uimaSerializer.serializeCasToBinary(aCAS);
          msg.put(AsynchAEMessage.CAS, serializedBinaryCAS);
          break;
        case COMPRESSED_FILTERED:
          // can't use uimaserializer directly - project doesn't have ref to this one
          // for storing the reuse info
          BinaryCasSerDes6 bcs = new BinaryCasSerDes6(aCAS, this.getRemoteTypeSystem());
          ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
          bcs.serialize(baos);
          requestToCache.setCompress6ReuseInfo(bcs.getReuseInfo());
          msg.put(AsynchAEMessage.CAS, baos.toByteArray());
          break;
        default:
          throw new UIMARuntimeException(new Exception("Internal Error"));
        }

        requestToCache.setCAS(aCAS);

        // Serialization time covers everything since t1, i.e. the switch above.
        requestToCache.setSerializationTime(System.nanoTime() - t1);
        msg.put(AsynchAEMessage.CasReference, casReferenceId);
        requestToCache.setIsRemote(remoteService);
        requestToCache.setEndpoint(getEndPointName());
        requestToCache.setProcessTimeout(processTimeout);
        requestToCache.clearTimeoutException();

        // This code already runs under sendMux, so no additional synchronization is needed below
        if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE) {
          SharedConnection sharedConnection = lookupConnection(getBrokerURI());

          //  Send Ping to service as getMeta request
          if (sharedConnection != null && !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen()) {
            serviceDelegate.setAwaitingPingReply();
            // since the service is in time out state, we dont send CASes to it just yet. Instead, place
            // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
            // arrives.
            serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
            if (cpcReadySemaphore.availablePermits() > 0) {
              acquireCpcReadySemaphore();
            }

            // Send PING Request to check delegate's availability
            sendMetaRequest();
            // Start a timer for GetMeta ping and associate a cas id
            // with this timer. The delegate is currently in a timed out
            // state due to a timeout on a CAS with a given casReferenceId.
            serviceDelegate.startGetMetaRequestTimer(casReferenceId);

            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS",
                      JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
                      new Object[] { serviceDelegate.getKey() });
            }
            return casReferenceId;
          } else {
            // This branch is also reached when no connection was found at all, so the connection
            // must be null-checked before it is dereferenced. A missing connection is reported
            // the same way as a closed one instead of surfacing as a NullPointerException.
            if (!requestToCache.isSynchronousInvocation()
                    && (sharedConnection == null || !sharedConnection.isOpen())) {
              Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost");
              handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache, true);
              return casReferenceId;
            } else {
              // since the service is in time out state, we dont send CASes to it just yet. Instead, place
              // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING
              // arrives.
              serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId(), aCAS.hashCode(), timerPerCAS);
              return casReferenceId;
            }
          }
        }
        SharedConnection sharedConnection = lookupConnection(getBrokerURI());
        if (sharedConnection != null && !sharedConnection.isOpen()) {
          // requestToCache is known to be non-null here (validated on entry and never reassigned)
          if (!requestToCache.isSynchronousInvocation() && aCAS != null) {
            aCAS.release();
          }
          throw new ResourceProcessException(new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"));
        }
        // Incremented number of outstanding CASes sent to a service. When a reply comes
        // this counter is decremented
        outstandingCasRequests.incrementAndGet();
        // Increment total number of CASes sent to a service. This is reset
        // on CPC
        totalCasRequestsSentBetweenCpCs.incrementAndGet();
        // Add message to the pending queue
        addMessage(msg);
      } catch (ResourceProcessException e) {
        clientCache.remove(casReferenceId);
        throw e;
      } catch (Exception e) {
        clientCache.remove(casReferenceId);
        throw new ResourceProcessException(e);
      }
      return casReferenceId;
    }
  }