in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java [177:428]
private void openChannel(String brokerUri, String aComponentName,
String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
ServiceShutdownException, ConnectException {
synchronized (lock) {
try {
// If replying to http request, reply to a queue managed by this service broker using tcp
// protocol
if (isReplyEndpoint && brokerUri.startsWith("http")) {
brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
CLASS_NAME.getName(),
"open",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_override_connection_to_endpoint__FINE",
new Object[] { aComponentName, getEndpoint(),
((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
}
} else if ( !brokerUri.startsWith("http") && !brokerUri.startsWith("failover") && !brokerUri.startsWith("vm://localhost?broker.persistent=false")){
String prefix = "";
if ( brokerUri.indexOf("?") > -1) {
prefix = "&";
} else {
prefix ="?";
}
String extraParams = "";
// check if maxInactivityDuration exists in the given url
if ( brokerUri.indexOf("wireFormat.maxInactivityDuration") == -1 ) {
// turn off activemq inactivity monitor
extraParams = prefix+"wireFormat.maxInactivityDuration=0";
}
brokerUri += extraParams;
}
if (!isOpen()) {
Connection conn = null;
// Check connection status and create a new one (if necessary) as an atomic operation
try {
connectionSemaphore.acquire();
if (connectionClosedOrFailed(brokerDestinations)) {
// Create one shared connection per unique brokerURL.
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_activemq_open__FINE",
new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
}
if ( brokerDestinations.getConnection() != null ) {
try {
// Close the connection to avoid leaks in the broker
brokerDestinations.getConnection().close();
} catch( Exception e) {
// Ignore exceptions on a close of a bad connection
}
}
// log connectivity problem once and retry
boolean logConnectionProblem=true;
int retryCount = 4; //
// recover lost connection indefinitely while the service is running
// while( !controller.isStopped() ) {
while( retryCount > 0 ) {
retryCount--;
if ( controller.isStopped() ) {
break;
}
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
// White list packages for deserialization
factory.setTrustAllPackages(true);
factory.setConnectionIDPrefix("JmsOutputChannel");
factory.setWatchTopicAdvisories(false);
// Create shared jms connection to a broker
conn = factory.createConnection();
factory.setDispatchAsync(true);
factory.setUseAsyncSend(true);
factory.setCopyMessageOnSend(false);
conn.start();
// Cache the connection. There should only be one connection in the jvm
// per unique broker url.
brokerDestinations.setConnection(conn);
// Close and invalidate all sessions previously created from the old connection
Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
if (entry.getValue().producerSession != null) {
// Close session
entry.getValue().producerSession.close();
// Since we created a new connection invalidate session that
// have been created with the old connection
entry.getValue().producerSession = null;
}
}
break; // Got the connection, break out of the while-loop
} catch( JMSException jex) {
if ( conn != null ) {
try {
conn.close();
} catch( Exception ee) {}
}
if ( jex.getCause() != null && logConnectionProblem ) {
logConnectionProblem = false; // log once
// Check if unable to connect to the broker and retry ...
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_lost_connectivity_WARNING",
new Object[] { controller.getComponentName(), brokerUri});
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", jex);
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_lost_connectivity_WARNING",
new Object[] { controller.getComponentName(), brokerUri});
}
}
lock.wait(5000); // wait between retries
} catch ( Exception ee) {
ee.printStackTrace();
if ( conn != null ) {
try {
conn.close();
} catch( Exception eee) {}
}
}
} //while
if ( retryCount == 0) { // failed recovering a connection
// Thread.currentThread().dumpStack();
throw new ConnectException("Unable to Create Connection to Broker:"+brokerUri);
}
if ( logConnectionProblem == false ) { // we had conectivity problem. Log the fact that it was recovered
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_regained_connectivity_INFO",
new Object[] { controller.getComponentName(), brokerUri});
}
}
}
} catch( Exception exc) {
if ( conn != null ) {
try {
conn.close();
} catch( Exception ee) {}
}
throw exc; // rethrow
} finally {
connectionSemaphore.release();
}
connectionCreationTimestamp = System.nanoTime();
failed = false;
} else {
System.out.println("...... Reusing Existing Broker Connetion");
}
Connection conn = brokerDestinations.getConnection();
if (failed) {
// Unable to create a connection
return;
}
producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_session_open__FINE",
new Object[] { aComponentName, anEndpointName, brokerUri });
}
if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
&& delegateEndpoint.getDestination() != null) {
producer = producerSession.createProducer(null);
if (aController != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_temp_conn_starting__FINE",
new Object[] { aComponentName, anEndpointName, brokerUri });
}
}
} else {
destination = producerSession.createQueue(getEndpoint());
producer = producerSession.createProducer(destination);
if (controller != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_conn_starting__FINE",
new Object[] { aComponentName, anEndpointName, brokerUri });
}
}
}
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Since the connection is shared, start it only once
if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
brokerDestinations.getConnection().start();
}
if (controller != null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
if (controller.getInputChannel() != null) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_connection_open_to_endpoint__FINE",
new Object[] { aComponentName, getEndpoint(), brokerUri });
}
}
}
failed = false;
} catch (Exception e) {
boolean rethrow = true;
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", controller.getComponentName());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", e);
}
if ( e instanceof ConnectException ) {
throw (ConnectException)e;
}
if (e instanceof JMSException) {
rethrow = handleJmsException((JMSException) e);
}
if (rethrow) {
throw new AsynchAEException(e);
}
}
} // synchronized
}