public void run()

in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/ArtifactProducer.java [658:893]


  /**
   * Body of the CollectionReader thread.
   * <p>
   * While the CPM reports that it is running, this method repeatedly:
   * <ol>
   * <li>blocks on {@code cpm.lockForPause} while the CPM is paused,</li>
   * <li>asks the CollectionReader whether more input is available,</li>
   * <li>fetches up to {@code readerFetchSize} CASes via {@code readNext},</li>
   * <li>drops (notifies listeners about and releases) CASes whose document id is listed in
   * {@code timedoutDocs},</li>
   * <li>enqueues the fetched bundle onto {@code workQueue} and updates {@code entityCount}.</li>
   * </ol>
   * The loop ends when the reader has no more input, {@code readNext} returns {@code null}, the
   * CPM stops running / is hard-killed, or {@code endOfProcessingReached()} returns true. On exit
   * an EOF token is placed via {@code placeEOFToken()} so queue consumers can terminate.
   * <p>
   * Exceptions raised inside one iteration are logged at WARNING, reported to listeners, and any
   * CASes still referenced by {@code casList} are released back to {@code casPool}; the loop then
   * continues with the next iteration.
   */
  public void run() {
    Thread.currentThread().setName("[CollectionReader Thread]::");

    // Tracks whether the ProcessTrace event opened around hasNext() has already been ended, so
    // the exception handler knows whether it still has to end it with "failure".
    boolean crEventCompleted = false;
    if (!cpm.isRunning()) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_not_running__WARNING",
              new Object[] { Thread.currentThread().getName() });
      return;
    }

    Object[] casObjectList = null;
    // Nothing to produce: just hand the EOF token to the consumers and leave.
    if (endOfProcessingReached()) {
      logRunFinest("UIMA_CPM_cr_done_producing__FINEST");
      placeEOFToken();
      logRunFinest("UIMA_CPM_eof_marker_enqueued__FINEST");
      return;
    }
    isRunning = true;
    ProcessTrace localTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
    while (cpm.isRunning()) {

      casList = null;
      casObjectList = null;
      synchronized (cpm.lockForPause) {
        if (cpm.isPaused()) {
          try {
            logRunFinest("UIMA_CPM_pausing_cr__FINEST");
            // Wait until resumed
            cpm.lockForPause.wait();
          } catch (Exception ignored) {
            // Deliberately ignored: whatever woke us up, the running state is re-checked right
            // below. The interrupt flag is intentionally NOT restored here because later
            // blocking calls in this loop could then fail immediately.
          }
          if (!cpm.isRunning()) {
            break;
          }
          logRunFinest("UIMA_CPM_resume_cr__FINEST");
        }
      }

      try {
        logRunFinest("UIMA_CPM_call_hasnext__FINEST");
        threadState = 1004; // Entering hasNext()

        // start the CR event
        localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
                "");
        crEventCompleted = false;
        if (!collectionReader.hasNext()) {
          logRunFinest("UIMA_CPM_processed_all__FINEST");
          // Stops the CPM and all of the running threads.
          // cpm.stopIt(); APL - don't stop, just terminate this
          // thread, which CPMEngine has joined on
          break;
        }
        localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
                "success");
        crEventCompleted = true;

        logRunFinest("UIMA_CPM_get_cas_from_cr__FINEST");
        casObjectList = readNext(readerFetchSize);
        if (casObjectList == null) {
          logRunFinest("UIMA_CPM_terminate_cr_thread__FINEST");
          // Null should not be returned from getNext unless the CPM is in shutdown mode
          break;
        }

        if (casObjectList instanceof CAS[]) {
          boolean releasedCas = false;
          // The scan stops at the first null slot.
          for (int i = 0; i < casObjectList.length && casObjectList[i] != null; i++) {
            CAS cas = (CAS) casObjectList[i];
            ChunkMetadata meta = CPMUtils.getChunkMetadata(cas);
            if (meta != null && timedoutDocs.containsKey(meta.getDocId())) {
              // Report and release the very CAS that was inspected. (Previously this indexed
              // the casList field, which this loop nulls at the top of every iteration.)
              notifyListeners(cas,
                      new ResourceProcessException(new SkipCasException(
                              "Dropping CAS due chunk Timeout. Doc Id::" + meta.getDocId()
                                      + " Sequence:" + meta.getSequence())));

              // releaseCas is expected to notify waiters on the pool itself
              casPool.releaseCas(cas);
              releasedCas = true;
            }
          }
          if (releasedCas) {
            // NOTE(review): the whole bundle is skipped here, but only the timed-out CASes were
            // released above. With a fetch size > 1 the remaining CASes of this bundle appear to
            // be neither enqueued nor released - confirm whether that is intended.
            continue;
          }
        }
        logRunFinest("UIMA_CPM_place_cas_in_queue__FINEST", String.valueOf(casObjectList.length));

        // Prevent processing of new CASes if the CPM has been killed hard. Allow processing of
        // CASes while the CPM is in normal shutdown state.
        if (!cpm.isRunning() && cpm.isHardKilled()) {
          break; // CPM has been killed
        }
        threadState = 1005; // Entering enqueue
        // enqueue is expected to notify waiters on the queue itself
        workQueue.enqueue(casObjectList);
        threadState = 1006; // Done Entering enqueue
        entityCount += casObjectList.length;
        logRunFinest("UIMA_CPM_placed_cas_in_queue__FINEST", String.valueOf(casObjectList.length));

        // Check if the CollectionReader retrieved expected number of entities
        if (endOfProcessingReached()) {
          threadState = 1010; // End of processing
          logRunFinest("UIMA_CPM_end_of_processing__FINEST");
          break;
        }
      } catch (Exception e) {
        // The following conditional is true if hasNext() has failed
        if (!crEventCompleted) {
          localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
                  "failure");
        }
        // changed from FINER to WARNING: https://issues.apache.org/jira/browse/UIMA-2440
        if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__WARNING",
                  new Object[] { Thread.currentThread().getName(), e.getMessage() });

          UIMAFramework.getLogger(this.getClass()).log(Level.WARNING, e.getMessage(), e);
        }
        if (casList == null) {
          notifyListeners(null, e);
        } else {
          // Notify listeners and release EVERY CAS still held back to the cas pool. The field is
          // cleared only after the whole array has been walked; clearing it inside the loop
          // (as was done before) stopped after the first slot and leaked the remaining CASes.
          boolean notified = false;
          for (int i = 0; i < casList.length; i++) {
            if (casList[i] != null) {
              notifyListeners(casList[i], e);
              casPool.releaseCas(casList[i]);
              casList[i] = null;
              notified = true;
            }
          }
          if (!notified) {
            // No CAS was associated with the failure; still report the exception once.
            notifyListeners(null, e);
          }
          casList = null;
        }
      } finally {
        // Fold this iteration's events into the shared trace, then clear all local events
        synchronized (globalSharedProcessTrace) {
          globalSharedProcessTrace.aggregate(localTrace);
        }
        localTrace.clear();
      }
      logRunFinest("UIMA_CPM_show_cpm_running_status__FINEST", String.valueOf(cpm.isRunning()));
    }

    logRunFinest("UIMA_CPM_show_cpm_running_status__FINEST", String.valueOf(cpm.isRunning()));
    // Done with processing. Create a "special" EOF token and place it in the queue.
    // Consumers of the queue must interpret this token as End Of File event, meaning
    // end of processing. Such components must do appropriate cleanup and terminate.
    placeEOFToken();
    isRunning = false;
    // NOTE(review): the original comment here read "Interrupt any waiting threads", but this
    // call only sets the interrupt flag of the current (CollectionReader) thread - confirm
    // the intent before changing it.
    Thread.currentThread().interrupt();
  }

  /**
   * Emits one FINEST message from the CPM resource bundle on behalf of {@link #run()}.
   * <p>
   * The current thread's name is always passed as the first message parameter, followed by
   * {@code extraParams}. As in the original inline code, the level check is made against the
   * logger returned by {@code UIMAFramework.getLogger()} while the message is written through
   * the logger for this class, and the source method is reported as {@code "process"}.
   *
   * @param messageKey
   *          key of the message in {@code CPMUtils.CPM_LOG_RESOURCE_BUNDLE}
   * @param extraParams
   *          additional message parameters following the thread name; may be empty
   */
  private void logRunFinest(String messageKey, Object... extraParams) {
    if (!UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      return;
    }
    Object[] params = new Object[extraParams.length + 1];
    params[0] = Thread.currentThread().getName();
    System.arraycopy(extraParams, 0, params, 1, extraParams.length);
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
            "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, messageKey, params);
  }