in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java [798:940]
public void afterPropertiesSet(final boolean propagate) {
if (endpoint != null) {
// Override the prefetch size. The dd2spring always sets this to 1 which
// may effect the throughput of a service. Change the prefetch size to
// number of consumer threads defined in DD.
if ( cc > 1 && endpoint.isTempReplyDestination() && connectionFactory instanceof ActiveMQConnectionFactory ) {
((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().setQueuePrefetch(cc);
}
// Endpoint has been plugged in from spring xml. This means this is a listener
// for a reply queue. We need to rewire things a bit. First make Spring use
// one thread to make sure we receive messages in order. To fix a race condition
// where a parent CAS is processed first instead of its last child, we need to
// assure that we get the child first. We need to update the counter of the
// parent CAS to reflect that there is another child. In the race condition that
// was observed, the parent was being processed first in one thread. The parent
// reached the final step and subsequently was dropped. Subsequent to that, a
// child CAS processed on another thread begun executing and failed since a look
// on its parent resulted in CAS Not Found In Cache Exception.
// Make sure Spring uses one thread
super.setConcurrentConsumers(1);
if (cc > 1) {
try {
String prefix = endpoint.getDelegateKey()+" Reply Thread";
concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName(), threadGroup,prefix);
super.setMessageListener(concurrentListener);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
if ( controller != null ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"afterPropertiesSet", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", controller.getComponentName());
}
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
return;
}
} else {
pluginThreadPool = true;
}
} else {
super.setConcurrentConsumers(1);
//if ( targetedListener ) {
//super.setConcurrentConsumers(1);
//} else {
// super.setConcurrentConsumers(cc);
//}
pluginThreadPool = true;
}
Thread t = new Thread(threadGroup, new Runnable() {
public void run() {
Destination destination = __listenerRef.getDestination();
try {
// Wait until the connection factory is injected by Spring
while (connectionFactory == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
}
}
System.setProperty("BrokerURI", ((ActiveMQConnectionFactory) connectionFactory)
.getBrokerURL());
boolean done = false;
// Wait for controller to be injected by Uima AS
if (isActiveMQDestination() && !isGetMetaListener()
&& !((ActiveMQDestination) destination).isTemporary()) {
// Add self to InputChannel
connectWithInputChannel();
// Wait for InputChannel to plug in a controller
done = true;
while (controller == null)
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
}
;
}
// Plug in connection Factory to Spring's Listener
__listenerRef.injectConnectionFactory();
if ( pluginThreadPool ) {
setUimaASThreadPoolExecutor(cc);
}
// Initialize the TaskExecutor. This call injects a custom Thread Pool into the
// TaskExecutor provided in the spring xml. The custom thread pool initializes
// an instance of AE in a dedicated thread
if ( !isGetMetaListener()) {
initializeTaskExecutor(cc);
}
// if ( threadPoolExecutor == null ) {
// // Plug in TaskExecutor to Spring's Listener
// __listenerRef.injectTaskExecutor();
// }
if ( propagate ) {
// Notify Spring Listener that all properties are ready
__listenerRef.allPropertiesSet();
}
if (isActiveMQDestination() && destination != null) {
destinationName = ((ActiveMQDestination) destination).getPhysicalName();
}
if (!done) {
connectWithInputChannel();
done = true;
}
if (concurrentListener != null) {
concurrentListener.setAnalysisEngineController(controller);
}
// Save number of concurrent consumers on the temp reply queue in case we need to
// recreate a new listener on a new temp queue created during recovery
if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) controller)
.lookupDelegate(endpoint.getDelegateKey());
if (delegate != null) {
delegate.getEndpoint().setConcurrentReplyConsumers(cc);
}
}
// Show ready message on the console only if this listener is *not* listening
// on an input queue. Input queue listeners are not started until the service
// is fully initialized
if (__listenerRef.getMessageListener() == null && getDestination() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_listener_ready__INFO",
new Object[] {controller.getComponentName(), getBrokerUrl(), getDestination() });
}
} catch (Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_jms_listener_failed_WARNING",
new Object[] { destination, getBrokerUrl(), e });
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
}
});
t.start();
}