in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java [248:424]
/**
 * Builds one JMS message from the given pending request and sends it to the UIMA AS service
 * using a JMS session that is created for this call and closed before returning.
 *
 * <p>Behavior by request kind:
 * <ul>
 * <li>CAS process request ({@code casProcessRequest == true}): the matching {@link ClientRequest}
 * is looked up in the engine cache using the CAS reference id stored in {@code pm}. If there is no
 * cache entry a warning is logged and nothing is sent. Otherwise optional tracking/targeting
 * properties are set, the producer time-to-live is set to ten times the process timeout (unless
 * the {@code NoTTL} system property is defined or the timeout is not positive), the CAS departure
 * time is recorded, the CAS is added to the delegate's outstanding list, the message is sent and
 * finally {@code engine.onBeforeMessageSend()} is invoked.</li>
 * <li>GetMeta request with a positive GetMeta timeout: the GetMeta timer is started unless the
 * delegate is already awaiting a ping reply, then the message is sent.</li>
 * <li>Any other request: the message is sent without timers or callbacks.</li>
 * </ul>
 *
 * <p>Exceptions raised while creating the session, building the message or sending it are logged
 * at WARNING level and are not rethrown; for CAS process requests
 * {@code addCasToOutstandingList} is then called with the CAS reference id.
 *
 * @param pm
 *          the pending request; supplies message type, CAS reference id, optional targeting
 *          selector and (for ReleaseCAS/Stop) the destination
 * @param engine
 *          the client engine providing the shared connection, serialization format, request
 *          cache and service delegate
 * @param casProcessRequest
 *          {@code true} when {@code pm} is a CAS process request
 * @throws Exception
 *           if the broker URI or shared connection lookup fails (these calls happen before the
 *           guarded section)
 */
protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine,
    boolean casProcessRequest) throws Exception {
  SharedConnection sc = engine.lookupConnection(engine.getBrokerURI());
  ClientRequest cacheEntry = null;
  boolean doCallback = false;
  // Check the environment for existence of NoTTL tag. If present,
  // the deployer of the service wants to disable message expiration.
  boolean addTimeToLive = System.getProperty("NoTTL") == null;
  Session jmsSession = null;
  try {
    jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Only a Process request can be serialized as binary
    Message message;
    if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
      message = jmsSession.createBytesMessage();
    } else {
      message = jmsSession.createTextMessage();
    }

    // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
    // instead of a temp queue. Regular queues can be recovered in case of
    // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
    // Code in JmsOutputChannel will add the selector if the service is a CM.
    String selector = (String) pm.get(AsynchAEMessage.TargetingSelector);
    Destination d;
    if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS
        || pm.getMessageType() == AsynchAEMessage.Stop)) {
      // No selector: reply goes to the destination carried by the pending message
      d = (Destination) pm.get(AsynchAEMessage.Destination);
    } else {
      d = jmsSession.createQueue(destinationName);
    }

    MessageProducer mProducer = jmsSession.createProducer(d);
    mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    super.initializeMessage(pm, message);
    String destination = ((ActiveMQDestination) d).getPhysicalName();
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
          JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
          new Object[] {
              UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
                  message.getIntProperty(AsynchAEMessage.Command)),
              UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
                  message.getIntProperty(AsynchAEMessage.MessageType)),
              destination });
    }

    if (casProcessRequest) {
      cacheEntry = (ClientRequest) engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
      if (cacheEntry == null) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
              new Object[] { pm.get(AsynchAEMessage.CasReference),
                  UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
                      message.getIntProperty(AsynchAEMessage.Command)),
                  UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
                      message.getIntProperty(AsynchAEMessage.MessageType)),
                  destination });
        }
        return; // no cacheEntry so just return (the finally block still closes the session)
      }

      // enable logging
      if (System.getProperty("UimaAsCasTracking") != null) {
        message.setStringProperty("UimaAsCasTracking", "enable");
      }
      // Target specific service instance if targeting for the CAS is provided
      // by the client application
      if (cacheEntry.getTargetServiceId() != null) {
        message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
            cacheEntry.getTargetServiceId());
      }

      // Use Process Timeout value for the time-to-live property in the
      // outgoing JMS message. When this time is exceeded
      // while the message sits in a queue, the JMS Server will remove it from
      // the queue. What happens with the expired message depends on the
      // configuration. Most JMS Providers create a special dead-letter queue
      // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the
      // DLQ are not auto evicted yet and accumulate taking up memory.
      //
      // The time-to-live must be set on the producer: MessageProducer.send() overwrites
      // the JMSExpiration header, so a value set directly on the message is discarded.
      long timeoutValue = cacheEntry.getProcessTimeout();
      if (timeoutValue > 0 && addTimeToLive) {
        // Set high time to live value
        mProducer.setTimeToLive(10 * timeoutValue);
      }

      cacheEntry.setCASDepartureTime(System.nanoTime());
      doCallback = true;

      // Add the cas to a list of CASes pending reply. Also start the timer if
      // necessary
      CAS cas = cacheEntry.getCAS();
      engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(),
          cas.hashCode(), engine.timerPerCAS); // true=timer per cas
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
            JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
            new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
                engine.serviceDelegate.toString() });
      }
    } else if (pm.getMessageType() == AsynchAEMessage.GetMeta
        && engine.serviceDelegate.getGetMetaTimeout() > 0) {
      // timer for PING has been started in sendCAS()
      if (!engine.serviceDelegate.isAwaitingPingReply()) {
        engine.serviceDelegate.startGetMetaRequestTimer();
      }
    }
    // For every non-CAS request (e.g. CPC) doCallback stays false, so
    // onBeforeMessageSend() is not invoked.

    // Dispatch asynchronous request to Uima AS service
    mProducer.send(message);

    if (doCallback) {
      UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),
          cacheEntry.getCAS(), cacheEntry.getCasReferenceId());
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
            JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE",
            new Object[] { pm.get(AsynchAEMessage.CasReference),
                String.valueOf(cacheEntry.getCAS().hashCode()) });
      }
      // Note the callback name is a misnomer. The callback is made *after* the send now.
      // Application receiving this callback can consider the CAS as delivered to a
      // queue
      engine.onBeforeMessageSend(status);
    }
  } catch (Exception e) {
    // Failures are logged and not propagated to the caller.
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
          "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
          "UIMAEE_exception__WARNING", e);
    }
    if (casProcessRequest) {
      // Hand the failed CAS to the helper defined elsewhere in this class
      addCasToOutstandingList((String) pm.get(AsynchAEMessage.CasReference));
    }
  } finally {
    if (jmsSession != null) {
      try {
        jmsSession.close();
      } catch (Exception ignored) {
        // Best-effort cleanup: a failure to close the session must not mask the
        // outcome of the dispatch itself.
      }
    }
  }
}