in uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java [1057:1236]
/**
 * Moves a CAS one step forward through this aggregate.
 * <p>
 * The work is done in two phases:
 * <ol>
 * <li>Unless the controller is stopped, pending work for the CAS is handled first. The CAS may
 * be aborted via {@code abortProcessingCas}, parked in {@code CacheEntry.FINAL_STATE} while
 * child CASes of a "process parent last" Cas Multiplier are still in play, or dispatched to the
 * next delegate on its delayed single-step list. Each of these outcomes ends the call.</li>
 * <li>Otherwise the Flow object for the CAS is taken from {@code flowMap}, or computed by the
 * Flow Controller and cached there, and the next flow step is executed. If the controller is
 * stopped at that point the flow is aborted instead.</li>
 * </ol>
 * An exception raised while obtaining the Flow object sends a shutdown reply for the CAS and
 * triggers the {@code ErrorHandler.TERMINATE} action. Any other exception raised in the second
 * phase is passed to {@code handleError} together with the Process command and the CAS id.
 *
 * @param aCAS
 *          the CAS to process; used here only to compute a new Flow object
 * @param aCasReferenceId
 *          id under which the CAS is registered in the caches; the flow phase is skipped when
 *          this is {@code null}
 */
public void process(CAS aCAS, String aCasReferenceId) {
  boolean handlingDelayedStep = false;
  // First check if there are outstanding steps to be called before consulting the Flow
  // Controller.
  // This could be the case if a previous step was a parallel step and it contained collocated
  // delegates.
  if (!isStopped()) {
    if (abortGeneratingCASes(aCasReferenceId)) {
      // Force delegate Cas Multipliers to Stop generating new CASes
      super.stopCasMultipliers();
    }
    try {
      CacheEntry entry = null;
      if (getInProcessCache().entryExists(aCasReferenceId)) {
        entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
      }
      // NOTE(review): lookupEntry() can return null (the flow-creation path further down checks
      // for exactly that). A null here would end in a NullPointerException in the code below,
      // which the catch block only logs. Confirm that callers always register the CAS in the
      // local cache before invoking process().
      CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
      // Check if this CAS should be aborted due to previous error on this CAS or its
      // parent. If this is the case the method will move the CAS to the final state
      // where it will be dropped. If the CAS is an input CAS, it will be returned to
      // the client with an exception
      if (abortProcessingCas(casStateEntry, entry)) {
        // This CAS was aborted, we are done here
        return;
      }
      if (entry == null) {
        throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"
                + aCasReferenceId + " is Invalid");
      }
      // Check if this is an input CAS from the client. If not, check if last
      // delegate handling this CAS was a Cas Multiplier configured to process
      // parent CAS last
      if (casStateEntry.getLastDelegate() != null) {
        // Fetch the endpoint corresponding to the last Delegate handling the CAS
        Endpoint lastDelegateEndpoint = casStateEntry.getLastDelegate().getEndpoint();
        casStateEntry.setReplyReceived();
        // Check if this delegate is a Cas Multiplier and the parent CAS is to be processed last
        if (lastDelegateEndpoint.isCasMultiplier()) {
          // The following blocks until all child CASes acquire their Flow objects from the Flow
          // Controller. Release the semaphore immediately after acquiring it. This semaphore is
          // no longer needed. This synchronization is only necessary for blocking the parent
          // CAS until all child CASes acquire their Flow objects.
          casStateEntry.acquireFlowSemaphore();
          casStateEntry.releaseFlowSemaphore();
          if (lastDelegateEndpoint.processParentLast()) {
            synchronized (super.finalStepMux) {
              // Determine if the CAS should be held until all its children leave this aggregate.
              if (casStateEntry.getSubordinateCasInPlayCount() > 0) {
                // This input CAS has child CASes still in play. It will remain in the cache
                // until the last of the child CASes is released. Only then is the input CAS
                // allowed to continue into the next step in the flow.
                // The CAS has to be in final state
                casStateEntry.setState(CacheEntry.FINAL_STATE);
                // The input CAS will be held until all children leave this aggregate
                return;
              }
            }
          }
        }
      }
      // If we are here, entry is not null; the check above throws when the CAS is not found in
      // the cache. Look for a delayed single-step list on the cache entry. A non-empty list
      // means a previous parallel step contained collocated delegate(s): parallelStep() keeps
      // only remote delegates in the parallel step and moves the collocated ones to this list.
      List delayedSingleStepList = entry.getDelayedSingleStepList();
      if (delayedSingleStepList != null && !delayedSingleStepList.isEmpty()) {
        handlingDelayedStep = true;
        // Reset number of parallel delegates back to one. This is done only if the previous step
        // was a parallel step.
        synchronized (parallelStepMux) {
          if (casStateEntry.getNumberOfParallelDelegates() > 1) {
            casStateEntry.setNumberOfParallelDelegates(1);
          }
        }
        // Remove a delegate endpoint from the single step list cached in the CAS entry
        Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
        // send the CAS to a collocated delegate from the delayed single step list.
        dispatchProcessRequest(entry, endpoint, true);
      }
    } catch (Exception e) {
      // The failure is only logged. Unless a delayed step had already been started, control
      // continues with the flow handling below.
      // NOTE(review): confirm that continuing after e.g. "Cas Not Found" is intended.
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getComponentName());
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process",
                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                e);
      }
    }
    // If a delayed step was just handled (successfully or not) there is nothing else to do.
    // This check deliberately sits after the try/catch rather than in a finally block: a
    // return inside finally would discard any Throwable still propagating (an Error, or an
    // exception thrown by the logging above).
    if (handlingDelayedStep) {
      return;
    }
  }
  FlowContainer flow = null;
  try {
    if (aCasReferenceId != null) {
      try {
        // Check if a Flow object has been previously generated for the Cas.
        if (flowMap.containsKey(aCasReferenceId)) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_retrieve_flow_object__FINEST",
                    new Object[] { getComponentName(), aCasReferenceId });
          }
          flow = (FlowContainer) flowMap.get(aCasReferenceId);
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_retrieved_flow_object_ok__FINEST",
                    new Object[] { getComponentName(), aCasReferenceId });
          }
        } else {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
          }
          // Calls into the Flow Controller are serialized on the container
          synchronized (flowControllerContainer) {
            flow = flowControllerContainer.computeFlow(aCAS);
          }
          // Save the Flow Object in a cache. Flow exists in the cache
          // until the CAS is fully processed or it is
          // explicitly deleted when processing of this CAS cannot
          // continue
          flowMap.put(aCasReferenceId, flow);
          // Check if the local cache already contains an entry for the Cas id.
          // A colocated Cas Multiplier may have already registered this CAS
          // in the parent's controller
          if (localCache.lookupEntry(aCasReferenceId) == null) {
            // Add this Cas Id to the local cache. Every input CAS goes through here
            localCache.createCasStateEntry(aCasReferenceId);
          }
        }
      } catch (Exception ex) {
        // Any error here is automatic termination
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_exception_WARNING", getComponentName());
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_exception__WARNING", ex);
        }
        sendReplyWithShutdownException(aCasReferenceId);
        handleAction(ErrorHandler.TERMINATE, null, null);
        return;
      }
      if (!isStopped()) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_executing_step_input_cas__FINEST",
                  new Object[] { getComponentName(), aCasReferenceId });
        }
        // Execute a step in the flow. false means that this CAS has not
        // been produced by CAS Multiplier
        executeFlowStep(flow, aCasReferenceId, false);
      } else {
        // The controller was stopped: tell the Flow object that processing is aborted
        synchronized (flowControllerContainer) {
          flow.aborted();
        }
      }
    }
  } catch (Exception e) {
    // Hand the failure to the error handling chain, identifying the command and the CAS
    HashMap map = new HashMap();
    map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
    map.put(AsynchAEMessage.CasReference, aCasReferenceId);
    handleError(map, e);
  }
}