in uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java [406:632]
/**
 * Handles a Process request whose CAS arrives in the message from a remote sender.
 *
 * <p>Flow, as implemented below:
 * <ol>
 * <li>Return immediately if the payload is XMI but the message carries no string body, or if no
 * CAS reference id can be obtained from the message.</li>
 * <li>If the message has a {@code CasSequence} property, treat the CAS as a child produced by a
 * Cas Multiplier: resolve the parent CAS, remember where Free CAS notifications go, and redirect
 * the reply endpoint to the origin recorded for the parent CAS.</li>
 * <li>Skip the message (logging at INFO) if the in-process cache already holds an entry for the
 * same CAS reference id.</li>
 * <li>Otherwise deserialize the CAS, register it with the cache and invoke processing.</li>
 * <li>For a non-primitive controller handling a non-subordinate CAS, block the calling thread
 * until the entry's completion semaphore is acquired, the controller stops, or the thread is
 * interrupted.</li>
 * </ol>
 *
 * <p>Any exception raised inside the main {@code try} block is logged and handed to the
 * controller's error handler chain instead of being propagated to the caller.
 *
 * @param aMessageContext
 *          the incoming message; its properties, payload and endpoint are read here, and its
 *          endpoint is modified for CASes coming from a Cas Multiplier
 * @throws AsynchAEException
 *           declared by the signature; presumably raised by the calls made outside the main
 *           {@code try} block (initial payload check, error handling) - TODO confirm
 */
