in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java [244:353]
/**
 * Detaches this client from the shared connection registered for {@code brokerURI} and
 * releases the JMS resources this client holds on it.
 * <p>
 * While holding the connection's semaphore, the method:
 * <ol>
 *   <li>unregisters this client from the shared connection;</li>
 *   <li>if this client was initialized, closes its consumer session and consumer (only when
 *       the connection is started and the session is running) and, when
 *       {@code getClientCount() <= 1}, destroys the shared connection and closes the
 *       underlying ActiveMQ connection;</li>
 *   <li>deletes this client's temporary reply destination if the connection is still open.</li>
 * </ol>
 * The method is best-effort: failures are reported via {@code printStackTrace()} and are not
 * propagated to the caller. It does nothing when {@code brokerURI} is {@code null} or no
 * shared connection is found for it.
 */
private void stopConnection() {
  if (brokerURI == null) {
    return;
  }
  // there is a dedicated shared connection for each broker URI
  final SharedConnection sharedConnection = lookupConnection(brokerURI);
  if (sharedConnection == null) {
    return;
  }
  // Tracks whether acquire() succeeded, so the finally block never hands back a permit
  // that this thread does not own.
  boolean semaphoreAcquired = false;
  try {
    traceStopConnectionStep("acquiring semaphore");
    // use broker dedicated semaphore to lock the code for updates
    sharedConnection.getSemaphore().acquire();
    semaphoreAcquired = true;
    traceStopConnectionStep("acquired semaphore");
    // Remove a client from registry
    sharedConnection.unregisterClient(this);
    traceStopConnectionStep("1");
    ActiveMQConnection amqc = (ActiveMQConnection) sharedConnection.getConnection();
    traceStopConnectionStep("2");
    if (initialized) {
      traceStopConnectionStep("3");
      try {
        if (amqc != null) {
          // Guarded inline: building this message dereferences the connection info, so it
          // must only be evaluated when FINEST is actually enabled.
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            logStopConnectionFinest("................ Closing Client Connection client ID:"
                    + amqc.getConnectionInfo().getClientId() + " Client Count:"
                    + sharedConnection.getClientCount());
          }
        } else {
          traceStopConnectionStep("5");
        }
        if (amqc != null && amqc.isStarted()
                && ((ActiveMQSession) consumerSession).isRunning()) {
          consumerSession.close();
          ((ActiveMQMessageConsumer) consumer).stop();
          consumer.close();
          traceStopConnectionStep("6");
        }
        // The shared connection is torn down only here, under the client-count guard;
        // it must not be destroyed while other clients still depend on it.
        // NOTE(review): this client was already unregistered above, so "<= 1" may also be
        // true while one other client is still registered - confirm against
        // SharedConnection.getClientCount().
        if (sharedConnection.getClientCount() <= 1) {
          sharedConnection.destroy();
          if (amqc != null) {
            amqc.close();
          }
        }
      } catch (Exception exx) {
        exx.printStackTrace();
      }
    }
    // Delete client's temp reply queue from AMQ Broker
    if (amqc != null && !amqc.isClosed() && !amqc.isClosing()
            && consumerDestination instanceof ActiveMQTempDestination) {
      try {
        // Re-check the connection state right before the broker call.
        if (!amqc.isClosed() && !amqc.isTransportFailed()) {
          amqc.deleteTempDestination((ActiveMQTempDestination) consumerDestination);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Interrupted (presumably while waiting for the semaphore): keep the interrupt status
      // visible to the caller instead of silently dropping it.
      Thread.currentThread().interrupt();
    } else {
      // Still best-effort, but no longer silent.
      e.printStackTrace();
    }
  } finally {
    if (semaphoreAcquired) {
      sharedConnection.getSemaphore().release();
    }
  }
}

/**
 * Emits a FINEST progress marker for {@link #stopConnection()} that carries the current
 * thread id. The message is only built when FINEST logging is enabled.
 *
 * @param step text appended after {@code "stopConnection() - "}
 */
private void traceStopConnectionStep(String step) {
  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
    logStopConnectionFinest(":::::::::::::::: " + Thread.currentThread().getId()
            + " stopConnection() - " + step);
  }
}

/**
 * Writes {@code msg} at FINEST level through the JMS resource bundle's debug message key,
 * attributed to {@code stopConnection}. Callers check {@code isLoggable} first.
 *
 * @param msg the fully built debug message
 */
private void logStopConnectionFinest(String msg) {
  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stopConnection",
          JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST",
          new Object[] { msg });
}