in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java [630:838]
/**
 * Dispatches a JMS message through this connection's producer using an ActiveMQ
 * asynchronous send with a completion callback.
 * <p>
 * Before sending, the method:
 * <ul>
 * <li>classifies the target as "Client" (responses, or requests carrying a positive
 * CasSequence) or "Delegate" (everything else) for logging and error handling;</li>
 * <li>if the delegate's master endpoint is marked {@code Endpoint.FAILED} and the message is
 * a Process request, cancels the delegate timer and hands a
 * {@code DelegateConnectionLostException} to the controller's error handler instead of
 * sending;</li>
 * <li>if the connection is not open, gives {@code delayCasDelivery} a chance to take over
 * the message.</li>
 * </ul>
 * Stop and ReleaseCAS commands and replies are sent to the destination stored in
 * {@code delegateEndpoint} (when it has one); all other messages go to the producer's own
 * destination.
 *
 * @param aMessage the JMS message to send; its MessageType, Command, CasSequence and
 *          CasReference properties are read here
 * @param msgSize message size, forwarded to {@code logMessageSize}
 * @param startTimer currently ignored: the connection-timer restart it used to request is
 *          disabled in this method
 * @param failOnJMSException when true, a JMSException makes the method return false right
 *          after {@code handleJmsException}; note that the fall-through path returns false
 *          as well, so the returned value is the same either way
 * @return true if the message was handed to the producer, or if the situation was handled
 *         "as if sent" (failed delegate endpoint routed to the error handler, delivery
 *         delayed by {@code delayCasDelivery}, or a client reply destination that no longer
 *         exists for a non-ServiceInfo message); false on any other failure
 */