private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext)
    throws AsynchAEException {
  CacheEntry entry = null;
  String casReferenceId = null;
  // Check if there is a cargo in the message
  if (aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
      && aMessageContext.getStringMessage() == null) {
    return; // No XMI, just return
  }
  CasStateEntry inputCasStateEntry = null;
  try {
    String newCASProducedBy = null;
    // Fetch the id of the CAS from the message. If it doesn't exist, the method will create an
    // entry in the log file and return null.
    casReferenceId = getCasReferenceId(aMessageContext);
    if (casReferenceId == null) {
      return; // Invalid message. Nothing to do
    }
    // Initially make both equal
    String inputCasReferenceId = casReferenceId;
    // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
    Endpoint freeCasEndpoint = null;
    // CASes generated by a Cas Multiplier will have a CasSequence property set.
    if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
      // Fetch the name of the Cas Multiplier's input queue
      String cmEndpointName = aMessageContext
          .getMessageStringProperty(AsynchAEMessage.MessageFrom);
      // NOTE(review): this cast is unconditional while later code in this branch checks
      // instanceof first - confirm a CasSequence message can only reach an aggregate.
      newCASProducedBy = ((AggregateAnalysisEngineController) getController())
          .lookUpDelegateKey(cmEndpointName);
      // Fetch an ID of the parent CAS
      inputCasReferenceId = aMessageContext
          .getMessageStringProperty(AsynchAEMessage.InputCasReference);
      // Fetch Cache entry for the parent CAS
      CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
          inputCasReferenceId);
      // Fetch an endpoint where Free CAS Notification must be sent.
      // This endpoint is unique per CM instance. Meaning, each
      // instance of CM will have an endpoint where it expects Free CAS
      // notifications.
      freeCasEndpoint = aMessageContext.getEndpoint();
      // Clone an endpoint where Free Cas Request will be sent
      freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();
      if (getController() instanceof AggregateAnalysisEngineController) {
        inputCasStateEntry = ((AggregateAnalysisEngineController) getController())
            .getLocalCache().lookupEntry(inputCasReferenceId);
        // Associate Free Cas Notification Endpoint with an input Cas.
        // NOTE(review): lookupEntry() is null-checked further down for the child CAS, so it can
        // presumably return null here too; that would surface as an NPE routed to the error
        // handler chain - confirm this is the intended failure mode.
        inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
      }
      computeStats(aMessageContext, inputCasReferenceId);
      // Reset the destination
      aMessageContext.getEndpoint().setDestination(null);
      // This CAS came in from a CAS Multiplier. Treat it differently than the
      // input CAS. In case the Aggregate needs to send this CAS to the
      // client, retrieve the client destination by looking up the client endpoint
      // using input CAS reference id. CASes generated by the CAS multiplier will have
      // the same Cas Reference id.
      // NOTE(review): inputCasCacheEntry is dereferenced without a null check - verify that
      // getCacheEntryForCAS() never returns null for a parent CAS.
      Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
      // The message context contains a Cas Multiplier endpoint. Since
      // we don't want to send a generated CAS back to the CM, override
      // with an endpoint provided by the client of
      // this service. Client endpoint is attached to an input Cas cache entry.
      if (replyToEndpoint != null) {
        aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
        aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
      }
      // newCASProducedBy holds the key of the Cas Multiplier that produced this CAS. It is
      // used to ask the right Shadow Cas Pool for a new CAS. Every Shadow Cas Pool has a
      // unique id which corresponds to a Cas Multiplier key.
      if (getController() instanceof AggregateAnalysisEngineController) {
        Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) getController())
            .lookUpEndpoint(newCASProducedBy, false);
        if (casMultiplierEndpoint != null) {
          // Save the URL of the broker managing the Free Cas Notification queue.
          // This is needed when we try to establish a connection to the broker.
          freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
        }
      }
    } else if (getController().isTopLevelComponent()) {
      if (getController() instanceof AggregateAnalysisEngineController) {
        ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
            aMessageContext.getEndpoint());
      }
    }
    // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
    // to see if the message with a given CasReferenceId is already being processed. If it is,
    // the message contains the same request, possibly re-issued by the caller due to a timeout.
    // This mechanism also helps with the scenario where this service is not up when the client
    // sends a request. The client can keep re-sending the same request until its timeout
    // thresholds are exceeded. By that time, there may be multiple messages in this service
    // queue with the same CasReferenceId. When the service finally comes back up, it will have
    // multiple messages in its queue, possibly from the same client. Only the first message for
    // any given CasReferenceId should be processed.
    CasStateEntry cse = null;
    if (!getController().getInProcessCache().entryExists(casReferenceId)) {
      if (getController().getLocalCache().lookupEntry(casReferenceId) == null) {
        // Create a new entry in the local cache for the CAS received from the remote
        cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
        // Check if this CAS is a child
        if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
          cse.setInputCasReferenceId(inputCasReferenceId);
        }
      } else {
        cse = getController().getLocalCache().lookupEntry(casReferenceId);
      }
      if (getController() instanceof AggregateAnalysisEngineController
          && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
        String delegateInputQueueName = aMessageContext
            .getMessageStringProperty(AsynchAEMessage.MessageFrom);
        String delegateKey = ((AggregateAnalysisEngineController) getController())
            .lookUpDelegateKey(delegateInputQueueName);
        if (delegateKey != null) {
          Delegate delegate = ((AggregateAnalysisEngineController) getController())
              .lookupDelegate(delegateKey);
          // Save the last delegate handling this CAS
          cse.setLastDelegate(delegate);
          // If there is one thread receiving messages from the Cas Multiplier, increment the
          // number of child CASes of the parent CAS. If there are more threads (consumers), a
          // special object, ConcurrentMessageListener, has already incremented the count. This
          // special object enforces order of processing for CASes coming in from the Cas
          // Multiplier.
          if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
            inputCasStateEntry.incrementSubordinateCasInPlayCount();
          }
        }
      }
      try {
        entry = deserializeCASandRegisterWithCache(casReferenceId, freeCasEndpoint,
            newCASProducedBy, aMessageContext);
      } catch (Exception ex) {
        // Mark this CAS as failed, then let the outer handler deal with the exception.
        if (cse != null) {
          cse.setFailed();
        }
        throw ex;
      }
      if (getController().isStopped() || entry == null || entry.getCas() == null) {
        if (entry != null) {
          // The controller is stopped or the entry carries no CAS: release the entry
          getController().dropCAS(entry.getCasReferenceId(), true);
          entry = null;
        }
        return;
      }
      // *****************************************************************
      // Process the CAS
      // *****************************************************************
      invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
          newCASProducedBy);
      /*
       * Below comments apply to UIMA AS aggregate only.
       * CAS has been handed off to a delegate. Now block the receiving thread until
       * the CAS is processed or there is a timeout or error. The entry's completion
       * semaphore is polled in 500 ms slices so that a controller stop is noticed.
       */
      if (!getController().isPrimitive() &&
          cse != null && !cse.isSubordinate()) {
        try {
          synchronized (lock) {
            while (!getController().isStopped()) {
              if (entry.getThreadCompletionSemaphore() != null) {
                boolean gotIt = entry.getThreadCompletionSemaphore().tryAcquire(500, TimeUnit.MILLISECONDS);
                if (gotIt) {
                  break;
                }
              } else {
                // No semaphore associated with this entry; nothing to wait on
                break;
              }
            }
          }
        } catch (InterruptedException ex) {
          // Stop waiting, but restore the interrupt status so the owner of this thread
          // can still observe the interruption instead of having it silently discarded.
          Thread.currentThread().interrupt();
        }
      }
    } else {
      // An entry for this CAS reference id already exists: duplicate request, just log it
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
            "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
      }
    }
  } catch (Exception e) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      if (getController() != null) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
            "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
            "UIMAEE_service_exception_WARNING", getController().getComponentName());
      }
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
          "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
          "UIMAEE_exception__WARNING", e);
    }
    // Describe the failed request for the error handler chain
    ErrorContext errorContext = new ErrorContext();
    errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
    errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
    errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
    if (entry != null) {
      // A CAS was registered before the failure; release it
      getController().dropCAS(entry.getCas());
    }
    getController().getErrorHandlerChain().handle(e, errorContext, getController());
  }
}