in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java [1210:1336]
public void destroy(final boolean stopImmediate) {
if (awaitingShutdown) {
return;
}
// Spin a thread that will shutdown all taskExecutors and wait for their threads to stop.
// A separate thread is necessary since we cant stop a threadPoolExecutor if one of its
// threads is busy stopping the executor. This leads to a hang.
Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
"threadGroupDestroyer") {
public void run() {
try {
if ( !__listenerRef.awaitingShutdown ) {
ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection();
if ( amqc != null ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "......... Stopping Client ID:"+amqc.getConnectionInfo().getClientId();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "destroy.run",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
amqc.stop();
}
awaitingShutdown = true;
if ( getMessageListener() instanceof PriorityMessageHandler ) {
((PriorityMessageHandler)getMessageListener()).getQueue().put(new MessageWrapper(null, null, null,HIGH_PRIORITY));
}
if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
// Modify task executor to terminate idle threads. While the thread terminates
// it calls destroy() method on the pinned instance of AE
// java 5 ThreadPoolExecutor doesnt implement allowCoreThreadTimeout Method.
// Use alternate mechanism to passivate threads in the pool.
try {
Method m = ((ThreadPoolTaskExecutor) taskExecutor).
getThreadPoolExecutor().getClass().getMethod("allowCoreThreadTimeOut", boolean.class);
m.invoke(((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor(), true);
} catch ( NoSuchMethodException e) {
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().setCorePoolSize(0);
}
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().setKeepAliveTime(1000, TimeUnit.MILLISECONDS);
((ThreadPoolTaskExecutor) taskExecutor).setWaitForTasksToCompleteOnShutdown(true);
((ThreadPoolTaskExecutor) taskExecutor).shutdown();
((ThreadPoolTaskExecutor) taskExecutor).destroy();
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
} else if (concurrentListener != null) {
shutdownTaskExecutor(concurrentListener.getTaskExecutor(), stopImmediate);
concurrentListener.stop();
} else if ( threadPoolExecutor != null ) {
shutdownTaskExecutor(threadPoolExecutor, true);
}
}
if ( getTaskExecutor() != null ) {
if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
((ThreadPoolTaskExecutor)getTaskExecutor()).shutdown();
}
}
String controllerName = (__listenerRef.controller == null) ? "" :__listenerRef.controller.getComponentName();
__listenerRef.shutdown();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
String msg = "................... Listener shutdown() - Called - Listener:"+getDestinationName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "destroy.run",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
new Object[] { msg });
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_listener_shutdown__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector(),__listenerRef.getBrokerUrl()});
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_listener_jms_connection_closed__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector()});
}
} catch (Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
threadGroup.getParent().list();
}
try {
synchronized (threadGroup) {
if (!threadGroup.isDestroyed()) {
threadGroup.destroy();
}
}
} catch (Exception e) {
} // Ignore
// Wait for process threads to finish. Each thread
// will count down the latch on exit. When all thread
// finish we can continue. Otherwise we block on the latch
try {
if ( latchToCountNumberOfTerminatedThreads != null && cc > 1) {
latchToCountNumberOfTerminatedThreads.await();
}
} catch( Exception ex) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", ex);
}
}
};
threadGroupDestroyer.start();
// Wait for process threads to finish. Each thread
// will count down the latch on exit. When all thread
// finish we can continue. Otherwise we block on the latch
// try {
// if ( latchToCountNumberOfTerminatedThreads != null && cc > 1) {
// System.out.println("---------------- Latch Count:"+latchToCountNumberOfTerminatedThreads.getCount());
// latchToCountNumberOfTerminatedThreads.await();
// }
// } catch( Exception ex) {
// UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
// "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
// "UIMAJMS_exception__WARNING", ex);
// }
}