in core/src/main/java/flex/messaging/services/MessageService.java [223:301]
public Object serviceMessage(Message message, boolean throttle, MessageDestination dest) {
if (managed)
incrementMessageCount(false, message);
if (throttle) {
// Throttle the inbound message; also attempts to prevent duplicate messages sent by a client.
dest = (MessageDestination) getDestination(message);
ThrottleManager throttleManager = dest.getThrottleManager();
if (throttleManager != null && throttleManager.throttleIncomingMessage(message))
return null; // Message throttled.
}
// Block any sent messages that have a subtopic header containing
// wildcards - wildcards are only supported in subscribe/unsubscribe
// commands (see serviceCommand() and manageSubscriptions()).
Object subtopicObj = message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
List<Subtopic> subtopics = null;
if (subtopicObj != null) {
if (subtopicObj instanceof Object[])
subtopicObj = Arrays.asList((Object[]) subtopicObj);
if (subtopicObj instanceof String) {
String subtopicString = (String) subtopicObj;
if (subtopicString != null && subtopicString.length() > 0) {
if (dest == null)
dest = (MessageDestination) getDestination(message);
Subtopic subtopic = testProducerSubtopic(dest.getServerSettings().getSubtopicSeparator(), subtopicString);
if (subtopics == null)
subtopics = new ArrayList<Subtopic>();
subtopics.add(subtopic);
}
} else if (subtopicObj instanceof List) {
@SuppressWarnings("unchecked")
List<String> subtopicList = (List<String>) subtopicObj;
String subtopicSeperator = null;
for (String subtopicString : subtopicList) {
if (subtopicString != null && subtopicString.length() > 0) {
if (dest == null)
dest = (MessageDestination) getDestination(message);
subtopicSeperator = dest.getServerSettings().getSubtopicSeparator();
Subtopic subtopic = testProducerSubtopic(subtopicSeperator, subtopicString);
if (subtopics == null)
subtopics = new ArrayList<Subtopic>();
subtopics.add(subtopic);
}
}
}
}
// Override TTL if there was one specifically configured for this destination
if (dest == null)
dest = (MessageDestination) getDestination(message);
ServerSettings destServerSettings = dest.getServerSettings();
if (destServerSettings.getMessageTTL() >= 0)
message.setTimeToLive(destServerSettings.getMessageTTL());
long start = 0;
if (debug)
start = System.currentTimeMillis();
// Give MessagingAdapter a chance to block the send.
ServiceAdapter adapter = dest.getAdapter();
if (adapter instanceof MessagingAdapter) {
MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
if (manager != null)
manager.assertSendAuthorization();
}
MessagePerformanceUtils.markServerPreAdapterTime(message);
Object result = adapter.invoke(message);
MessagePerformanceUtils.markServerPostAdapterTime(message);
if (debug) {
long end = System.currentTimeMillis();
Log.getLogger(TIMING_LOG_CATEGORY).debug("After invoke service: " + getId() + "; execution time = " + (end - start) + "ms");
}
return result;
}