public synchronized Object dequeue()

in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/SequencedQueue.java [131:384]


  /**
   * Removes and returns the next object that is allowed to leave the queue, honoring chunk
   * sequencing.
   * <p>
   * The method works as follows:
   * <ul>
   * <li>If {@code numberElementsInQueue} is zero, {@code null} is returned.</li>
   * <li>If the head of the queue is an {@code Object[]} whose first element is an
   * {@code EOFToken}, it is removed and returned immediately.</li>
   * <li>Otherwise the queue is scanned from index 0. For a {@code WorkUnit} wrapping a
   * {@code CAS[]}, the chunk metadata of the first CAS decides whether the entry is selected: an
   * entry without chunk metadata, or one that is not part of a chunk sequence while the queue is
   * not in chunk state, is selected as is; while in chunk state only the entry carrying the
   * expected sequence number (last returned sequence + 1) is selected, and for sequences greater
   * than 1 its document id must also match the expected one (case-insensitively). An entry that
   * belongs to a timed out sequence is handed to {@code timedOutCas(int)} and that result is
   * returned.</li>
   * <li>A non-null entry that is not a {@code WorkUnit} wrapping a {@code CAS[]} is selected as
   * is.</li>
   * <li>If the scan reaches the end of the queue without selecting an entry, the expected sequence
   * is re-armed for the next call and {@code null} is returned; the caller is expected to wait and
   * call again.</li>
   * </ul>
   * When an entry is removed here, {@code numberElementsInQueue} is decremented and
   * {@code notifyAll()} is invoked on this object.
   *
   * @return the dequeued object, or {@code null} if the queue is empty or the expected chunk has
   *         not been found yet
   */
  public synchronized Object dequeue() {
    // Check if there is anything in the queue
    if (numberElementsInQueue == 0) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_queue_empty__FINEST",
                new Object[] { Thread.currentThread().getName(), getName() });
      }
      return null;
    }
    Object anObject = null;
    int queueIndex = 0;
    // Snapshot of the queue size; compared against queueIndex after the scan to detect that the
    // scan ran off the end of the queue without selecting an entry.
    int queueSize = queue.size();
    // Expected chunk sequence. This is relevant when the queue is in chunk mode.
    int chunkSequence = nextChunkMetadata.getSequence() + 1;

    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_expected_chunk_sequenece__FINEST", new Object[] {
                  Thread.currentThread().getName(), getName(), String.valueOf(chunkSequence) });
    }
    try {
      // Peek at the head of the queue. This does not remove the object from the queue.
      anObject = queue.get(queueIndex);
      // An EOF token at the head of the queue bypasses sequencing and is returned right away.
      if (anObject instanceof Object[] && ((Object[]) anObject)[0] instanceof EOFToken) {
        anObject = queue.remove(queueIndex);
        numberElementsInQueue--;
        notifyAll();
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_got_eof_token__FINEST",
                  new Object[] { Thread.currentThread().getName(), getName() });
        }
        return anObject;
      }
      // Cycle through the queue until an entry is selected (break) or no more entries are found.
      // On break, queueIndex identifies the entry that is removed after the loop.
      while (queueIndex < queue.size()) {
        // get the next entry in the queue
        anObject = queue.get(queueIndex);
        if (anObject instanceof WorkUnit && ((WorkUnit) anObject).get() instanceof CAS[]) {
          // Create metadata from the CAS. This convenience object is used internally and keeps
          // track of the last chunk sequence processed here
          ChunkMetadata chunkMetadata = CPMUtils
                  .getChunkMetadata(((CAS[]) ((WorkUnit) anObject).get())[0]);
          // Chunking is not strictly required. In such cases the sequence metadata will not be in
          // the CAS and thus there is no ChunkMetadata; the entry is selected as is.
          if (chunkMetadata == null) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_chunk_meta_is_null__FINEST",
                      new Object[] { Thread.currentThread().getName(), getName() });
            }
            break;
          }
          // Check if the current CAS is part of a chunk sequence that has already timed out. The
          // sequence times out if the expected sequence is not received in a given time interval.
          // The code uses a small cache of timed out sequences with a key being the document id.
          // Each entry in the cache has an associated timer thread. This thread limits the life of
          // the entry in the cache so that it doesn't grow. When the timer expires, it removes the
          // associated entry (document id) from the cache. Timeouts are only meaningful for
          // chunks. isOneOfMany() determines if the current CAS is part of a larger chunk
          // sequence.
          if (chunkMetadata.isOneOfMany() && sequenceTimedOut(chunkMetadata)) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_sequence_timed_out__FINEST",
                      new Object[] { Thread.currentThread().getName(), getName(),
                          String.valueOf(chunkMetadata.getSequence()) });
            }
            return timedOutCas(queueIndex);
          }
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                    "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_iscas__FINEST",
                    new Object[] { Thread.currentThread().getName(), getName() });
          }
          // The queue gets into a chunk state IFF the CAS is part of the larger document that has
          // been "chopped" up into smaller chunks.
          if (chunkState) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_in_chunk_state__FINEST",
                      new Object[] { Thread.currentThread().getName(), getName(),
                          nextChunkMetadata.getDocId(), chunkMetadata.getDocId(),
                          String.valueOf(chunkSequence),
                          String.valueOf(chunkMetadata.getSequence()) });
            }
            // Is it the expected sequence?
            if (chunkMetadata.getSequence() == chunkSequence) {
              // Make sure to cross-reference with expected document id. This CAS could be part of a
              // different document!
              if (chunkSequence > 1
                      && !nextChunkMetadata.getDocId().equalsIgnoreCase(chunkMetadata.getDocId())) {
                // Sequence number is a match BUT this sequence belongs to another document. So skip
                // and get the next CAS from the queue
                queueIndex++;
                continue;
              }
              // The sequence is a match and the sequence is the last we should expect. Change to
              // non-chunkState and reinitialize
              if (chunkMetadata.isLast()) {
                if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                  UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                          this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                          "UIMA_CPM_change_chunk_state__FINEST",
                          new Object[] { Thread.currentThread().getName(), getName() });
                }
                chunkState = false;
                nextChunkMetadata = new ChunkMetadata("", 0, false);
              } else {
                // The sequence is not the last one, so save the metadata. This metadata will be
                // used during the next call to this method.
                // With this metadata we know what is the next expected sequence for the current
                // document.
                nextChunkMetadata = chunkMetadata;
              }
              break;
            }
          } else {
            // Currently NOT in a chunk state
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_not_in_chunk_state__FINEST",
                      new Object[] { Thread.currentThread().getName(), getName() });
            }

            if (chunkMetadata.isOneOfMany()) // sequence > 0
            {
              if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                        this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                        "UIMA_CPM_begin_chunk_state__FINEST",
                        new Object[] { Thread.currentThread().getName(), getName() });
              }
              chunkState = true;
              if (chunkMetadata.getSequence() == chunkSequence) {
                if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                  UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                          this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                          "UIMA_CPM_in_chunk_state__FINEST",
                          new Object[] { Thread.currentThread().getName(), getName(),
                              nextChunkMetadata.getDocId(), chunkMetadata.getDocId(),
                              String.valueOf(chunkSequence),
                              String.valueOf(chunkMetadata.getSequence()) });
                }

                if (sequenceTimedOut(chunkMetadata)) {
                  if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
                    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                            this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                            "UIMA_CPM_sequence_timed_out__FINEST",
                            new Object[] { Thread.currentThread().getName(), getName(),
                                String.valueOf(chunkMetadata.getSequence()) });
                  }
                  return timedOutCas(queueIndex);
                }

                nextChunkMetadata = chunkMetadata;
                break;
              }
              // Entered chunkState, so we expect the CAS with the first sequence id. So far not
              // found, maybe the next iteration will be successful.
              nextChunkMetadata = new ChunkMetadata(chunkMetadata.getDocId(), 1, false);
            } else {
              // The CAS is not part of any sequence (its sequence# == 0 ).
              break;
            }
          }
        } else {
          // The entry is either null or something other than a WorkUnit wrapping a CAS[].
          if (anObject == null) {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_null_cas__FINEST",
                      new Object[] { Thread.currentThread().getName(), getName() });
            }
          } else {
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_not_cas__FINEST", new Object[] { Thread.currentThread().getName(),
                          getName(), anObject.getClass().getName() });
            }
            // Non-null, non-CAS entries are not sequenced; select this entry as is.
            break;
          }
          // Only reached for a null entry: step past it and stop scanning. The code after the loop
          // then acts on the following index (or reports "not found" if the null entry was last).
          // NOTE(review): the null entry itself stays in the queue - confirm this is intended.
          queueIndex++;
          break;
        }

        // Increment the queue pointer
        queueIndex++;
      }

    } catch (Exception e) {
      // Report through the framework logger only; the exception is handed to the logger, so an
      // additional printStackTrace() to stderr is not needed.
      // NOTE(review): after an exception, control falls through to the code below, which acts on
      // whatever queueIndex the scan had reached - confirm this is the intended recovery.
      if (UIMAFramework.getLogger().isLoggable(Level.SEVERE)) {
        UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", e);
      }
    }

    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_done_scanning_q__FINEST",
              new Object[] { Thread.currentThread().getName(), getName() });
    }
    // We scanned the queue and the expected sequence chunk has not been found.
    if (queueIndex == queueSize) {
      if (chunkSequence > 0) {
        // Reset expected sequence to the same number. The caller most likely will sleep for awhile
        // and retry. During the retry we need to look for the same sequence we failed to find
        // during this iteration.
        nextChunkMetadata = new ChunkMetadata(nextChunkMetadata.getDocId(), chunkSequence - 1,
                false);
      }
      // Report the failed scan once, regardless of whether the expected sequence was reset.
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_expecte_seq_not_found__FINEST", new Object[] {
                    Thread.currentThread().getName(), getName(), String.valueOf(queue.size()) });
      }
      // Return null to indicate the expected CAS was not found. It is the responsibility of the
      // caller to wait and invoke this method again.
      return null;
    }

    // The expected sequence has been found. Remove the CAS from the queue and return it to the
    // caller.
    anObject = queue.remove(queueIndex);
    // Reduce # of objects in the queue
    numberElementsInQueue--;
    notifyAll();
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_show_queue_capacity__FINEST",
              new Object[] { Thread.currentThread().getName(), getName(), String.valueOf(queueSize),
                  String.valueOf(numberElementsInQueue) });
    }
    return anObject;
  }