private synchronized JmsEndpointConnection_impl getEndpointConnection()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java [436:647]


  /**
   * Returns a connection to the given endpoint, creating and opening one, or re-opening a
   * cached one, as needed.
   * <p>
   * The method first waits on {@code controllerLatch} and then performs all work while holding
   * a permit from {@code connectionSemaphore}. The broker URL is chosen as follows: Free Cas
   * requests addressed to a Cas Multiplier reply endpoint, and all non-reply (request)
   * endpoints, use the URL carried by the endpoint; every other reply uses this channel's
   * {@code serverURI}. Broker entries are looked up in {@code connectionMap}; an entry whose
   * connection is closed or failed is invalidated and replaced. Whenever a new broker entry is
   * created its session reaper timer is (re)started.
   * <p>
   * If the calling thread is interrupted while waiting, the interrupt status is restored
   * before this method returns so that callers can observe it.
   *
   * @param anEndpoint
   *          the endpoint to connect to; for aggregates this may be updated with the master
   *          endpoint's current destination
   * @return the endpoint connection, or {@code null} if the thread was interrupted while
   *         acquiring the connection semaphore (or if the broker entry reports the endpoint as
   *         cached but returns no connection for it)
   * @throws AsynchAEException
   *           if {@code connectionMap} holds a null entry for a known broker URL, if a reply
   *           listener for a FAILED delegate cannot be recreated, or if no reply input channel
   *           is available yet for that delegate
   * @throws ServiceShutdownException
   *           not raised directly here; presumably propagated from the connection code
   * @throws ConnectException
   *           not raised directly here; presumably propagated from the connection code
   */
  private synchronized JmsEndpointConnection_impl getEndpointConnection(Endpoint anEndpoint)
          throws AsynchAEException, ServiceShutdownException, ConnectException {

    // Tracks whether an InterruptedException was caught so the interrupt status can be
    // restored on exit instead of being silently dropped.
    boolean interrupted = false;
    try {
      controllerLatch.await();
    } catch (InterruptedException e) {
      // Keep going as before, but remember the interrupt for the caller.
      interrupted = true;
    }
    JmsEndpointConnection_impl endpointConnection = null;
    String brokerConnectionURL = null;
    // Set only after acquire() succeeds; guards the release() in the finally block so that an
    // interrupted acquire() does not hand back a permit that was never taken.
    boolean permitAcquired = false;

    try {
      connectionSemaphore.acquire();
      permitAcquired = true;
      // If sending a Free Cas Request to a remote Cas Multiplier always use the CM's broker
      if (anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier()
              && anEndpoint.isReplyEndpoint()) {
        brokerConnectionURL = anEndpoint.getServerURI();
      } else {
        // If this is a reply to a client, use the same broker URL that manages this service
        // input queue. Otherwise this is a request so use a broker specified in the endpoint
        // object.
        brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint
                .getServerURI();
      }
      String key = getLookupKey(anEndpoint);
      String destination = getDestinationName(anEndpoint);

      // First get the entry holding destinations managed by the selected broker
      BrokerConnectionEntry brokerConnectionEntry = null;
      boolean startInactivityReaperTimer = false;
      if (connectionMap.containsKey(brokerConnectionURL)) {
        brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
        // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
        // the null check just in case
        if (brokerConnectionEntry == null) {
          throw new AsynchAEException("Controller:"
                  + getAnalysisEngineController().getComponentName()
                  + " Unable to Lookup Broker Connection For URL:" + brokerConnectionURL);
        }
        brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
        if (JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry)) {
          // The cached broker connection is unusable: stop its timer, drop it together with
          // its endpoints and start over with a fresh entry.
          brokerConnectionEntry.getConnectionTimer().cancelTimer();
          invalidateConnectionAndEndpoints(brokerConnectionEntry);
          brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
          startInactivityReaperTimer = true;
        }
      } else {
        brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
        startInactivityReaperTimer = true;
      }
      if (startInactivityReaperTimer) {
        long inactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
        brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
        // Start the FixedRate timer which wakes up at regular intervals defined by the
        // inactivity timeout. The purpose is to find inactive jms sessions. All sessions found
        // to be inactive will be closed.
        brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(
                getAnalysisEngineController().getComponentName());
      }

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "getEndpointConnection",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_acquiring_connection_to_endpoint__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(), destination,
                    brokerConnectionURL });
      }

      // check the cache first
      if (!brokerConnectionEntry.endpointExists(key)) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "getEndpointConnection",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_create_new_connection__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
        }

        Endpoint masterEndpoint = null;
        if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
          // Check if the endpoint has previously FAILED. It may have been marked as FAILED
          // due to a temp queue listener shutdown caused by a Broker failure. In such case,
          // before sending a message to a remote delegate we need to start a new instance
          // of a listener which creates a new temp reply queue.
          if (!anEndpoint.isReplyEndpoint()) { // i.e. we are not sending a reply to a client
            // The masterEndpoint has the most current state. The 'anEndpoint' instance passed
            // into this method is a clone from the master made at the beginning of processing.
            // The master endpoint may have been marked as FAILED after the clone was made
            masterEndpoint = ((AggregateAnalysisEngineController) getAnalysisEngineController())
                    .lookUpEndpoint(anEndpoint.getDelegateKey(), false);
            if (masterEndpoint != null) {
              // Only one thread at a time is allowed here.
              synchronized (masterEndpoint) {
                if (masterEndpoint.getStatus() == Endpoint.FAILED) {
                  // Returns InputChannel if the Reply Listener for the delegate has previously
                  // failed. If the listener hasn't failed getReplyInputChannel returns null.
                  // The lookup is keyed by the endpoint's destination name.
                  InputChannel iC = getAnalysisEngineController().getReplyInputChannel(
                          anEndpoint.getDestination().toString());
                  if (iC != null) {
                    try {
                      // Create a new Listener, new Temp Queue and associate the listener with
                      // the Input Channel. Also resets endpoint status to OK
                      iC.createListener(anEndpoint.getDelegateKey(), anEndpoint);
                      iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());
                    } catch (Exception exx) {
                      throw new AsynchAEException(exx);
                    }
                  } else {
                    throw new AsynchAEException("Aggregate:"+getAnalysisEngineController()+" Has not yet recovered a listener for delegate: "+anEndpoint.getDelegateKey());
                  }
                } else if (!masterEndpoint.isFreeCasEndpoint()) {
                  // In case this thread blocked while the reply queue listener was created,
                  // make sure that this endpoint uses the most up-to-date reply queue
                  // destination
                  anEndpoint.setDestination(masterEndpoint.getDestination());
                }
              }
            }
          }
        }

        // Connection is not in the cache, create a new connection, cache it and open it
        endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
                getAnalysisEngineController());
        brokerConnectionEntry.addEndpointConnection(key, endpointConnection);

        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                  "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_open_new_connection_to_endpoint__FINE",
                  new Object[] { getDestinationName(anEndpoint), brokerConnectionURL });
        }

        // Open connection to a broker, create JMS session and MessageProducer
        endpointConnection.open();

        brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
                endpointConnection.connectionCreationTimestamp);

        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "getEndpointConnection",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_connection_opened_to_endpoint__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
        }
      } else {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "getEndpointConnection",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_reusing_existing_connection__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
        }
        // Retrieve connection from the connection cache
        endpointConnection = brokerConnectionEntry.getEndpointConnection(key);
        // check the state of the connection and re-open it if necessary
        if (endpointConnection != null && !endpointConnection.isOpen()) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                    "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_connection_closed_reopening_endpoint__FINE",
                    new Object[] { destination });
          }
          endpointConnection.open();
          if (endpointConnection.isOpen()) {
            brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
                    System.nanoTime());
            if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController
                    && anEndpoint.getDelegateKey() != null) {
              Endpoint masterEndpoint = ((AggregateAnalysisEngineController) getAnalysisEngineController())
                      .lookUpEndpoint(anEndpoint.getDelegateKey(), false);
              // lookUpEndpoint may yield null (it is null-checked on the create path above),
              // so guard the status reset instead of risking a NullPointerException.
              if (masterEndpoint != null) {
                masterEndpoint.setStatus(Endpoint.OK);
              }
            }
          }
        }
      }
    } catch (InterruptedException e) {
      // Interrupted while waiting for the semaphore (or in a blocking call above); fall
      // through and return whatever connection, if any, has been obtained so far.
      interrupted = true;
    } finally {
      // Release only a permit that was actually acquired.
      if (permitAcquired) {
        connectionSemaphore.release();
      }
      // Re-assert the interrupt status that catching InterruptedException cleared.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }

    return endpointConnection;
  }