in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java [1338:1453]
private void setUimaASThreadPoolExecutor(int consumerCount) throws Exception {
if ( isGetMetaListener() ) {
super.setMessageListener(ml);
} else if ( isFreeCasQueueListener()) {
super.setMessageListener(ml);
} else if (endpoint != null && endpoint.isTempReplyDestination()) {
super.setMessageListener(ml);
} else {
if ( isProcessListener() ) { //controller != null && controller instanceof PrimitiveAnalysisEngineController ) {
// Singleton handler shared by Process CAS listener and a targeted listener. The handler
// onMessage() is called by Spring when a message with a matching selector is available.
// When onMessage() is called, it adds a message to the Priority Queue
// PriorityMessageHandler h = PriorityMessageHandler.getInstance();
// System.out.println("+++++++++++++ Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" Selector:"+super.getMessageSelector());
//super.setMessageListener(h);
// targeted listener should not have its own thread pool because it needs to use
// threads created by the Process Cas Listener. Each of these threads is pinned to
// a dedicated AE initialized at startup. Contract says that each AE process() will be called
// on the same thread that initialized it. The targeted listener and process listener share
// the same handler where CASes are pushed onto a Blocking Priority Queue for processing.
if (!targetedListener && !isFreeCasQueueListener()) {
// System.out.println(">>>>>>>>>>> Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" Selector:"+super.getMessageSelector());
PriorityMessageHandler h = new PriorityMessageHandler(cc);
super.setMessageListener(h);
try {
while (controller.getInputChannel() == null) {
synchronized (h) {
h.wait(100);
}
}
} catch (Exception e) {
e.printStackTrace();
}
//if (isPrimitiveService()) {
latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
// Create a Custom Thread Factory. Provide it with an instance of
// PrimitiveController so that every thread can call it to initialize
// the next available instance of a AE.
tf = new UimaAsPriorityBasedThreadFactory(threadGroup,
controller, latchToCountNumberOfTerminatedThreads);
//(PrimitiveAnalysisEngineController) controller, latchToCountNumberOfTerminatedThreads);
((UimaAsPriorityBasedThreadFactory) tf).withQueue(h.getQueue())
.withChannel(controller.getInputChannel());
((UimaAsPriorityBasedThreadFactory) tf).setDaemon(true);
if ( taskExecutor == null ) { // true for aggregates
taskExecutor = new ThreadPoolTaskExecutor();
}
// This ThreadExecutor will use custom thread factory instead of defult one
((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
// Initialize the thread pool
((ThreadPoolTaskExecutor) taskExecutor).initialize();
// Make sure all threads are started. This forces each thread to call
// PrimitiveController to initialize the next instance of AE
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
// } else {
//
// }
}
} else {
if ( controller.getInputChannel() == null ) {
//System.out.println("............. Error - JmsInputChannel not set yet...");
} else if ( !(controller.getInputChannel() instanceof MessageListener ||
controller.getInputChannel() instanceof SessionAwareMessageListener) ) {
//System.out.println("............. Error - wrong MessageListener type - Getting this:"+controller.getInputChannel().getClass().getName());
}
super.setMessageListener(controller.getInputChannel());
}
}
// create task executor with custom thread pool for:
// 1) GetMeta request processing
// 2) ReleaseCAS request
if ( !targetedListener && taskExecutor == null ) {
UimaAsThreadFactory utf = new UimaAsThreadFactory(threadGroup);
utf.setDaemon(false);
// tf.defineUsageAs(UsedFor.GetMetaHandling);//setForGetMetaHandling();
if ( isFreeCasQueueListener()) {
utf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
} else if ( isGetMetaListener() ) {
utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else if ( getDestination() != null && getMessageSelector() != null ) {
utf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
} else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else {
throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
}
ExecutorService es = Executors.newFixedThreadPool(consumerCount,utf);
if ( es instanceof ThreadPoolExecutor ) {
threadPoolExecutor = (ThreadPoolExecutor)es;
super.setTaskExecutor(es);
}
}
/*
else {
UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
tf.setDaemon(true);
if ( isFreeCasQueueListener()) {
tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
} else if ( isGetMetaListener() ) {
tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else if ( getDestination() != null && getMessageSelector() != null ) {
tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
} else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
} else {
throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
}
}
*/
}