private void setUimaASThreadPoolExecutor()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java [1338:1453]


  /**
   * Installs the message listener for this container and, where one is needed, creates the
   * thread pool that services it.
   *
   * <p>Listener selection:
   * <ul>
   *   <li>GetMeta listeners, FreeCAS queue listeners and temp-reply-destination listeners
   *       use {@code ml} directly;</li>
   *   <li>a non-targeted process listener gets its own {@code PriorityMessageHandler} plus a
   *       {@code ThreadPoolTaskExecutor} whose threads are built by a
   *       {@code UimaAsPriorityBasedThreadFactory};</li>
   *   <li>a targeted process listener gets nothing here (see the comment in the body);</li>
   *   <li>anything else uses the controller's input channel as the listener.</li>
   * </ul>
   *
   * <p>Afterwards, if this is not a targeted listener and no {@code taskExecutor} is set, a
   * fixed-size pool of non-daemon threads is created and handed to the superclass.
   *
   * @param consumerCount count used for the terminated-threads latch and as the size of the
   *        fixed thread pool
   * @throws Exception if no known listener context matches when choosing the thread name
   *         prefix for the fixed thread pool
   */
  private void setUimaASThreadPoolExecutor(int consumerCount) throws Exception {
    if (isGetMetaListener()
        || isFreeCasQueueListener()
        || (endpoint != null && endpoint.isTempReplyDestination())) {
      super.setMessageListener(ml);
    } else if (isProcessListener()) {
      // A targeted listener should not have its own thread pool because it needs to use
      // threads created by the Process CAS listener. Each of these threads is pinned to
      // a dedicated AE initialized at startup. Contract says that each AE process() will be
      // called on the same thread that initialized it. The targeted listener and the process
      // listener share the same handler where CASes are pushed onto a blocking priority queue
      // for processing.
      if (!targetedListener && !isFreeCasQueueListener()) {
        // NOTE(review): the handler is sized with the field cc while the latch below uses
        // consumerCount - confirm the two are expected to agree.
        PriorityMessageHandler h = new PriorityMessageHandler(cc);
        super.setMessageListener(h);

        // Poll until the controller's input channel has been set.
        try {
          while (controller.getInputChannel() == null) {
            synchronized (h) {
              h.wait(100);
            }
          }
        } catch (InterruptedException e) {
          // Do not lose the interrupt: restore the flag so code further up the call
          // stack can still observe it. Setup continues as it did before this fix.
          Thread.currentThread().interrupt();
          e.printStackTrace();
        } catch (Exception e) {
          e.printStackTrace();
        }

        latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
        // Create a custom thread factory. Provide it with the controller so that every
        // thread can call it to initialize the next available instance of an AE.
        UimaAsPriorityBasedThreadFactory priorityFactory =
            new UimaAsPriorityBasedThreadFactory(
                threadGroup, controller, latchToCountNumberOfTerminatedThreads);
        tf = priorityFactory;
        priorityFactory.withQueue(h.getQueue()).withChannel(controller.getInputChannel());
        priorityFactory.setDaemon(true);

        if (taskExecutor == null) { // true for aggregates
          taskExecutor = new ThreadPoolTaskExecutor();
        }
        ThreadPoolTaskExecutor pool = (ThreadPoolTaskExecutor) taskExecutor;
        // This executor will use the custom thread factory instead of the default one.
        pool.setThreadFactory(tf);
        // Initialize the thread pool.
        pool.initialize();
        // Make sure all threads are started. This forces each thread to call the
        // controller to initialize the next instance of an AE.
        pool.getThreadPoolExecutor().prestartAllCoreThreads();
      }
    } else {
      super.setMessageListener(controller.getInputChannel());
    }

    // Create a task executor with a custom thread pool for:
    // 1) GetMeta request processing
    // 2) ReleaseCAS request
    if (!targetedListener && taskExecutor == null) {
      UimaAsThreadFactory utf = new UimaAsThreadFactory(threadGroup);
      utf.setDaemon(false);
      if (isFreeCasQueueListener()) {
        utf.setThreadNamePrefix(controller.getComponentName() + " - FreeCASRequest Thread");
      } else if (isGetMetaListener()) {
        utf.setThreadNamePrefix(super.getBeanName() + " - Thread");
      } else if (getDestination() != null && getMessageSelector() != null) {
        utf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
      } else if (endpoint != null && endpoint.isTempReplyDestination()) {
        utf.setThreadNamePrefix(super.getBeanName() + " - Thread");
      } else {
        throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
      }
      ExecutorService es = Executors.newFixedThreadPool(consumerCount, utf);
      if (es instanceof ThreadPoolExecutor) {
        threadPoolExecutor = (ThreadPoolExecutor) es;
        super.setTaskExecutor(es);
      }
    }
  }