private void sendCasToRemoteEndpoint()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java [1630:1756]


  /**
   * Wraps an already serialized CAS in a JMS message and hands it to {@code dispatch} for
   * delivery to the given endpoint.
   * <p>
   * A text message is produced when the endpoint's serial format is {@code SerialFormat.XMI};
   * otherwise a bytes message is produced. The CAS reference id and the request/response header
   * properties are then added. When the message is a reply and the CAS is subordinate, the
   * message is re-labelled as a request, the top parent CAS id and the CAS sequence number are
   * added, and {@code freeCASTempQueue} (if set) becomes the JMS reply-to destination.
   * <p>
   * The method returns without sending when:
   * <ul>
   * <li>the channel is aborting,</li>
   * <li>the CAS has no entry in the controller's local cache (a warning is logged),</li>
   * <li>the endpoint connection is not open and the message is a reply,</li>
   * <li>creating the JMS message throws an {@code AsynchAEException} (a warning is logged).</li>
   * </ul>
   * A {@code JMSException} is logged at INFO level and not propagated. A
   * {@code ConnectException} marks the CAS state entry as "delivery to client failed" and is not
   * propagated.
   *
   * @param isRequest
   *          {@code true} if the message is a request, {@code false} if it is a reply
   * @param aSerializedCAS
   *          the serialized CAS; cast to {@code String} for XMI and to {@code byte[]} otherwise
   * @param entry
   *          cache entry supplying the CAS reference ids, sequence number and delta-CAS flag
   * @param anEndpoint
   *          destination endpoint
   * @param startTimer
   *          not read by this method
   * @throws AsynchAEException
   *           rethrown as-is, or wrapping any other unexpected exception
   * @throws ServiceShutdownException
   *           rethrown as-is
   */
  private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
          Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
          ServiceShutdownException {
    // Name reported as the source method in every log record written below
    final String sourceMethod = "sendCasToRemoteEndpoint";
    CasStateEntry casStateEntry = null;
    long msgSize = 0;
    try {
      if (aborting) {
        return;
      }
      // If this is a reply to a client, use the same broker URL that manages this service input
      // queue. Otherwise this is a request so use a broker specified in the endpoint object.
      String brokerConnectionURL = anEndpoint.isReplyEndpoint() ? serverURI : anEndpoint
              .getServerURI();

      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
              entry.getCasReferenceId());
      if (casStateEntry == null) {
        logUnableToSendCasWarning(sourceMethod, anEndpoint, brokerConnectionURL, entry,
                new Exception("Unable to lookup entry in Local Cache for a given Cas Id"));
        return;
      }

      // Get the connection object for a given endpoint
      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);

      // A reply is dropped when the connection is not open; a request still proceeds.
      if (!endpointConnection.isOpen() && !isRequest) {
        return;
      }

      Message tm = null;
      try {
        if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
          tm = endpointConnection.produceTextMessage((String) aSerializedCAS);
          if (aSerializedCAS != null) {
            msgSize = ((String) aSerializedCAS).length();
          }
          tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
        } else {
          tm = endpointConnection.produceByteMessage((byte[]) aSerializedCAS);
          if (aSerializedCAS != null) {
            msgSize = ((byte[]) aSerializedCAS).length;
          }
          tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
        }
      } catch (AsynchAEException ex) {
        logUnableToSendCasWarning(sourceMethod, anEndpoint, brokerConnectionURL, entry, ex);
        return;
      }

      // Add Cas Reference Id to the outgoing JMS Header
      tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
      // Add common properties to the JMS Header
      if (isRequest) {
        populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
      } else {
        populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
        tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
      }

      // Flag handed to dispatch(); kept separate so the caller's argument is never reassigned.
      boolean dispatchAsRequest = isRequest;

      // The following is true when the analytic is a CAS Multiplier
      if (casStateEntry.isSubordinate() && !isRequest) {
        // Override MessageType set in the populateHeaderWithContext above.
        // Make the reply message look like a request. This message will contain a new CAS
        // produced by the CAS Multiplier. The client will treat this CAS
        // differently from the input CAS.
        tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
        dispatchAsRequest = true;
        // Save the id of the parent CAS
        tm.setStringProperty(AsynchAEMessage.InputCasReference,
                getTopParentCasReferenceId(entry.getCasReferenceId()));
        // Add a sequence number assigned to this CAS by the controller
        tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
        // If this is a Cas Multiplier, add a reference to a special queue where
        // the client sends Free Cas Notifications
        if (freeCASTempQueue != null) {
          tm.setJMSReplyTo(freeCASTempQueue);
        }
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  sourceMethod,
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
                      anEndpoint.getEndpoint(), entry.getCasReferenceId(),
                      entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
        }
      }
      dispatch(tm, anEndpoint, entry, dispatchAsRequest, endpointConnection, msgSize);

    } catch (JMSException e) {
      // Unable to establish connection to the endpoint. Log it and continue
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                sourceMethod, JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_unable_to_connect__INFO",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
    } catch (ServiceShutdownException e) {
      throw e;
    } catch (AsynchAEException e) {
      throw e;
    } catch (ConnectException e) {
      // casStateEntry is still null if the failure happened before the cache lookup completed;
      // guard so that a NullPointerException does not replace the original failure.
      if (casStateEntry != null) {
        casStateEntry.setDeliveryToClientFailed();
      }
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }

  /**
   * Logs the "unable to send reply" warning for a CAS that could not be sent to an endpoint.
   *
   * @param sourceMethod
   *          method name to report as the source of the log record
   * @param anEndpoint
   *          endpoint the CAS was destined for
   * @param brokerConnectionURL
   *          broker URL selected for the endpoint
   * @param entry
   *          cache entry supplying the CAS reference ids
   * @param cause
   *          exception describing why the CAS could not be sent
   */
  private void logUnableToSendCasWarning(String sourceMethod, Endpoint anEndpoint,
          String brokerConnectionURL, CacheEntry entry, Exception cause) {
    UIMAFramework.getLogger(CLASS_NAME).logrb(
            Level.WARNING,
            CLASS_NAME.getName(),
            sourceMethod,
            JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAJMS_unable_to_send_reply__WARNING",
            new Object[] { getAnalysisEngineController().getComponentName(),
                anEndpoint.getDestination(), brokerConnectionURL,
                entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
                entry.getCasReferenceId(), 0, cause });
  }