public void run()

in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java [1826:2506]


  public void run() {
    Thread.currentThread().setName("CPMEngine Thread");

    boolean consumerCompleted = false;
    boolean isStarted = false; // Indicates if all threads have been started

    if (isKilled()) {
      return;
    }
    // Single-threaded mode is enabled in the CPE descriptor. In the CpeConfig element check for the
    // value of deployAs
    // <deployAs>single-threaded</deployAs>
    if (singleThreadedCPE) {
      try {
        runSingleThreaded();
        return;
      } catch (Throwable t) {
        killed = true;
        t.printStackTrace();
        UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_exception_in_single_threaded_cpm__SEVERE",
                new Object[] { Thread.currentThread().getName(), t.getMessage() });
        return;
      } finally {
        executorService.cleanup();
        executorService.shutdown();
      }
    }

    try {

      isRunning = true;
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_starting_cpe__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }

      // How many entities to get for each fetch from the CollectionReader. Use default, otherwise
      // retrieve and override from ColectionReader descriptor.
      readerFetchSize = 1;
      if (collectionReader.getProcessingResourceMetaData().getConfigurationParameterSettings()
              .getParameterValue("fetchSize") != null) {
        readerFetchSize = (Integer) collectionReader.getProcessingResourceMetaData()
                .getConfigurationParameterSettings().getParameterValue("fetchSize");
      }
      if (System.getProperty("DEBUG_CONTROL") != null) {
        startDebugControlThread();
      }
      // CAS[] casList = null;

      if (!mixedCasProcessorTypeSupport && collectionReader instanceof CollectionReader) {
        mixedCasProcessorTypeSupport = true;
      }

      // When the CPE is configured to run exclusively with CasDataProcessor type components (no
      // CasObjectProcessors)
      // there is no need to instantiate TCAS objects. These would never be used and woud waste
      // memory.
      if (mixedCasProcessorTypeSupport) {
        // Instantiate container for TCAS Instances

        try {
          // Register all type systems with the CAS Manager
          registerTypeSystemsWithCasManager();
          if (poolSize == 0) // Not set in the CpeDescriptor
          {
            poolSize = readerFetchSize * (inputQueueSize + outputQueueSize)
                    * cpeFactory.getProcessingUnitThreadCount() + 3;
            // This is a hack to limit # of CASes. In WF env where the WF Store decides the size of
            // readerFetchSize
            // we have a problem with memory. If the store decides to return 1000 entities we will
            // need a LOT of
            // memory to handle this. So for WF limit the pool size to something more reasonable
            if (poolSize > 100) {
              System.err.println(
                      "CPMEngine.run()-CAS PoolSize exceeds hard limit(100). Redefining size to 60.");
              UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG,
                      this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_redefine_pool_size__CONFIG",
                      new Object[] { Thread.currentThread().getName() });
              poolSize = 60; // Hard limit
            }
          }

          if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
                    "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_show_cas_pool_size__CONFIG",
                    new Object[] { Thread.currentThread().getName(), String.valueOf(poolSize) });
          }
          casPool = new CPECasPool(poolSize, cpeFactory.getResourceManager().getCasManager(),
                  mPerformanceTuningSettings);
          callTypeSystemInit();

        } catch (Exception e) {
          isRunning = false;
          killed = true;
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_cp_failed_to_start__SEVERE",
                  new Object[] { Thread.currentThread().getName(), e.getMessage() });

          UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
          notifyListenersWithException(e);
          return;
        }
      }
      // Instantiate work queue. This queue is shared among all processing units.
      // The Producer thread fills this queue with CAS'es and processing units
      // retrieve these Cas'es for analysis.
      workQueue = new BoundedWorkQueue(poolSize, "Input Queue", this);

      // Instantiate output queue. The Cas'es containing result of analysis are deposited to
      // this queue, and the CasConsumer Processing Unit retrieves them.
      if (consumerList != null && consumerList.size() > 0) {
        outputQueue = createOutputQueue(poolSize);
      }

      if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_create_producer__CONFIG",
                new Object[] { Thread.currentThread().getName() });

      }
      // Producer is responsible for filling work queue with Cas'es. Runs in a seperate thread until
      // all entities are processed or the CPM stops.
      producer = new ArtifactProducer(this, casPool);
      try {
        // Plugin custom timer for measuring performance of the CollectionReader
        producer.setUimaTimer(getTimer());
      } catch (Exception e) {
        // Use default Timer. Ignore the exception
        producer.setUimaTimer(new JavaTimer());
      }
      // indicate how many entities to process
      producer.setNumEntitiesToProcess(numToProcess);
      producer.setCollectionReader(collectionReader);
      producer.setWorkQueue(workQueue);
      // producer.setOutputQueue(outputQueue);

      // collect stats in shared instance
      producer.setCPMStatTable(stats);

      //
      for (int j = 0; j < statusCbL.size(); j++) {
        BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
        if (statCL != null) {
          statCL.initializationComplete();
        }
      }

      // Just in case check if the CPM has the right state to start
      if (isKilled()) {
        return;
      }

      // Nov 2005, postpone starting the Producer Thread until all other threads are up.
      // This prevents a problem when the Producer Thread starts, grabs all CASes, fills the
      // input queue and there is an exception BEFORE Processing Units starts. This may lead
      // to a hang, because the CR is waiting on the CAS Pool and no-one consumes the Input Queue.
      // Name the thread

      // Create Cas Consumer Thread
      if (consumerList != null && consumerList.size() > 0) {
        // Create a CasConsumer Processing Unit if there is at least one CasConsumer configured in a
        // CPE descriptor
        casConsumerPU = new ProcessingUnit(this, outputQueue, null);

        casConsumerPU.setProcessingUnitProcessTrace(procTr);
        casConsumerPU.setContainers(consumerList);
        casConsumerPU.setCasPool(casPool);
        casConsumerPU.setReleaseCASFlag(true);
        casConsumerPU.setCasConsumerPipelineIdentity();
        // Add Callback Listeners
        for (int j = 0; j < statusCbL.size(); j++) {
          BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
          if (statCL != null) {
            casConsumerPU.addStatusCallbackListener(statCL);
          }
        }
        // Notify Callback Listeners when done processing entity
        casConsumerPU.setNotifyListeners(true);
        // Add custom timer
        try {
          casConsumerPU.setUimaTimer(getTimer());
        } catch (Exception e) {
          // Use default Timer
          casConsumerPU.setUimaTimer(new JavaTimer());
        }
        // name the thread
        casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
        // start the CasConsumer Thread
        casConsumerPUResult = executorService.submit(casConsumerPU);
        consumerThreadStarted = true;
      }
      if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_create_pus__CONFIG",
                new Object[] { Thread.currentThread().getName(),
                    String.valueOf(workQueue.getCurrentSize()) });
      }

      // Adjust number of pipelines. Adjustment may be necessary in deployments using exclusive
      // service access. The adjustment is
      // based on number of available services that the CPM will connect to. If a static
      // configuration calls for 5 processing
      // pipelines but only three services are available (assuming exclusive access ), the CPM will
      // reduce number of processing
      // pipelines to 3.
      for (int indx = 0; indx < annotatorList.size(); indx++) {
        ProcessingContainer prContainer = (ProcessingContainer) annotatorList.get(indx);
        CasProcessorConfiguration configuration = prContainer.getCasProcessorConfiguration();

        if (configuration == null) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_cp_configuration_not_defined__SEVERE",
                  new Object[] { Thread.currentThread().getName(), prContainer.getName() });
          return;
        }
        String serviceAccess = configuration.getDeploymentParameter("service-access");
        if (serviceAccess != null && serviceAccess.equalsIgnoreCase("exclusive")) {
          if (prContainer.getPool() != null) {
            int totalInstanceCount = prContainer.getPool().getSize();

            if (totalInstanceCount == 0) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE,
                      this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_no_proxies__SEVERE",
                      new Object[] { Thread.currentThread().getName(), prContainer.getName() });
              return;
            }
            if (totalInstanceCount < concurrentThreadCount) {
              concurrentThreadCount = totalInstanceCount; // override
              UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG,
                      this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_reduce_pipelines__CONFIG",
                      new Object[] { Thread.currentThread().getName(), prContainer.getName() });
            }
          }
        }
      }

      // Setup Processing Pipelines
      processingUnits = new ProcessingUnit[concurrentThreadCount];
      processingUnitResults = new Future<?>[concurrentThreadCount];
      synchronized (this) {
        activeProcessingUnits = concurrentThreadCount; // keeps track of how many threads are still
        // active. -Adam
      }

      // Capture the state of the pipelines. Initially the state is -1, meaning Not Started
      processingThreadsState = new int[concurrentThreadCount];
      for (int inx = 0; inx < concurrentThreadCount; inx++) {
        processingThreadsState[inx] = -1; // Not Started
      }

      // Configure Processing Pipelines, and start each running in a seperate thread
      for (int i = 0; i < concurrentThreadCount; i++) {
        // casList = new CAS[readerFetchSize];
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_initialize_pipeline__FINEST",
                  new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
        }
        // Plug in custom ProcessingUnit via -DPROCESSING_PIPELINE_IMPL=class
        // Initialize Processing Pipeline with input and output queues
        if (System.getProperty("PROCESSING_PIPELINE_IMPL") != null) {
          String puClass = System.getProperty("PROCESSING_PIPELINE_IMPL");
          try {
            processingUnits[i] = producePU(puClass);
            processingUnits[i].setInputQueue(workQueue);
            processingUnits[i].setOutputQueue(outputQueue);
            processingUnits[i].setCPMEngine(this);
          } catch (Exception e) {
            UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, e.getMessage(), e);
            if (dbgCtrlThread != null) {
              dbgCtrlThread.stop();
            }
            return; // / DONE HERE !!!
          }
        } else {
          processingUnits[i] = new ProcessingUnit(this, workQueue, outputQueue);
        }
        // If there are no consumers in the pipeline, instruct the pipeline to release a CAS at the
        // end of processing
        if (consumerList == null || consumerList.size() == 0) {
          processingUnits[i].setReleaseCASFlag(true);
        }

        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_pipeline_impl_class__FINEST", new Object[] {
                      Thread.currentThread().getName(), processingUnits[i].getClass().getName() });
        }
        // Add tracing instance so that performance and stats are globally aggregated for all
        // processing pipelines
        processingUnits[i].setProcessingUnitProcessTrace(procTr);
        // Add all annotators to the processing pipeline
        processingUnits[i].setContainers(annotatorList);
        // pass initialized list of cases to processing units in case cas conversion is required
        // between
        // CasData and CASObject based annotators.
        processingUnits[i].setCasPool(casPool);
        try {
          processingUnits[i].setUimaTimer(getTimer());
        } catch (Exception e) {
          processingUnits[i].setUimaTimer(new JavaTimer());
        }
        // Add Callback Listeners
        for (int j = 0; j < statusCbL.size(); j++) {
          BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
          if (statCL != null) {
            processingUnits[i].addStatusCallbackListener(statCL);
          }
        }

        // Start the Processing Unit thread
        processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");

        // Start the Processing Pipeline
        processingUnitResults[i] = executorService.submit(processingUnits[i]);
        processingThreadsState[i] = 1; // Started
      }

      producer.setProcessTrace(procTr);
      // Start the ArtifactProducer thread and the Collection Reader embedded therein. The
      // Collection Reader begins
      // processing and deposits CASes onto a work queue.
      producerResult = executorService.submit(producer);
      readerThreadStarted = true;

      // Indicate that ALL threads making up the CPE have been started
      isStarted = true;

      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_started_pipelines__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }

      // ==============================================================================================
      // Now, wait for ALL CPE threads to finish. Join each thread created and wait for each to
      // finish.
      // ==============================================================================================

      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_threads__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
      // Join the producer as it knows when to stop processing. When it is done, it
      // simply terminates the thread. Once it terminates lets just make sure that
      // all threads finish and the work queue is completely depleted and all entities
      // are processed
      producerResult.get();
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_thread_completed__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }

      // Join each of the Processing Threads and wait for them to finish
      for (int i = 0; i < concurrentThreadCount; i++) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu__FINEST",
                  new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                      String.valueOf(i) });
        }
        processingUnitResults[i].get();
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu_complete__FINEST",
                  new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                      String.valueOf(i) });
        }
      }

      // Join the Consumer Thread and wait for it to finish
      if (casConsumerPU != null) {

        try {
          // Throw in a EOF token onto an output queue to indicate end of processing. The consumer
          // will stop the processing upon receiving this token
          Object[] eofToken = new Object[1];
          // only need one member in the array
          eofToken[0] = new EOFToken();
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_placed_eof_in_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
          outputQueue.enqueue(eofToken);
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_done_placed_eof_in_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
          // synchronized (outputQueue) { // redundant, the above enqueue does this
          // outputQueue.notifyAll();
          // }
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_done_notifying_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
        } catch (Exception e) {
          e.printStackTrace();
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_exception_adding_eof__SEVERE",
                  new Object[] { Thread.currentThread().getName(), e.getMessage() });
          notifyListenersWithException(e);
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc__FINEST",
                  new Object[] { Thread.currentThread().getName() });

        }

        casConsumerPUResult.get();
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc_completed__FINEST",
                  new Object[] { Thread.currentThread().getName() });
        }
      }
      consumerCompleted = true;

      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_completed__FINEST",
                new Object[] { Thread.currentThread().getName(), workQueue.getName(),
                    String.valueOf(workQueue.getCurrentSize()) });
      }
      boolean empty = false;

      while (!empty && outputQueue != null) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_pus_completed__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                      String.valueOf(outputQueue.getCurrentSize()) });
        }
        synchronized (outputQueue) {
          if (outputQueue.getCurrentSize() == 0) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_pus_completed__FINEST",
                      new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                          String.valueOf(outputQueue.getCurrentSize()) });
            }
            break;
          }
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_consuming_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                      String.valueOf(outputQueue.getCurrentSize()) });
        }

        if (casConsumerPU != null) {
          casConsumerPU.consumeQueue();
        }
      }

      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cleaning_up_pus__FINEST",
                new Object[] { Thread.currentThread().getName() });

      }

      // Terminate Annotators and cleanup resources
      for (int i = 0; i < processingUnits.length; i++) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_processors__FINEST",
                  new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
        }
        processingUnits[i].stopCasProcessors(false);
      }
      if (casConsumerPU != null) {
        // Terminate CasConsumers and cleanup
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_ccs__FINEST",
                  new Object[] { Thread.currentThread().getName() });
        }
        casConsumerPU.stopCasProcessors(false);
      }

      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_engine_stopped__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
      if (dbgCtrlThread != null) {
        dbgCtrlThread.stop();
      }

      isRunning = false;

    } catch (Exception e) {
      isRunning = false;
      killed = true;
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
              new Object[] { Thread.currentThread().getName(), e.getMessage() });

      UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
      notifyListenersWithException(e);
      // The CPE has not been started successfully. Perhaps only partially started. Meaning, that
      // some of its threads are started and some not. This may lead to a memory leak as not started
      // threads are never garbage collected. If this is the state of the CPE (!isStarted) go
      // through
      // a cleanup cycle checking each thread and starting those that have not been started. All
      // CPE threads in their run() method MUST check the state of the CPE by calling
      // cpe.isRunning()
      // as the first thing in their run() methods. If this query returns false, all threads should
      // return from run() without doing any work. But at least they will be garbage collected.

      if (!isStarted) {
        // Cleanup not started threads

        // First the ArtifactProducer Thread
        if (producer != null && !producer.isRunning()) {
          try {
            if (!readerThreadStarted) {
              executorService.submit(producer);
            }
            producerResult.get();
          } catch (Exception ex1) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
                    new Object[] { Thread.currentThread().getName(), ex1.getMessage() });

            UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
            notifyListenersWithException(ex1);
          }
        }
        // Cleanup CasConsumer
        if (casConsumerPU != null && !casConsumerPU.isRunning()) {
          try {
            if (!consumerThreadStarted) {
              executorService.submit(casConsumerPU);
            }
            casConsumerPUResult.get();
          } catch (Exception ex1) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
                    new Object[] { Thread.currentThread().getName(), ex1.getMessage() });

            UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
            notifyListenersWithException(ex1);
          }
        }

        try {
          // Place EOF Token onto work queue to force PUs shutdown
          forcePUShutdown();

          // Cleanup Processing Threads
          for (int i = 0; processingUnits != null && i < concurrentThreadCount; i++) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_join_pu__FINEST", new Object[] { Thread.currentThread().getName(),
                          processingUnits[i].getName(), String.valueOf(i) });
            }
            if (processingUnits[i] != null) {
              // In case the processing thread was created BUT not started we need to
              // start it to make sure it is cleaned up by the ThreadGroup. Not started
              // threads hang around in the ThreadGroup despite the fact that are started.
              // The run() method is instrumented to immediately exit since the CPE is
              // not running. So the thread only starts for a brief moment and than stops.
              // This code is only executed in case where the thread is NOT started
              // In such a case 'processingThreadsState[i] = -1'

              if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
                executorService.submit(processingUnits[i]);
              }
              try {
                processingUnitResults[i].get();
                if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                  UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                          this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                          "UIMA_CPM_join_pu_complete__FINEST",
                          new Object[] { Thread.currentThread().getName(),
                              processingUnits[i].getName(), String.valueOf(i) });
                }
              } catch (Exception ex1) {
                UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER,
                        this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                        "UIMA_CPM_exception__FINER",
                        new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
                UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex1);
                notifyListenersWithException(ex1);
              }

            }
          }
        } catch (Exception ex) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                  new Object[] { Thread.currentThread().getName(), ex.getMessage() });
          UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex);
          notifyListenersWithException(ex);
        }
      }
    } finally {
      if (!consumerCompleted && casConsumerPU != null) {
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc__FINEST",
                  new Object[] { Thread.currentThread().getName() });
        }

        try {
          Object[] eofToken = new Object[1];
          // only need one member in the array
          eofToken[0] = new EOFToken();
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {

            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_placed_eof_in_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
          outputQueue.enqueue(eofToken);
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_done_placed_eof_in_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
          // synchronized (outputQueue) { // redundant - the above enqueue does this
          // outputQueue.notifyAll();
          // }
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_done_notifying_queue__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
          }
        } catch (Exception e) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_exception_adding_eof__SEVERE",
                  new Object[] { Thread.currentThread().getName() });
          notifyListenersWithException(e);
        }
        try {
          casConsumerPUResult.get();
        } catch (InterruptedException e) {

        } catch (ExecutionException e) {
          e.printStackTrace();
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_completed__FINEST",
                  new Object[] { Thread.currentThread().getName() });
        }
      }

      executorService.shutdown();
    }
  }