in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java [634:831]
/**
 * Handles a Process request whose message identifies the CAS by reference id.
 *
 * <p>The steps, in order:
 * <ol>
 * <li>Look up the CAS state entry for the reference id in the controller's local cache,
 * creating it when absent.</li>
 * <li>If the message carries the {@code AsynchAEMessage.CasSequence} property the CAS is
 * treated as one produced by a CAS Multiplier: the parent (input) CAS id is recorded and, when
 * the controller is an aggregate, the producing delegate is registered and the new CAS
 * inherits the message origin of its nearest ancestor that has one.</li>
 * <li>Otherwise, when the controller is an aggregate, the message endpoint is registered as
 * the origin of the CAS.</li>
 * <li>The CAS is fetched from the in-process cache, its arrival time is saved, the Process
 * command is cached in the client endpoint and, unless the controller is stopped,
 * {@code invokeProcess} is called.</li>
 * </ol>
 *
 * <p>The method returns early without invoking processing when: the message has no endpoint
 * for a CAS Multiplier produced CAS, the parent CAS entry is marked failed (the controller's
 * {@code process(null, casReferenceId)} is called first), registering the CAS producer throws,
 * or the controller is stopped.
 *
 * @param aMessageContext
 *          the incoming message; read for the CAS reference id, the optional CasSequence and
 *          InputCasReference properties, and the sender endpoint
 * @throws AsynchAEException
 *           rethrown unchanged when raised inside this method; any other exception escaping
 *           the body is wrapped in a new {@code AsynchAEException}
 */
private void handleProcessRequestWithCASReference(MessageContext aMessageContext)
        throws AsynchAEException {
  boolean isNewCAS = false;
  String newCASProducedBy = null;
  try {
    // Only populated when handling a CAS produced by a CAS Multiplier
    String inputCasReferenceId = null;
    String casReferenceId = getCasReferenceId(aMessageContext);

    // Make sure the local cache has a state entry for this CAS
    CasStateEntry cse = getController().getLocalCache().lookupEntry(casReferenceId);
    if (cse == null) {
      cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
    }

    // Presence of the CasSequence property marks a CAS sent by a CAS Multiplier
    if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
      isNewCAS = true;
      Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
      if (casMultiplierEndpoint == null) {
        // Without an endpoint there is nothing to associate the new CAS with; log and give up
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                  "handleProcessRequestWithCASReference",
                  UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
                  new Object[] { casReferenceId });
        }
        return;
      }

      // Id of the parent CAS; stored in the state entry only if none was recorded before
      inputCasReferenceId = aMessageContext
              .getMessageStringProperty(AsynchAEMessage.InputCasReference);
      if (cse.getInputCasReferenceId() == null) {
        cse.setInputCasReferenceId(inputCasReferenceId);
      }

      if (getController() instanceof AggregateAnalysisEngineController) {
        CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry(
                inputCasReferenceId);
        // A child of a failed parent is not processed; hand it to the controller for release
        if (parentCasEntry != null && parentCasEntry.isFailed()) {
          getController().process(null, casReferenceId);
          return;
        }

        // Resolve the delegate that produced this CAS from the sender's endpoint name
        String delegateKey = ((AggregateAnalysisEngineController) getController())
                .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
        Delegate delegate = ((AggregateAnalysisEngineController) getController())
                .lookupDelegate(delegateKey);
        cse.setLastDelegate(delegate);
        // NOTE(review): whether lookupDelegate can return null is not visible here; if it does,
        // the NullPointerException from getKey() is wrapped by the outer catch below.
        newCASProducedBy = delegate.getKey();
        casMultiplierEndpoint.setIsCasMultiplier(true);

        try {
          // Remember which CAS Multiplier produced this CAS
          getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy);
        } catch (Exception e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            if (getController() != null) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "handleProcessRequestWithCASReference",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAEE_service_exception_WARNING", getController().getComponentName());
            }
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "handleProcessRequestWithCASReference",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
                    e);
          }
          // The producer could not be recorded; processing of this CAS is abandoned
          return;
        }

        // Safety check. The input CAS id should not be null here
        if (inputCasReferenceId != null) {
          try {
            // The produced CAS inherits its origin from the nearest ancestor that has one.
            // Walk up the parent chain until an origin is found or the chain ends.
            Endpoint endp = null;
            String parentCasId = inputCasReferenceId;
            while (parentCasId != null) {
              endp = ((AggregateAnalysisEngineController) getController())
                      .getMessageOrigin(parentCasId);
              if (endp != null) {
                break;
              }
              // No origin for this ancestor; step to its parent and try again
              CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(
                      parentCasId);
              parentCasId = entry.getInputCasReferenceId();
            }

            if (endp == null) {
              // Reached the top of the chain without finding an origin; log it and carry on
              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                        "handleProcessRequestWithCASReference",
                        UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_msg_origin_not_found__INFO",
                        new Object[] { getController().getComponentName(), inputCasReferenceId });
              }
            } else {
              // Save the inherited origin under the produced CAS id
              ((AggregateAnalysisEngineController) getController()).addMessageOrigin(
                      casReferenceId, endp);
              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(
                        Level.FINEST,
                        CLASS_NAME.getName(),
                        "handleProcessRequestWithCASReference",
                        UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_msg_origin_added__FINEST",
                        new Object[] { getController().getComponentName(), casReferenceId,
                            newCASProducedBy });
              }
            }
          } catch (Exception e) {
            // Failure to resolve the origin is logged only; processing continues below
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              if (getController() != null) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                        "handleProcessRequestWithCASReference",
                        UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_service_exception_WARNING", getController().getComponentName());
              }
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                      "handleProcessRequestWithCASReference",
                      UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
            }
          }
        } else {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.INFO,
                    CLASS_NAME.getName(),
                    "handleProcessRequestWithCASReference",
                    UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_input_cas_invalid__INFO",
                    new Object[] { getController().getComponentName(), newCASProducedBy,
                        casReferenceId });
          }
        }
      }

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_new_cas__FINE", new Object[] { casReferenceId, newCASProducedBy });
      }
      // NOTE(review): casMultiplierEndpoint was itself obtained from
      // aMessageContext.getEndpoint(); if that accessor returns the same instance on every
      // call these two statements are self-assignments - confirm before simplifying.
      aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
      aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
    } else {
      // Not produced by a CAS Multiplier: in an aggregate the sender endpoint is the origin
      if (getController() instanceof AggregateAnalysisEngineController) {
        ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
                aMessageContext.getEndpoint());
      }
    }

    CAS cas = getController().getInProcessCache().getCasByReference(casReferenceId);
    long arrivalTime = System.nanoTime();
    getController().saveTime(arrivalTime, casReferenceId, getController().getName());
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
              "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAEE_analyzing_cas__FINE", new Object[] { casReferenceId });
    }
    // Save Process command in the client endpoint.
    cacheProcessCommandInClientEndpoint();
    if (getController().isStopped()) {
      return;
    }
    if (isNewCAS) {
      // CAS Multiplier child: pass the parent id first, then the id of the new CAS
      invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
    } else {
      invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
    }
  } catch (AsynchAEException e) {
    throw e;
  } catch (Exception e) {
    throw new AsynchAEException(e);
  }
}