in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java [1630:1756]
private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
ServiceShutdownException {
CasStateEntry casStateEntry = null;
long msgSize = 0;
try {
if (aborting) {
return;
}
// If this is a reply to a client, use the same broker URL that manages this service input queue.
// Otherwise this is a request so use a broker specified in the endpoint object.
String brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
entry.getCasReferenceId());
if (casStateEntry == null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
CLASS_NAME.getName(),
"sendCasToRemoteDelegate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_send_reply__WARNING",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getDestination(), brokerConnectionURL,
entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
entry.getCasReferenceId(), 0,
new Exception("Unable to lookup entry in Local Cache for a given Cas Id") });
return;
}
// Get the connection object for a given endpoint
JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
if (!endpointConnection.isOpen()) {
if (!isRequest) {
return;
}
}
Message tm = null;
try {
if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
tm = endpointConnection.produceTextMessage((String)aSerializedCAS);
if (aSerializedCAS != null) {
msgSize = ((String)aSerializedCAS).length();
}
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
} else {
// Create empty JMS Bytes Message
tm = endpointConnection.produceByteMessage((byte[])aSerializedCAS);
if (aSerializedCAS != null) {
msgSize = ((byte[])aSerializedCAS).length;
}
tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
}
} catch (AsynchAEException ex) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.WARNING,
CLASS_NAME.getName(),
"sendCasToRemoteDelegate",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_send_reply__WARNING",
new Object[] { getAnalysisEngineController().getComponentName(),
anEndpoint.getDestination(), brokerConnectionURL, entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(), entry.getCasReferenceId(), 0, ex });
return;
}
// Add Cas Reference Id to the outgoing JMS Header
tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
// Add common properties to the JMS Header
if (isRequest == true) {
populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process);
} else {
populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process);
tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
}
// The following is true when the analytic is a CAS Multiplier
if (casStateEntry.isSubordinate() && !isRequest) {
// Override MessageType set in the populateHeaderWithContext above.
// Make the reply message look like a request. This message will contain a new CAS
// produced by the CAS Multiplier. The client will treat this CAS
// differently from the input CAS.
tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
isRequest = true;
// Save the id of the parent CAS
tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry
.getCasReferenceId()));
// Add a sequence number assigned to this CAS by the controller
tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
// If this is a Cas Multiplier, add a reference to a special queue where
// the client sends Free Cas Notifications
if (freeCASTempQueue != null) {
// Attach a temp queue to the outgoing message. This is a queue where
// Free CAS notifications need to be sent from the client
tm.setJMSReplyTo(freeCASTempQueue);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"sendCasToRemoteEndpoint",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_send_cas_to_collocated_service_detail__FINE",
new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
anEndpoint.getEndpoint(), entry.getCasReferenceId(),
entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
}
}
dispatch(tm, anEndpoint, entry, isRequest, endpointConnection, msgSize);
} catch (JMSException e) {
// Unable to establish connection to the endpoint. Logit and continue
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_unable_to_connect__INFO",
new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
}
} catch (ServiceShutdownException e) {
throw e;
} catch (AsynchAEException e) {
throw e;
} catch (ConnectException e) {
casStateEntry.setDeliveryToClientFailed();
} catch (Exception e) {
throw new AsynchAEException(e);
}
}