in core/src/main/java/flex/messaging/endpoints/AbstractEndpoint.java [874:962]
public Message serviceMessage(Message message) {
    // Services one inbound message on this endpoint and returns the resulting
    // acknowledgement/response. CommandMessages for general polls, disconnects and
    // trigger-connects are handled here; every other command and all non-command
    // messages are routed through the MessageBroker. Exceptions thrown by the
    // security check, the FlexClient support check or the broker propagate to the caller.
    if (isManaged()) {
        ((EndpointControl) getControl()).incrementServiceMessageCount();
    }

    try {
        // Expose this endpoint to downstream code for the duration of the call;
        // cleared again in the finally block below.
        FlexContext.setThreadLocalEndpoint(this);
        Message ack = null;

        // Make sure this message is timestamped.
        if (message.getTimestamp() == 0) {
            message.setTimestamp(System.currentTimeMillis());
        }

        // Reset the endpoint header for inbound messages to the id for this endpoint
        // to guarantee that it's correct. Don't allow clients to spoof this.
        // However, if the endpoint id is passed as null we need to tag the message to
        // skip channel/endpoint validation at the destination level (MessageBroker.inspectChannel()).
        // Only messages that arrived with a non-null endpoint header are flagged for validation.
        if (message.getHeader(Message.ENDPOINT_HEADER) != null) {
            message.setHeader(Message.VALIDATE_ENDPOINT_HEADER, Boolean.TRUE);
        }
        message.setHeader(Message.ENDPOINT_HEADER, getId());

        if (message instanceof CommandMessage) {
            CommandMessage command = (CommandMessage) message;
            int operation = command.getOperation();

            // Apply channel endpoint level constraint; always allow login commands through.
            if (operation != CommandMessage.LOGIN_OPERATION) {
                checkSecurityConstraint(message);
            }

            // Handle general (not Consumer specific) poll requests here.
            // We need to fetch all outbound messages for client subscriptions over this endpoint.
            // We identify these general poll messages by their operation and a null clientId.
            if (operation == CommandMessage.POLL_OPERATION && message.getClientId() == null) {
                verifyFlexClientSupport(command);
                FlexClient flexClient = FlexContext.getFlexClient();
                ack = handleFlexClientPollCommand(flexClient, command);
            } else if (operation == CommandMessage.DISCONNECT_OPERATION) {
                ack = handleChannelDisconnect(command);
            } else if (operation == CommandMessage.TRIGGER_CONNECT_OPERATION) {
                AcknowledgeMessage connectAck = new AcknowledgeMessage();
                connectAck.setCorrelationId(message.getMessageId());

                // Send configuration information only if the client requested it.
                // The header value is supplied by the client, so compare against
                // Boolean.TRUE instead of casting: a missing or non-Boolean value is
                // treated as "not requested" rather than raising a ClassCastException.
                Object needsConfigHeader = command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER);
                if (Boolean.TRUE.equals(needsConfigHeader)) {
                    ConfigMap serverConfig = getMessageBroker().describeServices(this);
                    if (serverConfig.size() > 0) {
                        connectAck.setBody(serverConfig);
                    }
                }
                ack = connectAck;
            } else {
                // Block a subset of commands for legacy clients that need to be recompiled to
                // interop with a 2.5+ server.
                if (operation == CommandMessage.SUBSCRIBE_OPERATION
                        || operation == CommandMessage.POLL_OPERATION) {
                    verifyFlexClientSupport(command);
                }

                ack = getMessageBroker().routeCommandToService(command, this);

                // Look for client advertised features on initial connect.
                if (operation == CommandMessage.CLIENT_PING_OPERATION
                        || operation == CommandMessage.LOGIN_OPERATION) {
                    Number clientVersion = (Number) command.getHeader(CommandMessage.MESSAGING_VERSION);
                    handleClientMessagingVersion(clientVersion);

                    // Also respond by advertising the messaging version on the
                    // acknowledgement.
                    ack.setHeader(CommandMessage.MESSAGING_VERSION, Double.valueOf(messagingVersion));
                }
            }
        } else {
            // Block any AsyncMessages from a legacy client.
            if (message instanceof AsyncMessage) {
                verifyFlexClientSupport(message);
            }

            // Apply channel endpoint level constraint.
            checkSecurityConstraint(message);

            ack = getMessageBroker().routeMessageToService(message, this);
        }

        return ack;
    } finally {
        // Always clear the thread-local endpoint, even when servicing throws.
        FlexContext.setThreadLocalEndpoint(null);
    }
}