public boolean send(final Message aMessage, long msgSize, boolean startTimer, boolean failOnJMSException) {
  String destinationName = "";
  String target = "Delegate";
  int msgType = 0;
  int command = 0;
  try {
    msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
    command = aMessage.getIntProperty(AsynchAEMessage.Command);
    boolean newCAS = false;
    if (aMessage.propertyExists(AsynchAEMessage.CasSequence)
            && aMessage.getLongProperty(AsynchAEMessage.CasSequence) > 0) {
      newCAS = true;
    }
    if (msgType == AsynchAEMessage.Response || (msgType == AsynchAEMessage.Request && newCAS)) {
      target = "Client";
    }
    Endpoint masterEndpoint = null;
    if (delegateEndpoint != null && delegateEndpoint.getDelegateKey() != null) {
      masterEndpoint = ((AggregateAnalysisEngineController) controller).lookUpEndpoint(
              delegateEndpoint.getDelegateKey(), false);
      // Endpoint is marked as FAILED by the aggregate when it detects broker connection
      // failure. In such an event the aggregate stops the listener on the delegate
      // reply queue.
      if (msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process
              && masterEndpoint != null && masterEndpoint.getStatus() == Endpoint.FAILED) {
        HashMap<Object, Object> map = new HashMap<Object, Object>();
        Delegate delegate = ((AggregateAnalysisEngineController) controller)
                .lookupDelegate(delegateEndpoint.getDelegateKey());
        // Cancel Delegate timer before entering Error Handler
        if (delegate != null) {
          delegate.cancelDelegateTimer();
        }
        // Handle the Connection error in the ProcessErrorHandler
        map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
        map.put(AsynchAEMessage.CasReference, aMessage.getStringProperty(AsynchAEMessage.CasReference));
        map.put(AsynchAEMessage.Endpoint, masterEndpoint);
        Exception e = new DelegateConnectionLostException("Controller:" + controller.getComponentName()
                + " Lost Connection to " + target + ":" + masterEndpoint.getDelegateKey());
        // Handle error in ProcessErrorHandler
        ((BaseAnalysisEngineController) controller).handleError(map, e);
        return true; // return true as if this was successful send
      }
    }
    if (!isOpen()) {
      if (delayCasDelivery(msgType, aMessage, command)) {
        // Return true as if the CAS was sent
        return true;
      }
    }
    // Stop messages and replies are sent to the endpoint provided in the destination object
    if ((command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint)
            && delegateEndpoint.getDestination() != null) {
      destinationName = ((ActiveMQDestination) delegateEndpoint.getDestination())
              .getPhysicalName();
      // Guard on the same logger that is used for the log call below
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
                new Object[] { destinationName });
      }
      logMessageSize(aMessage, msgSize, destinationName);
      synchronized (producer) {
        // create amq async callback listener to detect jms msg delivery problems
        AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
        // if the msg cannot be delivered due to invalid destination, the send does
        // not fail since we are using AMQ async sends. To detect delivery issues
        // we use callback listener where such conditions are detected and handled
        ((ActiveMQMessageProducer) producer).send((Destination) delegateEndpoint.getDestination(),
                aMessage, onComplete);
      }
    } else {
      destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
      // Guard on the same logger that is used for the log call below
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
                new Object[] { destinationName });
      }
      logMessageSize(aMessage, msgSize, destinationName);
      // If in ParallelStep it is possible to receive a reply from one of the delegates in the
      // parallel step *before* a CAS is dispatched to all of the delegates. This can cause a
      // problem as replies are merged, which leaves the CAS in an inconsistent state.
      // The following code calls dispatchedCasToParallelDelegate() which counts down
      // a java latch. The same latch is used when receiving replies. If the latch is non zero
      // the code blocks a thread from performing deserialization.
      if (msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process) {
        String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
        CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
        if (casStateEntry.getNumberOfParallelDelegates() > 0) {
          casStateEntry.dispatchedCasToParallelDelegate();
        }
      }
      synchronized (producer) {
        // create amq async callback listener to detect jms msg delivery problems
        AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
        // if the msg cannot be delivered due to invalid destination, the send does
        // not fail since we are using AMQ async sends. To detect delivery issues
        // we use callback listener where such conditions are detected and handled
        ((ActiveMQMessageProducer) producer).send(aMessage, onComplete);
      }
    }
    // NOTE: startTimer is intentionally not acted upon here; the code that restarted the
    // broker connection timer on every send is disabled.

    // record the time when this dispatcher sent a message. This time will be used
    // to find inactive sessions.
    lastDispatchTimestamp.set(System.currentTimeMillis());
    // Succeeded sending the CAS
    return true;
  } catch (Exception e) {
    // if a client terminates with an outstanding request, the service will not
    // be able to deliver a reply. Just log the fact that the reply queue is
    // no longer available.
    if (e instanceof InvalidDestinationException && "Client".equals(target)) {
      // NOTE(review): endpointName is not declared in this method - presumably an instance
      // field that is overwritten here; confirm that the overwrite is intended.
      // Only touch it when the destination really is an ActiveMQ destination so that this
      // handler cannot itself fail with a NullPointerException/ClassCastException.
      if (delegateEndpoint != null) {
        Object replyDestination = delegateEndpoint.getDestination();
        if (replyDestination instanceof ActiveMQDestination) {
          endpointName = ((ActiveMQDestination) replyDestination).getPhysicalName();
        }
      }
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
              "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_invalid_destination__INFO",
              new Object[] { controller.getComponentName(), endpointName });
      if (command == AsynchAEMessage.ServiceInfo) {
        return false;
      }
      if ((msgType == AsynchAEMessage.Response || msgType == AsynchAEMessage.Request)
              && command == AsynchAEMessage.Process) {
        String casReferenceId = "";
        try {
          casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
        } catch (Exception exx) {
          String delegateKey = (delegateEndpoint != null) ? delegateEndpoint.getDelegateKey() : "";
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_delivery_exception__WARNING",
                  new Object[] { controller.getComponentName(), delegateKey,
                      physicalNameOrEmpty(delegateEndpoint) });
        }
        // The lookup may find nothing (for instance when the CAS reference could not be read
        // above, or the CAS has already been removed) - presumably it returns null then.
        // Guard so that a NullPointerException cannot escape from this error handler.
        CasStateEntry casStateEntry = (casReferenceId == null)
                ? null : controller.getLocalCache().lookupEntry(casReferenceId);
        if (casStateEntry != null) {
          // Mark the CAS as failed so that the CAS is released and cache cleaned up
          casStateEntry.setDeliveryToClientFailed();
        }
      }
      return true; // expect the client can go away at any time. Not an error
    }
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
      String delegateKey = (delegateEndpoint != null) ? delegateEndpoint.getDelegateKey() : "";
      String failedEndpointName = physicalNameOrEmpty(delegateEndpoint);
      if ("Client".equals(target)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_delivery_to_client_exception__WARNING",
                new Object[] { controller.getComponentName(), failedEndpointName });
      } else {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_delivery_exception__WARNING",
                new Object[] { controller.getComponentName(), delegateKey, failedEndpointName });
      }
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", e);
    }
    // If the controller has been stopped no need to send messages
    if (controller.isStopped()) {
      return false;
    } else {
      if (e instanceof JMSException) {
        handleJmsException((JMSException) e);
        // whoever called this method is interested in knowing that there was JMS Exception
        if (failOnJMSException) {
          return false;
        }
      } else {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_exception_WARNING", controller.getComponentName());
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
                  e);
        }
      }
    }
  }
  // Failed here
  return false;
}

/**
 * Returns the physical name of the ActiveMQ destination held by the given endpoint, or an
 * empty string when the endpoint is null or its destination is not an ActiveMQ destination.
 * Used by the error-handling paths of {@code send}, which must not throw while logging.
 */
private String physicalNameOrEmpty(Endpoint anEndpoint) {
  if (anEndpoint == null) {
    return "";
  }
  Object endpointDestination = anEndpoint.getDestination();
  if (endpointDestination instanceof ActiveMQDestination) {
    return ((ActiveMQDestination) endpointDestination).getPhysicalName();
  }
  return "";
}