public void afterPropertiesSet()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java [798:940]


  public void afterPropertiesSet(final boolean propagate) {
    if (endpoint != null) {
			// Override the prefetch size. The dd2spring always sets this to 1 which 
			// may effect the throughput of a service. Change the prefetch size to
			// number of consumer threads defined in DD.
      if ( cc > 1 && endpoint.isTempReplyDestination() && connectionFactory instanceof ActiveMQConnectionFactory ) {
        ((ActiveMQConnectionFactory)connectionFactory).getPrefetchPolicy().setQueuePrefetch(cc);
      }
      
      // Endpoint has been plugged in from spring xml. This means this is a listener
      // for a reply queue. We need to rewire things a bit. First make Spring use
      // one thread to make sure we receive messages in order. To fix a race condition
      // where a parent CAS is processed first instead of its last child, we need to
      // assure that we get the child first. We need to update the counter of the
      // parent CAS to reflect that there is another child. In the race condition that
      // was observed, the parent was being processed first in one thread. The parent
      // reached the final step and subsequently was dropped. Subsequent to that, a
      // child CAS processed on another thread begun executing and failed since a look
      // on its parent resulted in CAS Not Found In Cache Exception.
      // Make sure Spring uses one thread
      super.setConcurrentConsumers(1);
      if (cc > 1) {
        try {
          String prefix = endpoint.getDelegateKey()+" Reply Thread";
          concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName(), threadGroup,prefix);
          super.setMessageListener(concurrentListener);
        } catch (Exception e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            if ( controller != null ) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "afterPropertiesSet", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_service_exception_WARNING", controller.getComponentName());
            }

            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_exception__WARNING", e);
          }
          return;
        }
      } else {
        pluginThreadPool = true;
      }
    } else {
         super.setConcurrentConsumers(1);
    	//if ( targetedListener ) {
     	    //super.setConcurrentConsumers(1);
    	//} else {
        //    super.setConcurrentConsumers(cc);
    	//}
      pluginThreadPool = true;
    }
    Thread t = new Thread(threadGroup, new Runnable() {
      public void run() {
        Destination destination = __listenerRef.getDestination();
         try {
          // Wait until the connection factory is injected by Spring
          while (connectionFactory == null) {
            try {
              Thread.sleep(50);
            } catch (InterruptedException ex) {
            }
          }
          System.setProperty("BrokerURI", ((ActiveMQConnectionFactory) connectionFactory)
                  .getBrokerURL());
          boolean done = false;
          // Wait for controller to be injected by Uima AS
          if (isActiveMQDestination() && !isGetMetaListener()
                  && !((ActiveMQDestination) destination).isTemporary()) {
            // Add self to InputChannel
            connectWithInputChannel();
            // Wait for InputChannel to plug in a controller
            done = true;
            while (controller == null)
              try {
                Thread.sleep(50);
              } catch (InterruptedException ex) {
              }
            ;
          }
          // Plug in connection Factory to Spring's Listener
          __listenerRef.injectConnectionFactory();

          if ( pluginThreadPool ) {
            setUimaASThreadPoolExecutor(cc);
          }
          
          // Initialize the TaskExecutor. This call injects a custom Thread Pool into the
          // TaskExecutor provided in the spring xml. The custom thread pool initializes
          // an instance of AE in a dedicated thread
          if ( !isGetMetaListener()) {
            initializeTaskExecutor(cc);
          }
//          if ( threadPoolExecutor == null ) {
//              // Plug in TaskExecutor to Spring's Listener
//              __listenerRef.injectTaskExecutor();
//          }
          if ( propagate ) {
            // Notify Spring Listener that all properties are ready
            __listenerRef.allPropertiesSet();
          }
          if (isActiveMQDestination() && destination != null) {
            destinationName = ((ActiveMQDestination) destination).getPhysicalName();
          }
          if (!done) {
            connectWithInputChannel();
            done = true;
          }
          if (concurrentListener != null) {
            concurrentListener.setAnalysisEngineController(controller);
          }
          // Save number of concurrent consumers on the temp reply queue in case we need to
          // recreate a new listener on a new temp queue created during recovery
          if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
            Delegate delegate = ((AggregateAnalysisEngineController) controller)
                    .lookupDelegate(endpoint.getDelegateKey());
            if (delegate != null) {
              delegate.getEndpoint().setConcurrentReplyConsumers(cc);
            }
          }
          //  Show ready message on the console only if this listener is *not* listening
          //  on an input queue. Input queue listeners are not started until the service
          //  is fully initialized
          if (__listenerRef.getMessageListener() == null && getDestination() != null) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
                    "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_listener_ready__INFO",
                    new Object[] {controller.getComponentName(),  getBrokerUrl(), getDestination() });
          } 
        } catch (Exception e) {
         
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                  "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_jms_listener_failed_WARNING",
                  new Object[] { destination, getBrokerUrl(), e });
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_exception__WARNING", e);
        }
      }
    });
    t.start();
  }