in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/ArtifactProducer.java [658:893]
public void run() {
Thread.currentThread().setName("[CollectionReader Thread]::");
boolean crEventCompleted = false; // this flag is used to mark the
// ProcessTrace event
if (!cpm.isRunning()) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cpm_not_running__WARNING",
new Object[] { Thread.currentThread().getName() });
return;
}
Object[] casObjectList = null;
// Check if
if (endOfProcessingReached()) {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_done_producing__FINEST",
new Object[] { Thread.currentThread().getName() });
}
placeEOFToken();
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_eof_marker_enqueued__FINEST",
new Object[] { Thread.currentThread().getName() });
}
return;
}
isRunning = true;
ProcessTrace localTrace = new ProcessTrace_impl(cpm.getPerformanceTuningSettings());
while (cpm.isRunning()) {
casList = null;
casObjectList = null;
synchronized (cpm.lockForPause) {
if (cpm.isPaused()) {
try {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_pausing_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
// Wait until resumed
cpm.lockForPause.wait();
} catch (Exception e) {
}
if (!cpm.isRunning()) {
break;
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_resume_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
}
}
try {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_call_hasnext__FINEST",
new Object[] { Thread.currentThread().getName() });
}
threadState = 1004; // Entering hasNext()
// start the CR event
localTrace.startEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
"");
crEventCompleted = false;
if (collectionReader.hasNext()) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
"success");
crEventCompleted = true;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_get_cas_from_cr__FINEST",
new Object[] { Thread.currentThread().getName() });
}
casObjectList = readNext(readerFetchSize);
if (casObjectList != null) {
if (casObjectList instanceof CAS[]) {
boolean releasedCas = false;
for (int i = 0; i < casObjectList.length && casObjectList[i] != null; i++) {
ChunkMetadata meta = CPMUtils.getChunkMetadata((CAS) casObjectList[i]);
if (meta != null) {
if (timedoutDocs.containsKey(meta.getDocId())) {
notifyListeners(casList[i],
new ResourceProcessException(new SkipCasException(
"Dropping CAS due chunk Timeout. Doc Id::" + meta.getDocId()
+ " Sequence:" + meta.getSequence())));
casPool.releaseCas((CAS) casObjectList[i]);
// synchronized (casPool) { // redundant, releaseCas call does this
// casPool.notifyAll();
// }
releasedCas = true;
}
}
}
if (releasedCas) {
continue;
}
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_place_cas_in_queue__FINEST", new Object[] {
Thread.currentThread().getName(), String.valueOf(casObjectList.length) });
}
// Prevent processing of new CASes if the CPM has been
// killed hard. Allow processing of CASes
// while the CPM is in normal shutdown state.
// (Moved this code inside if (casObjectList != null)
// block to avoid NullPointerException. -Adam
if (cpm.isRunning() || (!cpm.isRunning() && !cpm.isHardKilled())) {
threadState = 1005; // Entering enqueue
workQueue.enqueue(casObjectList);
// synchronized (workQueue) { // redundant, enqueue does this
// workQueue.notifyAll();
// }
threadState = 1006; // Done Entering enqueue
entityCount += casObjectList.length;
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_placed_cas_in_queue__FINEST",
new Object[] { Thread.currentThread().getName(),
String.valueOf(casObjectList.length) });
}
} else {
break; // CPM has been killed
}
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_terminate_cr_thread__FINEST",
new Object[] { Thread.currentThread().getName() });
}
break; // Null should not be returned from getNext
// unless the CPM is in shutdown mode
}
} else {
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_processed_all__FINEST",
new Object[] { Thread.currentThread().getName() });
}
// Stops the CPM and all of the running threads.
// cpm.stopIt(); APL - don't stop, just terminate this
// thread, which CPMEngine has joined on
break;
}
// Check if the CollectionReader retrieved expected number of
// entities
if (endOfProcessingReached()) {
threadState = 1010; // End of processing
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_end_of_processing__FINEST",
new Object[] { Thread.currentThread().getName() });
}
break;
}
} catch (Exception e) {
// The following conditional is true if hasNext() has failed
if (!crEventCompleted) {
localTrace.endEvent(collectionReader.getProcessingResourceMetaData().getName(), "Process",
"failure");
}
// e.printStackTrace();
// changed from FINER to WARNING: https://issues.apache.org/jira/browse/UIMA-2440
if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.WARNING, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__WARNING",
new Object[] { Thread.currentThread().getName(), e.getMessage() });
UIMAFramework.getLogger(this.getClass()).log(Level.WARNING, e.getMessage(), e);
}
if (casList == null) {
notifyListeners(null, e);
} else {
// Notify Listeners and release CAS's back to the cas pool.
for (int i = 0; casList != null && i < casList.length; i++) {
if (casList[i] != null) {
notifyListeners(casList[i], e);
casPool.releaseCas(casList[i]);
casList[i] = null;
// synchronized (casPool) { // redundant, releaseCas does this
// casPool.notifyAll();
// }
} else {
notifyListeners(null, e);
}
casList = null;
}
}
} finally {
// Clear all events
synchronized (globalSharedProcessTrace) {
globalSharedProcessTrace.aggregate(localTrace);
}
localTrace.clear();
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_show_cpm_running_status__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
"process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
"UIMA_CPM_show_cpm_running_status__FINEST",
new Object[] { Thread.currentThread().getName(), String.valueOf(cpm.isRunning()) });
}
// Done with processing. Create a "special" EOF token and place it in
// the queue.
// Consumers of the queue must interpret this token as End Of File
// event, meaning
// end of processing. Such components must do appropriate cleanup and
// terminate.
placeEOFToken();
isRunning = false;
// Interrupt any waiting threads
Thread.currentThread().interrupt();
}