public boolean send()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java [630:838]


  public boolean send(final Message aMessage, long msgSize, boolean startTimer, boolean failOnJMSException) {
    String destinationName = "";
    String target = "Delegate";
    int msgType = 0;
    int command = 0;
    try {
      msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
      command = aMessage.getIntProperty(AsynchAEMessage.Command);
      boolean newCAS = false;
      if ( aMessage.propertyExists(AsynchAEMessage.CasSequence) &&
              aMessage.getLongProperty(AsynchAEMessage.CasSequence) > 0 ) {
        newCAS = true;
      }
      
      if ( msgType == AsynchAEMessage.Response || (msgType == AsynchAEMessage.Request && newCAS) ) {
        target = "Client";
      }
      Endpoint masterEndpoint = null;
      if ( delegateEndpoint != null && delegateEndpoint.getDelegateKey() != null ) {
        masterEndpoint = ((AggregateAnalysisEngineController) controller).lookUpEndpoint(
                delegateEndpoint.getDelegateKey(), false);
        // Endpoint is marked as FAILED by the aggregate when it detects broker connection
        // failure. In such an event the aggregate stops the listener on the delegate
        // reply queue.
        if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process &&
             masterEndpoint != null && masterEndpoint.getStatus() == Endpoint.FAILED) {
          HashMap<Object, Object> map = new HashMap<Object, Object>();
          Delegate delegate = ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateEndpoint.getDelegateKey());
          //  Cancel Delegate timer before entering Error Handler
          if ( delegate != null ) {
            delegate.cancelDelegateTimer();
          }
          //  Handle the Connection error in the ProcessErrorHandler
          map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
          map.put(AsynchAEMessage.CasReference, aMessage.getStringProperty(AsynchAEMessage.CasReference));
          map.put(AsynchAEMessage.Endpoint, masterEndpoint);
          Exception e = new DelegateConnectionLostException("Controller:"+controller.getComponentName()+" Lost Connection to "+target+ ":"+masterEndpoint.getDelegateKey());
          //  Handle error in ProcessErrorHandler
          ((BaseAnalysisEngineController)controller).handleError(map, e);
          return true; // return true as if this was successful send 
        }
      }

      if ( !isOpen() ) {
        if (delayCasDelivery(msgType, aMessage, command)) {
          // Return true as if the CAS was sent
          return true;
        }
      }
      // Stop messages and replies are sent to the endpoint provided in the destination object
      if ((command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint)
              && delegateEndpoint.getDestination() != null) {
        destinationName = ((ActiveMQDestination) delegateEndpoint.getDestination())
                .getPhysicalName();
        if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
                  new Object[] { destinationName });
        }
        logMessageSize(aMessage, msgSize, destinationName);
        synchronized (producer) {
            // create amq async callback listener to detect jms msg delivery problems
        	AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
        	// if the msg cannot be delivered due to invalid destination, the send does
        	// not fail since we are using AMQ async sends. To detect delivery issues
        	// we use callback listener where such conditions are detected and handled
        	((ActiveMQMessageProducer)producer).send((Destination) delegateEndpoint.getDestination(), aMessage, onComplete);
        }
      } else {
        destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
        if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
                  new Object[] { destinationName });
        }
        logMessageSize(aMessage, msgSize, destinationName);
   	    // If in ParallelStep its possible to receive a reply from one of the delegates in parallel 
  	    // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
  	    // as replies are merged which causes the CAS to be in an inconsistent state.
  	    // The following code calls dispatchCasToParallelDelegate() which count down
  	    // a java latch. The same latch is used when receiving replies. If the latch is non zero
  	    // the code blocks a thread from performing deserialization.
  	    if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
  		  String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
  		  CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
  		  if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
  			  casStateEntry.dispatchedCasToParallelDelegate();
  		  }
  	    }

        synchronized (producer) {
            // create amq async callback listener to detect jms msg delivery problems
        	AsyncCallback onComplete = createAMQCallbackListener(command, aMessage);
        	// if the msg cannot be delivered due to invalid destination, the send does
        	// not fail since we are using AMQ async sends. To detect delivery issues
        	// we use callback listener where such conditions are detected and handled
        	((ActiveMQMessageProducer)producer).send(aMessage, onComplete);
        }

      }
      // Starts a timer on a broker connection. Every time a new message
      // is sent to a destination managed by the broker the timer is
      // restarted. The main purpose of the timer is to close connections
      // that are not used.
      if (startTimer) {
//        brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
//                delegateEndpoint);
      }
      // record the time when this dispatches sent a message. This time will be used
      // to find inactive sessions.
	  lastDispatchTimestamp.set(System.currentTimeMillis());
      // Succeeded sending the CAS
	  return true;
    } catch (Exception e) {
    	
    	 // if a client terminates with an outstanding request, the service will not
        // be able to deliver a reply. Just log the fact that the reply queue is
        // no longer available.
      if ( e instanceof InvalidDestinationException && "Client".equals(target) ) {
          if ( delegateEndpoint != null ) {
            endpointName = ((ActiveMQDestination) delegateEndpoint.getDestination())
            		.getPhysicalName();
          }
    	  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                  "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_invalid_destination__INFO",
                  new Object[] { controller.getComponentName(),endpointName });
    	  if ( command == AsynchAEMessage.ServiceInfo ) {
    		  return false;
    	  }
     	  if ( (msgType == AsynchAEMessage.Response || msgType == AsynchAEMessage.Request ) &&
    			  command == AsynchAEMessage.Process ) {
    		  String casReferenceId="";
    		  try {
    		     casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
    		  } catch( Exception exx ) {
    		        String key = "";
    		        String endpointName = "";
    		        if ( delegateEndpoint != null ) {
    		          key = delegateEndpoint.getDelegateKey();
    		          endpointName = ((ActiveMQDestination) delegateEndpoint.getDestination())
    		          .getPhysicalName();
    		        }
    	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
    	                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
    	                  "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), key, endpointName});
    		  }
   		      CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
   		      // Mark the CAS as failed so that the CAS is released and cache cleaned up
    		  casStateEntry.setDeliveryToClientFailed();
    	  }
    	  return true;  // expect the client can go away at any time. Not an error
      }    	
    	
    	
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
    	  
        String key = "";
        String endpointName = "";
        if ( delegateEndpoint != null ) {
          key = delegateEndpoint.getDelegateKey();
          endpointName = ((ActiveMQDestination) delegateEndpoint.getDestination())
          .getPhysicalName();
          
            }
        if ( "Client".equals(target) ) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_delivery_to_client_exception__WARNING",
                  new Object[] { controller.getComponentName(),endpointName });
        } else {

          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                  "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), key, endpointName});
        }

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
      // If the controller has been stopped no need to send messages
      if (controller.isStopped()) {
        return false;
       } else {
        if (e instanceof JMSException) {
          handleJmsException((JMSException) e);
          //	whoever called this method is interested in knowing that there was JMS Exception
          if ( failOnJMSException ) {
        	  return false;
          }
        } else {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAEE_service_exception_WARNING", controller.getComponentName());

            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
                    e);
          }
        }

      }
    }
    //brokerDestinations.getConnectionTimer().stopTimer();
    // Failed here
    return false;
  }