in uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java [1100:1234]
/**
 * Handles a GetMeta reply received from the service.
 * <p>
 * The GetMeta timer is cancelled and the delegate is put back into
 * {@code Delegate.OK_STATE}. If the reply carries a JMSReplyTo destination it is
 * stored on the delegate as the free-CAS destination. The reply is then treated
 * in one of two ways:
 * <ul>
 * <li><b>Ping reply</b> (the delegate is awaiting a ping reply): the awaiting flag
 * is reset. If CASes are pending reply or pending dispatch, the timer for the
 * oldest outstanding CAS is restarted and every CAS on the pending dispatch list
 * is sent while the delegate stays in OK state. Otherwise listeners are notified
 * with an empty GetMeta status. The method returns without touching
 * {@code getMetaSemaphore}.</li>
 * <li><b>Regular GetMeta reply</b>: on an exception payload the listeners are
 * notified of the failure and the client is flagged as aborted and not
 * initialized. Otherwise the client serialization format is reconciled with what
 * the service reports, the metadata XML is parsed and stored, and, if the client
 * is still running, merged via {@code asynchManager}. {@code getMetaSemaphore} is
 * released in all cases, including failures.</li>
 * </ul>
 *
 * @param message
 *          the JMS reply message; for a successful, non-ping reply it is cast to
 *          {@code TextMessage} and its text is parsed as resource metadata
 * @throws Exception
 *           if reading the message or parsing the metadata fails
 */
protected void handleMetadataReply(Message message) throws Exception {
  serviceDelegate.cancelDelegateGetMetaTimer();
  serviceDelegate.setState(Delegate.OK_STATE);
  // check if the reply msg contains replyTo destination. It will be
  // added by the Cas Multiplier to the getMeta reply
  if (message.getJMSReplyTo() != null) {
    serviceDelegate.setFreeCasDestination(message.getJMSReplyTo());
  }
  // Check if this is a reply for a Ping sent in response to a timeout
  if (serviceDelegate.isAwaitingPingReply()) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.INFO,
              CLASS_NAME.getName(),
              "handleMetadataReply",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_rcvd_ping_reply__INFO",
              new Object[] {
                  message.getStringProperty(AsynchAEMessage.MessageFrom),
                  message.getStringProperty(AsynchAEMessage.ServerIP) });
    }
    // reset the state of the service. The client received its ping reply
    serviceDelegate.resetAwaitingPingReply();
    if (serviceDelegate.getCasPendingReplyListSize() > 0
            || serviceDelegate.getCasPendingDispatchListSize() > 0) {
      serviceDelegate.restartTimerForOldestCasInOutstandingList();
      // We got a reply to GetMeta ping. Send all CASes that have been
      // placed on the pending dispatch queue to a service. The delegate state
      // is re-checked on every iteration so dispatching stops as soon as the
      // delegate leaves OK state.
      String casReferenceId = null;
      while (serviceDelegate.getState() == Delegate.OK_STATE
              && (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null) {
        ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
        // Ids with no cached request are skipped
        if (cachedRequest != null) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.INFO,
                    CLASS_NAME.getName(),
                    "handleMetadataReply",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_dispatch_delayed_cas__INFO",
                    new Object[] { casReferenceId,
                        String.valueOf(cachedRequest.cas.hashCode()) });
          }
          sendCAS(cachedRequest.getCAS(), cachedRequest, null);
        }
      }
    } else {
      // Nothing outstanding: report the ping reply to listeners with an empty status
      ProcessTrace pt = new ProcessTrace_impl();
      UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
      notifyListeners(null, status, AsynchAEMessage.GetMeta);
    }
    // Handled Ping reply
    return;
  }

  // Everything below runs inside the try so that getMetaSemaphore is released even
  // when reading the payload property or clearing the cache entry fails.
  // NOTE(review): presumably the thread that issued the GetMeta request blocks on
  // this semaphore - confirm against the caller.
  try {
    int payload = message.getIntProperty(AsynchAEMessage.Payload);
    removeFromCache(uniqueIdentifier);
    if (AsynchAEMessage.Exception == payload) {
      ProcessTrace pt = new ProcessTrace_impl();
      UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
      Exception exception = retrieveExceptionFromMessage(message);
      clientSideJmxStats.incrementMetaErrorCount();
      status.addEventStatus("GetMeta", "Failed", exception);
      notifyListeners(null, status, AsynchAEMessage.GetMeta);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.INFO,
                CLASS_NAME.getName(),
                "handleMetadataReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_received_exception_msg_INFO",
                new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom),
                    getBrokerURI(),
                    message.getStringProperty(AsynchAEMessage.CasReference), exception });
      }
      // The service failed to deliver its metadata
      abort = true;
      initialized = false;
    } else {
      // Check serialization supported by the service against client configuration.
      // If the client is configured to use Binary serialization *but* the service
      // doesn't support it, change the client serialization to xmi. Old services will
      // not return in a reply the type of serialization supported which implies "xmi".
      // New services *always* return "binary" or "compressedBinaryXXX"
      // as a default serialization. The client
      // however may still want to serialize messages using xmi though.
      if (!message.propertyExists(AsynchAEMessage.SERIALIZATION)) {
        // Dealing with an old service here, check if there is a mismatch with the
        // client configuration. If the client is configured with binary serialization
        // override this and change serialization to "xmi".
        if (getSerialFormat() != SerialFormat.XMI) {
          // Override configured serialization
          setSerialFormat(SerialFormat.XMI);
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_client_serialization_ovveride__WARNING", new Object[] {});
        }
      } else {
        final int c = message.getIntProperty(AsynchAEMessage.SERIALIZATION);
        if (getSerialFormat() != SerialFormat.XMI) {
          // don't override if XMI - because the remote may have different type system
          setSerialFormat((c == AsynchAEMessage.XMI_SERIALIZATION) ? SerialFormat.XMI
                  : (c == AsynchAEMessage.BINARY_SERIALIZATION) ? SerialFormat.BINARY
                          : SerialFormat.COMPRESSED_FILTERED);
        }
      }
      String meta = ((TextMessage) message).getText();
      ByteArrayInputStream bis = new ByteArrayInputStream(meta.getBytes("UTF-8"));
      XMLInputSource in1 = new XMLInputSource(bis, null);
      // Adam - store ResourceMetaData in field so we can return it from getMetaData().
      resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
              .parseResourceMetaData(in1);
      // if remote delegate, save type system
      if (!brokerURI.startsWith("vm:")) { // test if remote
        setRemoteTypeSystem(
                AggregateAnalysisEngineController_impl.getTypeSystemImpl(resourceMetadata));
      }
      casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_handling_meta_reply_FINEST",
                new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), meta });
      }
      // check the state of the client
      if (running && asynchManager != null) {
        // Merge the metadata only if the client is still running
        asynchManager.addMetadata(resourceMetadata);
      }
    }
  } finally {
    getMetaSemaphore.release();
  }
}