in uimaj-cpe/src/main/java/org/apache/uima/collection/impl/cpm/engine/CPMEngine.java [1826:2506]
/**
 * Runs the CPE on the calling thread.
 * <p>
 * When {@code singleThreadedCPE} is set, the work is delegated to {@code runSingleThreaded()} and
 * the executor service is cleaned up and shut down afterwards. Otherwise this method:
 * <ol>
 * <li>determines the reader fetch size and, when CAS objects are needed, creates the CAS pool;</li>
 * <li>creates the work queue, the optional output queue, the {@code ArtifactProducer}, the
 * optional CasConsumer processing unit and {@code concurrentThreadCount} processing units;</li>
 * <li>submits the consumer, the processing units and finally the producer to
 * {@code executorService};</li>
 * <li>waits for the producer, then each processing unit, then the consumer (after enqueueing an
 * EOF token on the output queue) to finish;</li>
 * <li>stops all CAS processors.</li>
 * </ol>
 * Any exception marks the engine as killed and is reported to the registered listeners. If the
 * failure happened before all tasks were submitted, the tasks that were never submitted are
 * submitted and waited on so they can terminate. The executor service is always shut down on
 * exit.
 */
public void run() {
  Thread.currentThread().setName("CPMEngine Thread");
  boolean consumerCompleted = false;
  boolean isStarted = false; // Indicates if all threads have been started
  if (isKilled()) {
    return;
  }
  // Single-threaded mode is enabled in the CPE descriptor. In the CpeConfig element check for the
  // value of deployAs:
  // <deployAs>single-threaded</deployAs>
  if (singleThreadedCPE) {
    try {
      runSingleThreaded();
      return;
    } catch (Throwable t) {
      killed = true;
      t.printStackTrace();
      UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_exception_in_single_threaded_cpm__SEVERE",
              new Object[] { Thread.currentThread().getName(), t.getMessage() });
      return;
    } finally {
      executorService.cleanup();
      executorService.shutdown();
    }
  }
  try {
    isRunning = true;
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_starting_cpe__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    // How many entities to get for each fetch from the CollectionReader. Use the default (1)
    // unless the CollectionReader descriptor overrides it with a "fetchSize" parameter.
    readerFetchSize = 1;
    Object configuredFetchSize = collectionReader.getProcessingResourceMetaData()
            .getConfigurationParameterSettings().getParameterValue("fetchSize");
    if (configuredFetchSize != null) {
      readerFetchSize = (Integer) configuredFetchSize;
    }
    if (System.getProperty("DEBUG_CONTROL") != null) {
      startDebugControlThread();
    }
    if (!mixedCasProcessorTypeSupport && collectionReader instanceof CollectionReader) {
      mixedCasProcessorTypeSupport = true;
    }
    // When the CPE is configured to run exclusively with CasDataProcessor type components (no
    // CasObjectProcessors) there is no need to instantiate CAS objects. These would never be
    // used and would waste memory.
    if (mixedCasProcessorTypeSupport) {
      // Instantiate container for CAS instances
      try {
        // Register all type systems with the CAS Manager
        registerTypeSystemsWithCasManager();
        if (poolSize == 0) { // Not set in the CpeDescriptor
          poolSize = readerFetchSize * (inputQueueSize + outputQueueSize)
                  * cpeFactory.getProcessingUnitThreadCount() + 3;
          // This is a hack to limit the number of CASes. In a WF environment, where the WF Store
          // decides the size of readerFetchSize, we have a problem with memory. If the store
          // decides to return 1000 entities we will need a LOT of memory to handle this. So
          // limit the pool size to something more reasonable.
          if (poolSize > 100) {
            System.err.println(
                    "CPMEngine.run()-CAS PoolSize exceeds hard limit(100). Redefining size to 60.");
            UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_redefine_pool_size__CONFIG",
                    new Object[] { Thread.currentThread().getName() });
            poolSize = 60; // Hard limit
          }
        }
        if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
                  "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_show_cas_pool_size__CONFIG",
                  new Object[] { Thread.currentThread().getName(), String.valueOf(poolSize) });
        }
        casPool = new CPECasPool(poolSize, cpeFactory.getResourceManager().getCasManager(),
                mPerformanceTuningSettings);
        callTypeSystemInit();
      } catch (Exception e) {
        isRunning = false;
        killed = true;
        UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_cp_failed_to_start__SEVERE",
                new Object[] { Thread.currentThread().getName(), e.getMessage() });
        UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
        notifyListenersWithException(e);
        return;
      }
    }
    // Instantiate work queue. This queue is shared among all processing units.
    // The Producer fills this queue with CASes and processing units
    // retrieve these CASes for analysis.
    workQueue = new BoundedWorkQueue(poolSize, "Input Queue", this);
    // Instantiate output queue. The CASes containing results of analysis are deposited to
    // this queue, and the CasConsumer Processing Unit retrieves them.
    if (consumerList != null && consumerList.size() > 0) {
      outputQueue = createOutputQueue(poolSize);
    }
    if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_create_producer__CONFIG",
              new Object[] { Thread.currentThread().getName() });
    }
    // Producer is responsible for filling the work queue with CASes. Runs as a separate task
    // until all entities are processed or the CPM stops.
    producer = new ArtifactProducer(this, casPool);
    try {
      // Plug in custom timer for measuring performance of the CollectionReader
      producer.setUimaTimer(getTimer());
    } catch (Exception e) {
      // Use default Timer. Ignore the exception
      producer.setUimaTimer(new JavaTimer());
    }
    // indicate how many entities to process
    producer.setNumEntitiesToProcess(numToProcess);
    producer.setCollectionReader(collectionReader);
    producer.setWorkQueue(workQueue);
    // collect stats in shared instance
    producer.setCPMStatTable(stats);
    // Tell every registered listener that initialization is complete
    for (int j = 0; j < statusCbL.size(); j++) {
      BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
      if (statCL != null) {
        statCL.initializationComplete();
      }
    }
    // Just in case check if the CPM has the right state to start
    // NOTE(review): this and the configuration-error returns below leave isRunning == true;
    // confirm that is intended.
    if (isKilled()) {
      return;
    }
    // Nov 2005, postpone starting the Producer until all other tasks are up.
    // This prevents a problem when the Producer starts, grabs all CASes, fills the
    // input queue and there is an exception BEFORE Processing Units start. This may lead
    // to a hang, because the CR is waiting on the CAS Pool and no-one consumes the Input Queue.
    // Create Cas Consumer task
    if (consumerList != null && consumerList.size() > 0) {
      // Create a CasConsumer Processing Unit if there is at least one CasConsumer configured in
      // the CPE descriptor
      casConsumerPU = new ProcessingUnit(this, outputQueue, null);
      casConsumerPU.setProcessingUnitProcessTrace(procTr);
      casConsumerPU.setContainers(consumerList);
      casConsumerPU.setCasPool(casPool);
      casConsumerPU.setReleaseCASFlag(true);
      casConsumerPU.setCasConsumerPipelineIdentity();
      // Add Callback Listeners
      for (int j = 0; j < statusCbL.size(); j++) {
        BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
        if (statCL != null) {
          casConsumerPU.addStatusCallbackListener(statCL);
        }
      }
      // Notify Callback Listeners when done processing entity
      casConsumerPU.setNotifyListeners(true);
      // Add custom timer
      try {
        casConsumerPU.setUimaTimer(getTimer());
      } catch (Exception e) {
        // Use default Timer
        casConsumerPU.setUimaTimer(new JavaTimer());
      }
      // name the thread
      casConsumerPU.setName("[CasConsumer Pipeline Thread]::");
      // start the CasConsumer task
      casConsumerPUResult = executorService.submit(casConsumerPU);
      consumerThreadStarted = true;
    }
    if (UIMAFramework.getLogger().isLoggable(Level.CONFIG)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_create_pus__CONFIG",
              new Object[] { Thread.currentThread().getName(),
                  String.valueOf(workQueue.getCurrentSize()) });
    }
    // Adjust number of pipelines. Adjustment may be necessary in deployments using exclusive
    // service access. The adjustment is based on the number of available services that the CPM
    // will connect to. If a static configuration calls for 5 processing pipelines but only three
    // services are available (assuming exclusive access), the CPM will reduce the number of
    // processing pipelines to 3.
    for (int indx = 0; indx < annotatorList.size(); indx++) {
      ProcessingContainer prContainer = (ProcessingContainer) annotatorList.get(indx);
      CasProcessorConfiguration configuration = prContainer.getCasProcessorConfiguration();
      if (configuration == null) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_cp_configuration_not_defined__SEVERE",
                new Object[] { Thread.currentThread().getName(), prContainer.getName() });
        return;
      }
      String serviceAccess = configuration.getDeploymentParameter("service-access");
      if (serviceAccess != null && serviceAccess.equalsIgnoreCase("exclusive")) {
        if (prContainer.getPool() != null) {
          int totalInstanceCount = prContainer.getPool().getSize();
          if (totalInstanceCount == 0) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_no_proxies__SEVERE",
                    new Object[] { Thread.currentThread().getName(), prContainer.getName() });
            return;
          }
          if (totalInstanceCount < concurrentThreadCount) {
            concurrentThreadCount = totalInstanceCount; // override
            UIMAFramework.getLogger(this.getClass()).logrb(Level.CONFIG,
                    this.getClass().getName(), "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_reduce_pipelines__CONFIG",
                    new Object[] { Thread.currentThread().getName(), prContainer.getName() });
          }
        }
      }
    }
    // Setup Processing Pipelines
    processingUnits = new ProcessingUnit[concurrentThreadCount];
    processingUnitResults = new Future<?>[concurrentThreadCount];
    synchronized (this) {
      // keeps track of how many threads are still active. -Adam
      activeProcessingUnits = concurrentThreadCount;
    }
    // Capture the state of the pipelines. Initially the state is -1, meaning Not Started
    processingThreadsState = new int[concurrentThreadCount];
    for (int inx = 0; inx < concurrentThreadCount; inx++) {
      processingThreadsState[inx] = -1; // Not Started
    }
    // Configure Processing Pipelines, and start each running as a separate task
    for (int i = 0; i < concurrentThreadCount; i++) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_initialize_pipeline__FINEST",
                new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
      }
      // Plug in custom ProcessingUnit via -DPROCESSING_PIPELINE_IMPL=class
      // Initialize Processing Pipeline with input and output queues
      if (System.getProperty("PROCESSING_PIPELINE_IMPL") != null) {
        String puClass = System.getProperty("PROCESSING_PIPELINE_IMPL");
        try {
          processingUnits[i] = producePU(puClass);
          processingUnits[i].setInputQueue(workQueue);
          processingUnits[i].setOutputQueue(outputQueue);
          processingUnits[i].setCPMEngine(this);
        } catch (Exception e) {
          UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, e.getMessage(), e);
          if (dbgCtrlThread != null) {
            dbgCtrlThread.stop();
          }
          return; // / DONE HERE !!!
        }
      } else {
        processingUnits[i] = new ProcessingUnit(this, workQueue, outputQueue);
      }
      // If there are no consumers in the pipeline, instruct the pipeline to release a CAS at the
      // end of processing
      if (consumerList == null || consumerList.size() == 0) {
        processingUnits[i].setReleaseCASFlag(true);
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_pipeline_impl_class__FINEST", new Object[] {
                    Thread.currentThread().getName(), processingUnits[i].getClass().getName() });
      }
      // Add tracing instance so that performance and stats are globally aggregated for all
      // processing pipelines
      processingUnits[i].setProcessingUnitProcessTrace(procTr);
      // Add all annotators to the processing pipeline
      processingUnits[i].setContainers(annotatorList);
      // pass the CAS pool to processing units in case CAS conversion is required between
      // CasData and CASObject based annotators.
      processingUnits[i].setCasPool(casPool);
      try {
        processingUnits[i].setUimaTimer(getTimer());
      } catch (Exception e) {
        // Fall back to the default timer
        processingUnits[i].setUimaTimer(new JavaTimer());
      }
      // Add Callback Listeners
      for (int j = 0; j < statusCbL.size(); j++) {
        BaseStatusCallbackListener statCL = (BaseStatusCallbackListener) statusCbL.get(j);
        if (statCL != null) {
          processingUnits[i].addStatusCallbackListener(statCL);
        }
      }
      // Name the Processing Unit thread
      processingUnits[i].setName("[Procesing Pipeline#" + (i + 1) + " Thread]::");
      // Start the Processing Pipeline
      processingUnitResults[i] = executorService.submit(processingUnits[i]);
      processingThreadsState[i] = 1; // Started
    }
    producer.setProcessTrace(procTr);
    // Start the ArtifactProducer task and the Collection Reader embedded therein. The
    // Collection Reader begins processing and deposits CASes onto the work queue.
    producerResult = executorService.submit(producer);
    readerThreadStarted = true;
    // Indicate that ALL threads making up the CPE have been started
    isStarted = true;
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "initialize", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
              "UIMA_CPM_started_pipelines__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    // ==============================================================================================
    // Now, wait for ALL CPE tasks to finish. Wait on each submitted task in turn.
    // ==============================================================================================
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_threads__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    // Wait for the producer as it knows when to stop processing. When it is done, it
    // simply terminates. Once it terminates lets just make sure that
    // all tasks finish and the work queue is completely depleted and all entities
    // are processed
    producerResult.get();
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_thread_completed__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    // Wait for each of the Processing Units to finish
    for (int i = 0; i < concurrentThreadCount; i++) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu__FINEST",
                new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                    String.valueOf(i) });
      }
      processingUnitResults[i].get();
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_pu_complete__FINEST",
                new Object[] { Thread.currentThread().getName(), processingUnits[i].getName(),
                    String.valueOf(i) });
      }
    }
    // Wait for the Consumer to finish
    if (casConsumerPU != null) {
      try {
        // Throw an EOF token onto the output queue to indicate end of processing. The consumer
        // will stop the processing upon receiving this token
        Object[] eofToken = new Object[1];
        // only need one member in the array
        eofToken[0] = new EOFToken();
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_placed_eof_in_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
        outputQueue.enqueue(eofToken);
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_done_placed_eof_in_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_done_notifying_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
      } catch (Exception e) {
        e.printStackTrace();
        UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_exception_adding_eof__SEVERE",
                new Object[] { Thread.currentThread().getName(), e.getMessage() });
        notifyListenersWithException(e);
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
      casConsumerPUResult.get();
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc_completed__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
    }
    // From here on the finally block must not enqueue a second EOF token
    consumerCompleted = true;
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_completed__FINEST",
              new Object[] { Thread.currentThread().getName(), workQueue.getName(),
                  String.valueOf(workQueue.getCurrentSize()) });
    }
    // Drain whatever is still sitting in the output queue; the loop ends via the break below
    // once the queue reports a size of zero.
    while (outputQueue != null) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_pus_completed__FINEST",
                new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                    String.valueOf(outputQueue.getCurrentSize()) });
      }
      synchronized (outputQueue) {
        if (outputQueue.getCurrentSize() == 0) {
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                    this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_pus_completed__FINEST",
                    new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                        String.valueOf(outputQueue.getCurrentSize()) });
          }
          break;
        }
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_consuming_queue__FINEST",
                new Object[] { Thread.currentThread().getName(), outputQueue.getName(),
                    String.valueOf(outputQueue.getCurrentSize()) });
      }
      if (casConsumerPU != null) {
        casConsumerPU.consumeQueue();
      }
    }
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cleaning_up_pus__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    // Terminate Annotators and cleanup resources
    for (int i = 0; i < processingUnits.length; i++) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_processors__FINEST",
                new Object[] { Thread.currentThread().getName(), String.valueOf(i) });
      }
      processingUnits[i].stopCasProcessors(false);
    }
    if (casConsumerPU != null) {
      // Terminate CasConsumers and cleanup
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_stop_ccs__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
      casConsumerPU.stopCasProcessors(false);
    }
    if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
      UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
              "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_engine_stopped__FINEST",
              new Object[] { Thread.currentThread().getName() });
    }
    if (dbgCtrlThread != null) {
      dbgCtrlThread.stop();
    }
    isRunning = false;
  } catch (Exception e) {
    isRunning = false;
    killed = true;
    UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
            "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
            new Object[] { Thread.currentThread().getName(), e.getMessage() });
    UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", e);
    notifyListenersWithException(e);
    // The CPE has not been started successfully. Perhaps only partially started. Meaning, that
    // some of its threads are started and some not. This may lead to a memory leak as not started
    // threads are never garbage collected. If this is the state of the CPE (!isStarted) go
    // through a cleanup cycle checking each thread and starting those that have not been
    // started. All CPE threads in their run() method MUST check the state of the CPE by calling
    // cpe.isRunning() as the first thing in their run() methods. If this query returns false,
    // all threads should return from run() without doing any work. But at least they will be
    // garbage collected.
    if (!isStarted) {
      // Cleanup not started threads
      // First the ArtifactProducer
      if (producer != null && !producer.isRunning()) {
        try {
          if (!readerThreadStarted) {
            // Keep the Future of the task submitted here; without it the get() below would
            // dereference a producerResult that was never assigned.
            producerResult = executorService.submit(producer);
          }
          if (producerResult != null) {
            producerResult.get();
          }
        } catch (Exception ex1) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cr_exception__SEVERE",
                  new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
          UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
          notifyListenersWithException(ex1);
        }
      }
      // Cleanup CasConsumer
      if (casConsumerPU != null && !casConsumerPU.isRunning()) {
        try {
          if (!consumerThreadStarted) {
            // Same as above: remember the Future so it can be waited on here and in finally
            casConsumerPUResult = executorService.submit(casConsumerPU);
          }
          if (casConsumerPUResult != null) {
            casConsumerPUResult.get();
          }
        } catch (Exception ex1) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_exception__SEVERE",
                  new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
          UIMAFramework.getLogger(this.getClass()).log(Level.SEVERE, "", ex1);
          notifyListenersWithException(ex1);
        }
      }
      try {
        // Place EOF Token onto work queue to force PUs shutdown
        forcePUShutdown();
        // Cleanup Processing Units
        for (int i = 0; processingUnits != null && i < concurrentThreadCount; i++) {
          // Slots past the point of failure were never populated; skip them before touching
          // the unit (the join log below reads its name).
          if (processingUnits[i] == null) {
            continue;
          }
          if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                    this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_join_pu__FINEST", new Object[] { Thread.currentThread().getName(),
                        processingUnits[i].getName(), String.valueOf(i) });
          }
          // In case the processing unit was created BUT not started we need to
          // start it to make sure it is cleaned up. The run() method is instrumented to
          // immediately exit since the CPE is not running. So the task only starts for a
          // brief moment and then stops.
          // This code is only executed in case where the unit is NOT started.
          // In such a case 'processingThreadsState[i] = -1'
          if (processingThreadsState[i] == -1 && !processingUnits[i].isRunning()) {
            // Store the Future so the wait below targets the task just submitted
            processingUnitResults[i] = executorService.submit(processingUnits[i]);
          }
          try {
            if (processingUnitResults[i] != null) {
              processingUnitResults[i].get();
            }
            if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
              UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST,
                      this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                      "UIMA_CPM_join_pu_complete__FINEST",
                      new Object[] { Thread.currentThread().getName(),
                          processingUnits[i].getName(), String.valueOf(i) });
            }
          } catch (Exception ex1) {
            UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER,
                    this.getClass().getName(), "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                    "UIMA_CPM_exception__FINER",
                    new Object[] { Thread.currentThread().getName(), ex1.getMessage() });
            UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex1);
            notifyListenersWithException(ex1);
          }
        }
      } catch (Exception ex) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINER, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_exception__FINER",
                new Object[] { Thread.currentThread().getName(), ex.getMessage() });
        UIMAFramework.getLogger(this.getClass()).log(Level.FINER, "", ex);
        notifyListenersWithException(ex);
      }
    }
  } finally {
    // If the normal path did not get as far as completing the consumer, send it an EOF token
    // and wait for it here so it is not left blocked on the output queue.
    if (!consumerCompleted && casConsumerPU != null) {
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_join_cc__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
      try {
        Object[] eofToken = new Object[1];
        // only need one member in the array
        eofToken[0] = new EOFToken();
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_placed_eof_in_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
        outputQueue.enqueue(eofToken);
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_done_placed_eof_in_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
        if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                  "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                  "UIMA_CPM_done_notifying_queue__FINEST",
                  new Object[] { Thread.currentThread().getName(), outputQueue.getName() });
        }
      } catch (Exception e) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.SEVERE, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE,
                "UIMA_CPM_exception_adding_eof__SEVERE",
                new Object[] { Thread.currentThread().getName(), e.getMessage() });
        notifyListenersWithException(e);
      }
      try {
        // The consumer may have been created but never submitted; guard against the missing
        // Future so that the executor shutdown below is always reached.
        if (casConsumerPUResult != null) {
          casConsumerPUResult.get();
        }
      } catch (InterruptedException e) {
        // Preserve the interrupt for whoever owns this thread
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }
      if (UIMAFramework.getLogger().isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(this.getClass()).logrb(Level.FINEST, this.getClass().getName(),
                "process", CPMUtils.CPM_LOG_RESOURCE_BUNDLE, "UIMA_CPM_cc_completed__FINEST",
                new Object[] { Thread.currentThread().getName() });
      }
    }
    executorService.shutdown();
  }
}