in uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java [756:1305]
public void process(CAS aCAS, String aCasReferenceId, Endpoint anEndpoint) {
if (stopped) {
return;
}
List<AnalysisEnginePerformanceMetrics> beforeAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
List<AnalysisEnginePerformanceMetrics> afterAnalysisManagementObjects = new ArrayList<AnalysisEnginePerformanceMetrics>();
CasStateEntry parentCasStateEntry = null;
// If enabled, keep a reference to a timer which
// when it expires, will cause a JVM to dump a stack
StackDumpTimer stackDumpTimer = null;
try {
parentCasStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", getComponentName());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
return;
}
long totalProcessTime = 0; // stored total time spent producing ALL CASes
boolean inputCASReturned = false;
boolean processingFailed = false;
// This is a primitive controller. No more processing is to be done on the Cas. Mark the
// destination as final and return CAS in reply.
anEndpoint.setFinal(true);
AnalysisEngine ae = null;
boolean clientUnreachable = false;
try {
// Checkout an instance of AE from the pool
ae = aeInstancePool.checkout();
// Get input CAS entry from the InProcess cache
long time = super.getCpuTime();
// Start the heap dump timer. This timer is only started if explicitly enabled
// via System property: -DheapDumpThreshold=<x> where is number of seconds the
// method is allowed to complete. If the method is not complete in allowed window
// the heap and stack trace dump of all threads will be produced.
stackDumpTimer = ifEnabledStartHeapDumpTimer();
AnalysisEngineManagement rootAem = ae.getManagementInterface();
if ( rootAem.getComponents().size() > 0 ) {
getLeafManagementObjects(rootAem, beforeAnalysisManagementObjects);
} else {
String path=produceUniqueName(rootAem);
beforeAnalysisManagementObjects.add(deepCopyMetrics(rootAem, path));
}
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
if ( stackDumpTimer != null ) {
stackDumpTimer.cancel();
stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
// it in case an exception happens below
}
// Store how long it took to call processAndOutputNewCASes()
totalProcessTime = (super.getCpuTime() - time);
long sequence = 1;
long hasNextTime = 0; // stores time in hasNext()
long getNextTime = 0; // stores time in next();
boolean moreCASesToProcess = true;
boolean casAbortedDueToExternalRequest = false;
while (moreCASesToProcess) {
long timeToProcessCAS = 0; // stores time in hasNext() and next() for each CAS
hasNextTime = super.getCpuTime();
// Start the heap dump timer. This timer is only started if explicitly enabled
// via System property: -DheapDumpThreshold=<x> where x is a number of seconds the
// method is allowed to complete. If the method is not complete in allowed window
// the heap and stack trace dump of all threads will be produced.
stackDumpTimer = ifEnabledStartHeapDumpTimer();
if (!casIterator.hasNext()) {
moreCASesToProcess = false;
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
totalProcessTime += timeToProcessCAS;
if ( stackDumpTimer != null ) {
stackDumpTimer.cancel();
stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
// it in case an exception happens below
}
break; // from while
}
if ( stackDumpTimer != null ) {
stackDumpTimer.cancel();
stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
// it in case an exception happens below
}
// Measure how long it took to call hasNext()
timeToProcessCAS = (super.getCpuTime() - hasNextTime);
getNextTime = super.getCpuTime();
// Start the heap dump timer. This timer is only started if explicitly enabled
// via System property: -DheapDumpThreshold=<x> where is number of seconds the
// method is allowed to complete. If the method is not complete in allowed window
// the heap and stack trace dump of all threads will be produced.
stackDumpTimer = ifEnabledStartHeapDumpTimer();
CAS casProduced = casIterator.next();
if ( stackDumpTimer != null ) {
stackDumpTimer.cancel();
stackDumpTimer = null; // nullify timer instance so that we dont have to worry about
// it in case an exception happens below
}
// Add how long it took to call next()
timeToProcessCAS += (super.getCpuTime() - getNextTime);
// Add time to call hasNext() and next() to the running total
totalProcessTime += timeToProcessCAS;
casAbortedDueToExternalRequest = abortGeneratingCASes(aCasReferenceId);
// If the service is stopped or aborted, stop generating new CASes and just return the input
// CAS
if (stopped || casAbortedDueToExternalRequest) {
if (getInProcessCache() != null && getInProcessCache().getSize() > 0
&& getInProcessCache().entryExists(aCasReferenceId)) {
try {
// Set a flag on the input CAS to indicate that the processing was aborted
getInProcessCache().getCacheEntryForCAS(aCasReferenceId).setAborted(true);
} catch (Exception e) {
// An exception be be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
// and the getCacheEntryForCAS() will throw an exception. Ignore it
// here, we are shutting down.
} finally {
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// We are terminating the iterator here, release the internal CAS lock
// so that we can release the CAS. This approach may need to be changed
// as there may potentially be a problem with a Class Loader.
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
((CASImpl) aCAS).enableReset(true);
try {
// We are either stopping the service or aborting input CAS due to explicit STOP
// request
// from a client. If a new CAS was produced, release it back to the pool.
if (casProduced != null) {
casProduced.release();
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_cas_release_failed__INFO",
new Object[] { getComponentName(),
aCasReferenceId });
}
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_stopped_producing_new_cases__INFO",
new Object[] { Thread.currentThread().getId(), getComponentName(),
aCasReferenceId });
}
}
}
if (casAbortedDueToExternalRequest) {
// The controller was told to stop generating new CASes. Just return the input CAS to
// the
// client
// throw new ResourceProcessException(new
// InterruptedException("Cas Multiplier:"+getComponentName()+" Aborted CAS:"+aCasReferenceId));
break; // break out of the cas producing loop and return an input CAS to the client
} else {
// The controller is stopping
return;
}
}
// OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
MessageContext mContext = getInProcessCache()
.getMessageAccessorByReference(aCasReferenceId);
CacheEntry newEntry = getInProcessCache().register(casProduced, mContext /*, otsd*/);
// if this Cas Multiplier is not Top Level service, add new Cas Id to the private
// cache of the parent aggregate controller. The Aggregate needs to know about
// all CASes it has in play that were generated from the input CAS.
CasStateEntry childCasStateEntry = null;
if (!isTopLevelComponent()) {
newEntry.setNewCas(true, parentController.getComponentName());
// Create CAS state entry in the aggregate's local cache
childCasStateEntry = parentController.getLocalCache().createCasStateEntry(
newEntry.getCasReferenceId());
// Fetch the parent CAS state entry from the aggregate's local cache. We need to increment
// number of child CASes associated with it.
parentCasStateEntry = parentController.getLocalCache().lookupEntry(aCasReferenceId);
} else {
childCasStateEntry = getLocalCache().createCasStateEntry(newEntry.getCasReferenceId());
}
// Associate parent CAS (input CAS) with the new CAS.
childCasStateEntry.setInputCasReferenceId(aCasReferenceId);
// Increment number of child CASes generated from the input CAS
parentCasStateEntry.incrementSubordinateCasInPlayCount();
parentCasStateEntry.incrementOutstandingFlowCounter();
// Associate input CAS with the new CAS
newEntry.setInputCasReferenceId(aCasReferenceId);
newEntry.setCasSequence(sequence);
// Add to the cache how long it took to process the generated (subordinate) CAS
getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_produced_new_cas__FINE",
new Object[] { Thread.currentThread().getName(),
getUimaContextAdmin().getQualifiedContextName(),
newEntry.getCasReferenceId(), aCasReferenceId });
}
// Add the generated CAS to the outstanding CAS Map. Client notification will release
// this CAS back to its pool
synchronized (syncObject) {
if (isTopLevelComponent()) {
// Add the id of the generated CAS to the map holding outstanding CASes. This
// map will be referenced when a client sends Free CAS Notification. The map
// stores the id of the CAS both as a key and a value. Map is used to facilitate
// quick lookup
cmOutstandingCASes.put(newEntry.getCasReferenceId(), newEntry.getCasReferenceId());
}
// Increment number of CASes processed by this service
sequence++;
}
try {
CacheEntry cacheEntry =
getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
if ( isTopLevelComponent() && cacheEntry.isWarmUp() && "WarmupDelegate".equals(anEndpoint.getDelegateKey())) {
localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
localCache.remove(newEntry.getCasReferenceId());
// Remove Stats from the global Map associated with the new CAS
// These stats for this CAS were added to the response message
// and are no longer needed
dropCasStatistics(newEntry.getCasReferenceId());
return;
}
} catch( Exception exx ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", exx);
}
if (!anEndpoint.isRemote()) {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Request, getName());
message.addStringProperty(AsynchAEMessage.CasReference, newEntry.getCasReferenceId());
message.addStringProperty(AsynchAEMessage.InputCasReference, aCasReferenceId);
message.addLongProperty(AsynchAEMessage.CasSequence, sequence);
ServicePerformance casStats = getCasStatistics(aCasReferenceId);
message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
.getRawCasSerializationTime());
message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
.getRawCasDeserializationTime());
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT);
if (!stopped) {
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
dropStats(newEntry.getCasReferenceId(), getName());
}
} else {
// Send generated CAS to the remote client
if (!stopped) {
getOutputChannel().sendReply(newEntry, anEndpoint);
// Check for delivery failure. The client may have terminated while an input CAS was being processed
if ( childCasStateEntry.deliveryToClientFailed() ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO",
new Object[] { getComponentName(), aCasReferenceId });
}
clientUnreachable = true;
if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());
}
// Stop generating new CASes. We failed to send a CAS to a client. Most likely
// the client has terminated.
moreCASesToProcess = false; // exit the while loop
dropCAS(childCasStateEntry.getCasReferenceId(), true);
}
}
}
// Remove new CAS state entry from the local cache if this is a top level primitive.
// If not top level, the client (an Aggregate) will remove this entry when this new
// generated CAS reaches Final State.
if (isTopLevelComponent()) {
try {
localCache.lookupEntry(newEntry.getCasReferenceId()).setDropped(true);
} catch (Exception e) {
}
localCache.remove(newEntry.getCasReferenceId());
}
// Remove Stats from the global Map associated with the new CAS
// These stats for this CAS were added to the response message
// and are no longer needed
dropCasStatistics(newEntry.getCasReferenceId());
} // while
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINEST,
getClass().getName(),
"process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_completed_analysis__FINEST",
new Object[] { Thread.currentThread().getName(), getComponentName(),
aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
}
// check if this is a warm up CAS. Such CAS is internally created with
// a main purpose of warming up analytics.
CacheEntry cacheEntry =
getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
if ( isTopLevelComponent() && cacheEntry.isWarmUp()) {
// if ( cacheEntry.getThreadCompletionSemaphore() != null ) {
// cacheEntry.getThreadCompletionSemaphore().release();
// }
// we are in the warm state which means the pipelines have been initialized
// and we are sending CASes to warm up/prime the analytics. Since we
// reached the end of the flow here, just return now. There is nothing
// else to do with the CAS.
return;
}
getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
// Set total number of children generated from this CAS
// Store total time spent processing this input CAS
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
// Fetch AE's management information that includes per component performance stats
// These stats are internally maintained in a Map. If the AE is an aggregate
// the Map will contain AnalysisEngineManagement instance for each AE.
AnalysisEngineManagement aem = ae.getManagementInterface();
if ( aem.getComponents().size() > 0) {
// Flatten the hierarchy by recursively (if this AE is an aggregate) extracting
// primitive AE's AnalysisEngineManagement instance and placing it in
// afterAnalysisManagementObjects List.
getLeafManagementObjects(aem, afterAnalysisManagementObjects);
} else {
String path=produceUniqueName(aem);
afterAnalysisManagementObjects.add(deepCopyMetrics(aem, path));
}
// Create a List to hold per CAS analysisTime and total number of CASes processed
// by each AE. This list will be serialized and sent to the client
List<AnalysisEnginePerformanceMetrics> performanceList =
new ArrayList<AnalysisEnginePerformanceMetrics>();
// Diff the before process() performance metrics with post process performance
// metrics
for (AnalysisEnginePerformanceMetrics after : afterAnalysisManagementObjects) {
for( AnalysisEnginePerformanceMetrics before: beforeAnalysisManagementObjects) {
if ( before.getUniqueName().equals(after.getUniqueName())) {
boolean found = false;
AnalysisEnginePerformanceMetrics metrics = null;
for( AnalysisEnginePerformanceMetrics met : parentCasStateEntry.getAEPerformanceList() ) {
String un = after.getUniqueName();
if ( un.indexOf("Components") >= -1 ) {
un = un.substring(un.indexOf("/"));
}
if ( met.getUniqueName().equals(un)) {
long at = after.getAnalysisTime()- before.getAnalysisTime();
metrics = new AnalysisEnginePerformanceMetrics(after.getName(),
un,//after.getUniqueName(),
met.getAnalysisTime()+at,
after.getNumProcessed());
found = true;
parentCasStateEntry.getAEPerformanceList().remove(met);
break;
}
}
if ( !found ) {
String un = after.getUniqueName();
if ( un.indexOf("Components") >= -1 ) {
un = un.substring(un.indexOf("/"));
}
metrics = new AnalysisEnginePerformanceMetrics(after.getName(),
un,//after.getUniqueName(),
after.getAnalysisTime()- before.getAnalysisTime(),
after.getNumProcessed());
}
performanceList.add(metrics);
break;
}
}
}
parentCasStateEntry.getAEPerformanceList().addAll(performanceList);
if (!anEndpoint.isRemote()) {
inputCASReturned = true;
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
if (getInProcessCache() != null && getInProcessCache().getSize() > 0
&& getInProcessCache().entryExists(aCasReferenceId)) {
try {
CacheEntry ancestor =
getInProcessCache().
getTopAncestorCasEntry(getInProcessCache().getCacheEntryForCAS(aCasReferenceId));
if ( ancestor != null ) {
ancestor.addDelegateMetrics(getKey(), performanceList);
}
} catch (Exception e) {
// An exception be be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
// and the getCacheEntryForCAS() will throw an exception. Ignore it
// here, we are shutting down.
}
}
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Response, getName());
message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
ServicePerformance casStats = getCasStatistics(aCasReferenceId);
message.addLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
.getRawCasSerializationTime());
message.addLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
.getRawCasDeserializationTime());
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT);
// Send reply back to the client. Use internal (non-jms) transport
if (!stopped) {
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
dropStats(aCasReferenceId, getName());
}
} else {
try {
List<AnalysisEnginePerformanceMetrics> perfMetrics =
new ArrayList<AnalysisEnginePerformanceMetrics>();
String aeName = getMetaData().getName();
CacheEntry entry =
getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
for( AnalysisEnginePerformanceMetrics m : performanceList ) {
// System.out.println("...............BEFORE: Name:"+m.getName()+" UniqueName:"+m.getUniqueName()+" How Many="+m.getNumProcessed());
boolean aggregate = m.getUniqueName().startsWith("/"+aeName);
int pos = m.getUniqueName().indexOf("/",1);
String uName = m.getUniqueName();
if ( pos > -1 && aeInstancePool.size() > 1 && aeName != null && aggregate) {
String st = m.getUniqueName().substring(pos);
uName = "/"+aeName+st;
}
AnalysisEnginePerformanceMetrics newMetrics =
new AnalysisEnginePerformanceMetrics(m.getName(),uName,m.getAnalysisTime(), m.getNumProcessed());
// System.out.println("... Metrics - AE:"+metrics.getUniqueName()+" AE Analysis Time:"+metrics.getAnalysisTime());
perfMetrics.add(newMetrics);
// System.out.println("...............AFTER: Name:"+newMetrics.getName()+" UniqueName:"+newMetrics.getUniqueName()+" How Many="+newMetrics.getNumProcessed());
}
entry.addDelegateMetrics(getKey(), perfMetrics); //performanceList);
} catch (Exception e) {
// An exception be be thrown here if the service is being stopped.
// The top level controller may have already cleaned up the cache
// and the getCacheEntryForCAS() will throw an exception. Ignore it
// here, we are shutting down.
}
if (!stopped && !clientUnreachable ) {
getOutputChannel().sendReply(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), anEndpoint);
}
inputCASReturned = true;
}
// Remove input CAS state entry from the local cache
if (!isTopLevelComponent()) {
localCache.lookupEntry(aCasReferenceId).setDropped(true);
localCache.remove(aCasReferenceId);
}
} catch (Throwable e) {
if ( e instanceof OutOfMemoryError ) {
e.printStackTrace();
System.err.println("\n\n\n\t!!!!! UIMA AS Service Caught Java Error While in process() method. Exiting via System.exit(2)\n\n\n");
System.err.flush();
System.exit(2);
}
if ( stackDumpTimer != null ) {
stackDumpTimer.cancel();
}
processingFailed = true;
ErrorContext errorContext = new ErrorContext();
errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
errorContext.add(AsynchAEMessage.Endpoint, anEndpoint);
// Handle the exception. Pass reference to the PrimitiveController instance
getErrorHandlerChain().handle(e, errorContext, this);
} finally {
dropCasStatistics(aCasReferenceId);
if (ae != null) {
try {
aeInstancePool.checkin(ae);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_service_exception_WARNING", getComponentName());
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_exception__WARNING", e);
}
}
}
// drop the CAS if it has been successfully processed. If there was a failure, the Error
// Handler
// will drop the CAS
if (isTopLevelComponent() && !processingFailed) {
// Release CASes produced from the input CAS if the input CAS has been aborted
if (abortGeneratingCASes(aCasReferenceId) || clientUnreachable ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_remove_cache_entry__INFO",
new Object[] { getComponentName(), aCasReferenceId });
}
getInProcessCache().releaseCASesProducedFromInputCAS(aCasReferenceId);
} else if (inputCASReturned && isTopLevelComponent()) {
if ( clientUnreachable ) {
((CASImpl) aCAS).enableReset(true);
}
// Remove input CAS cache entry if the CAS has been sent to the client
dropCAS(aCasReferenceId, true);
localCache.dumpContents();
}
}
}
}