private void openChannel()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java [177:428]


  private void openChannel(String brokerUri, String aComponentName,
          String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
          ServiceShutdownException, ConnectException {
	  synchronized (lock) {
		    try {

		        // If replying to http request, reply to a queue managed by this service broker using tcp
		        // protocol
		        if (isReplyEndpoint && brokerUri.startsWith("http")) {
		          brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();

		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
		            UIMAFramework.getLogger(CLASS_NAME).logrb(
		                    Level.FINE,
		                    CLASS_NAME.getName(),
		                    "open",
		                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                    "UIMAJMS_override_connection_to_endpoint__FINE",
		                    new Object[] {  aComponentName, getEndpoint(),
		                      ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
		          }
		        } else if ( !brokerUri.startsWith("http") && !brokerUri.startsWith("failover") && !brokerUri.startsWith("vm://localhost?broker.persistent=false")){
				  String prefix = "";
  		          if ( brokerUri.indexOf("?") > -1) {
		            prefix = "&";
		          } else {
		            prefix ="?";
		          }
		          String extraParams = "";
		          // check if maxInactivityDuration exists in the given url
		          if ( brokerUri.indexOf("wireFormat.maxInactivityDuration") == -1 ) {
		            // turn off activemq inactivity monitor
		            extraParams = prefix+"wireFormat.maxInactivityDuration=0";
		          }
				  brokerUri += extraParams;
		        }

		        if (!isOpen()) {
		          Connection conn = null;
		          //  Check connection status and create a new one (if necessary) as an atomic operation
		          try {
		            connectionSemaphore.acquire();
		            
		            if (connectionClosedOrFailed(brokerDestinations)) {
		              // Create one shared connection per unique brokerURL.
		              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
		                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
		                        "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                        "UIMAJMS_activemq_open__FINE",
		                        new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
		              }
		              if ( brokerDestinations.getConnection() != null ) {
		                try {
		                	//  Close the connection to avoid leaks in the broker
		                  brokerDestinations.getConnection().close();
		                } catch( Exception e) {
		                  //  Ignore exceptions on a close of a bad connection
		                }
		              }
		              // log connectivity problem once and retry
		              boolean logConnectionProblem=true;

		              int retryCount = 4;  // 
		              // recover lost connection indefinitely while the service is running
//		              while( !controller.isStopped() ) {
		              while( retryCount > 0 ) {
		            	  retryCount--;
		            	  if ( controller.isStopped() ) {
		            		  break;
		            	  }
		            	  
		            	  try {
				              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
				              // White list packages for deserialization 
				              factory.setTrustAllPackages(true);
				              factory.setConnectionIDPrefix("JmsOutputChannel");
				              factory.setWatchTopicAdvisories(false);
				              
				              //  Create shared jms connection to a broker
				              conn = factory.createConnection();
				              factory.setDispatchAsync(true);
				              factory.setUseAsyncSend(true);
				              factory.setCopyMessageOnSend(false);
				              conn.start();
				              //  Cache the connection. There should only be one connection in the jvm
				              //  per unique broker url. 
				              brokerDestinations.setConnection(conn);
				              // Close and invalidate all sessions previously created from the old connection
				              Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
				                      .entrySet().iterator();
				              while (it.hasNext()) {
				                Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
				                if (entry.getValue().producerSession != null) {
				                  // Close session
				                  entry.getValue().producerSession.close();
				                  // Since we created a new connection invalidate session that
				                  // have been created with the old connection
				                  entry.getValue().producerSession = null;
				                }
				              }
		            		  break; // Got the connection, break out of the while-loop
		            		  
		            	  } catch( JMSException jex) {
		            		  if ( conn != null  ) {
		            			  try {
		            				  conn.close();
		            			  } catch( Exception ee) {}
		            		  }
		            		  if ( jex.getCause() != null && logConnectionProblem ) {
		            			  logConnectionProblem = false;   // log once
			            		  // Check if unable to connect to the broker and retry ...
		            			  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
		            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
		            		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
		            		                  "UIMAEE_service_lost_connectivity_WARNING",
		            		                  new Object[] { controller.getComponentName(), brokerUri});
		            		          
		            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
		            		                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		            		                  "UIMAJMS_exception__WARNING", jex);
		            		          
		            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
		            		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
		            		                  "UIMAEE_service_lost_connectivity_WARNING",
		            		                  new Object[] { controller.getComponentName(), brokerUri});
		            		          
		            		          
		            		        }
		            		  } 
		            		 lock.wait(5000);  // wait between retries 
		            	  } catch ( Exception ee) {
		            		  ee.printStackTrace();
		            		  if ( conn != null  ) {
		            			  try {
		            				  conn.close();
		            			  } catch( Exception eee) {}
		            		  }
		            	  }
		              } //while

	            	  if ( retryCount == 0) {   // failed recovering a connection
//	            		  Thread.currentThread().dumpStack();
	            		  throw new ConnectException("Unable to Create Connection to Broker:"+brokerUri);
	            	  }
		              if ( logConnectionProblem == false )  { // we had conectivity problem. Log the fact that it was recovered
		            	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
            		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            		                  "UIMAEE_service_regained_connectivity_INFO",
            		                  new Object[] { controller.getComponentName(), brokerUri});
		            	  }
		            	  
		              }
		              }
		          } catch( Exception exc) {
		        	  if ( conn != null  ) {
	          			  try {
	          				  conn.close();
	          			  } catch( Exception ee) {}
		        	  }
		             throw exc; // rethrow
		          } finally {
		            connectionSemaphore.release();
		           
		          }
		          
		          connectionCreationTimestamp = System.nanoTime();
		          failed = false;
		        } else {
		        	System.out.println("...... Reusing Existing Broker Connetion");
		        }
		        Connection conn = brokerDestinations.getConnection();
		        if (failed) {
		          // Unable to create a connection
		          return;
		        }

		        producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
		        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_session_open__FINE",
                      new Object[] { aComponentName, anEndpointName, brokerUri });
		        }

		        if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
		                && delegateEndpoint.getDestination() != null) {
		          producer = producerSession.createProducer(null);
		          if (aController != null) {
		            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                      "UIMAJMS_temp_conn_starting__FINE",
		                      new Object[] { aComponentName, anEndpointName, brokerUri });
		            }
		          }
		        } else {
		          destination = producerSession.createQueue(getEndpoint());
		          producer = producerSession.createProducer(destination);
		          if (controller != null) {
		            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                      "UIMAJMS_conn_starting__FINE",
		                      new Object[] { aComponentName, anEndpointName, brokerUri });
		            }
		          }
		        }
		        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		        // Since the connection is shared, start it only once
		        if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
		          brokerDestinations.getConnection().start();
		        }
		        if (controller != null) {
		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
		                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                    "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
		            if (controller.getInputChannel() != null) {
		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                      "UIMAJMS_connection_open_to_endpoint__FINE",
		                      new Object[] { aComponentName, getEndpoint(), brokerUri });
		            }
		          }
		        }
		        failed = false;
		      } catch (Exception e) {
		        boolean rethrow = true;
		        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
		                  "UIMAEE_service_exception_WARNING", controller.getComponentName());
		          
		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
		                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
		                  "UIMAJMS_exception__WARNING", e);
		        }
		        if ( e instanceof ConnectException ) {
		            throw (ConnectException)e;
		        }

		        if (e instanceof JMSException) {
		          rethrow = handleJmsException((JMSException) e);
		        } 
		        
		        if (rethrow) {
		          throw new AsynchAEException(e);
		        }
		      }
	  }  // synchronized
  }