protected void dispatchMessage()

in uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java [248:424]


	protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine,
			boolean casProcessRequest) throws Exception {
		SharedConnection sc = engine.lookupConnection(engine.getBrokerURI());
		ClientRequest cacheEntry = null;
		boolean doCallback = false;
		boolean addTimeToLive = true;
		Session jmsSession = null;

		// Check the environment for existence of NoTTL tag. If present,
		// the deployer of the service wants to disable message expiration.
		if (System.getProperty("NoTTL") != null) {
			addTimeToLive = false;
		}
		try {
			// long t1 = System.currentTimeMillis();
			jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);

			// Request JMS Message from the concrete implementation
			Message message = null;
			// Determine if this a CAS Process Request
			// boolean casProcessRequest = isProcessRequest(pm);
			// Only Process request can be serialized as binary
			if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) {
				message = jmsSession.createBytesMessage();
			} else {
				message = jmsSession.createTextMessage();
			}
			// get the producer initialized from a valid connection
			// producer = getMessageProducer();

			Destination d = null;
			String selector = null;
			// UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue
			// instead of a temp queue. Regular queues can be recovered in case of
			// a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +.
			// Code in JmsOutputChannel will add the selector if the service is a CM.
			if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
				selector = (String) pm.get(AsynchAEMessage.TargetingSelector);
			}
			if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS
					|| pm.getMessageType() == AsynchAEMessage.Stop)) {
				d = (Destination) pm.get(AsynchAEMessage.Destination);

			} else {
				d = jmsSession.createQueue(destinationName);
			}
			MessageProducer mProducer = jmsSession.createProducer(d);
			mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			// System.out.println(">>>>>>> Time to create and initialize JMS
			// Sesssion:"+(System.currentTimeMillis()-t1));
			super.initializeMessage(pm, message);
			String destination = ((ActiveMQDestination) d).getPhysicalName();
			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
						JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
						new Object[] {
								UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
										message.getIntProperty(AsynchAEMessage.Command)),
								UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
										message.getIntProperty(AsynchAEMessage.MessageType)),
								destination });
			}
			if (casProcessRequest) {
				cacheEntry = (ClientRequest) engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
				if (cacheEntry != null) {
					// CAS cas = cacheEntry.getCAS();
					// enable logging
					if (System.getProperty("UimaAsCasTracking") != null) {
						message.setStringProperty("UimaAsCasTracking", "enable");
					}
					// Target specific service instance if targeting for the CAS is provided
					// by the client application
					if (cacheEntry.getTargetServiceId() != null) {
						// System.out.println("------------Client Sending CAS to Service Instance With
						// Id:"+cacheEntry.getTargetServiceId());;
						message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
								cacheEntry.getTargetServiceId());
					}
					// Use Process Timeout value for the time-to-live property in the
					// outgoing JMS message. When this time is exceeded
					// while the message sits in a queue, the JMS Server will remove it from
					// the queue. What happens with the expired message depends on the
					// configuration. Most JMS Providers create a special dead-letter queue
					// where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the
					// DLQ
					// are not auto evicted yet and accumulate taking up memory.
					long timeoutValue = cacheEntry.getProcessTimeout();

					if (timeoutValue > 0 && addTimeToLive) {
						// Set high time to live value
						message.setJMSExpiration(10 * timeoutValue);
					}
					if (pm.getMessageType() == AsynchAEMessage.Process) {
						cacheEntry.setCASDepartureTime(System.nanoTime());
					}
					cacheEntry.setCASDepartureTime(System.nanoTime());

					doCallback = true;

				} else {
					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {

						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run",
								JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
								new Object[] { pm.get(AsynchAEMessage.CasReference),
										UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
												message.getIntProperty(AsynchAEMessage.Command)),
										UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
												message.getIntProperty(AsynchAEMessage.MessageType)),
										destination });
					}
					return;  // no cacheEntry so just return
				}

			}
			// start timers
			if (casProcessRequest) {
				CAS cas = cacheEntry.getCAS();

				// Add the cas to a list of CASes pending reply. Also start the timer if
				// necessary
				engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(),
						engine.timerPerCAS); // true=timer per cas
				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS",
							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
							new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
									engine.serviceDelegate.toString() });
				}
			} else if (pm.getMessageType() == AsynchAEMessage.GetMeta
					&& engine.serviceDelegate.getGetMetaTimeout() > 0) {
				// timer for PING has been started in sendCAS()
				if (!engine.serviceDelegate.isAwaitingPingReply()) {
					engine.serviceDelegate.startGetMetaRequestTimer();
				}
			} else {
				doCallback = false; // dont call onBeforeMessageSend() callback on CPC
			}
			// Dispatch asynchronous request to Uima AS service
			mProducer.send(message);

			if (doCallback) {
				UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
						cacheEntry.getCasReferenceId());
				// Notify engine before sending a message
				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run",
							JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE",
							new Object[] { pm.get(AsynchAEMessage.CasReference),
									String.valueOf(cacheEntry.getCAS().hashCode()) });
				}
				// Note the callback is a misnomer. The callback is made *after* the send now
				// Application receiving this callback can consider the CAS as delivere to a
				// queue
				engine.onBeforeMessageSend(status);

			}
		} catch( Exception e) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                        "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAEE_exception__WARNING", e);
            }
			if (casProcessRequest) {
				addCasToOutstandingList((String)pm.get(AsynchAEMessage.CasReference));
			}
		} finally {
			if (jmsSession != null) {
				try {
					jmsSession.close();
				} catch (Exception eee) {

				}
			}
		}

	}