public Message serviceMessage()

in core/src/main/java/flex/messaging/endpoints/AbstractEndpoint.java [874:962]


    /**
     * Services an inbound message received on this endpoint and returns the
     * response produced for it.
     *
     * <p>Before dispatching, the method increments the managed service-message
     * count (when this endpoint is managed), binds this endpoint to the
     * {@code FlexContext} thread local, timestamps the message if its timestamp
     * is {@code 0}, and overwrites the message's endpoint header with this
     * endpoint's id. The thread local is always cleared again in a
     * {@code finally} block.
     *
     * <p>Dispatch rules:
     * <ul>
     *   <li>{@code CommandMessage}: the endpoint security constraint is checked
     *       for every operation except login. General polls (poll operation with
     *       a {@code null} clientId), disconnects and trigger-connect requests
     *       are handled here; every other command is routed through the message
     *       broker. For client-ping and login operations the client's messaging
     *       version header is passed to {@code handleClientMessagingVersion} and
     *       this endpoint's messaging version is set on the acknowledgement.</li>
     *   <li>Any other message: {@code AsyncMessage}s are passed through
     *       {@code verifyFlexClientSupport}, the endpoint security constraint is
     *       checked, and the message is routed through the message broker.</li>
     * </ul>
     *
     * @param message the inbound message; its timestamp and headers may be modified
     * @return the response produced by the local handler or by the message broker
     */
    public Message serviceMessage(Message message) {
        if (isManaged()) {
            ((EndpointControl) getControl()).incrementServiceMessageCount();
        }

        try {
            FlexContext.setThreadLocalEndpoint(this);
            Message ack = null;

            // Make sure this message is timestamped.
            if (message.getTimestamp() == 0) {
                message.setTimestamp(System.currentTimeMillis());
            }

            // Reset the endpoint header for inbound messages to the id for this endpoint
            // to guarantee that it's correct. Don't allow clients to spoof this.
            // Only messages that arrived with an endpoint header are tagged for validation;
            // a message whose endpoint id was passed as null is left untagged so that
            // channel/endpoint validation at the destination level
            // (MessageBroker.inspectChannel()) can be skipped for it.
            // The check must run before the header is overwritten below.
            if (message.getHeader(Message.ENDPOINT_HEADER) != null) {
                message.setHeader(Message.VALIDATE_ENDPOINT_HEADER, Boolean.TRUE);
            }
            message.setHeader(Message.ENDPOINT_HEADER, getId());

            if (message instanceof CommandMessage) {
                CommandMessage command = (CommandMessage) message;

                // Apply channel endpoint level constraint; always allow login commands through.
                int operation = command.getOperation();
                if (operation != CommandMessage.LOGIN_OPERATION) {
                    checkSecurityConstraint(message);
                }

                // Handle general (not Consumer specific) poll requests here.
                // We need to fetch all outbound messages for client subscriptions over this endpoint.
                // We identify these general poll messages by their operation and a null clientId.
                if (operation == CommandMessage.POLL_OPERATION && message.getClientId() == null) {
                    verifyFlexClientSupport(command);

                    FlexClient flexClient = FlexContext.getFlexClient();
                    ack = handleFlexClientPollCommand(flexClient, command);
                } else if (operation == CommandMessage.DISCONNECT_OPERATION) {
                    ack = handleChannelDisconnect(command);
                } else if (operation == CommandMessage.TRIGGER_CONNECT_OPERATION) {
                    AcknowledgeMessage connectAck = new AcknowledgeMessage();
                    connectAck.setCorrelationId(message.getMessageId());
                    ack = connectAck;

                    // The header value comes from the client, so do not cast it blindly:
                    // a missing header, Boolean.FALSE, or a value of any other type all
                    // mean "configuration not requested".
                    boolean needsConfig =
                            Boolean.TRUE.equals(command.getHeader(CommandMessage.NEEDS_CONFIG_HEADER));

                    // Send configuration information only if the client requested.
                    if (needsConfig) {
                        ConfigMap serverConfig = getMessageBroker().describeServices(this);
                        if (serverConfig.size() > 0) {
                            ack.setBody(serverConfig);
                        }
                    }
                } else {
                    // Block a subset of commands for legacy clients that need to be recompiled to
                    // interop with a 2.5+ server.
                    if (operation == CommandMessage.SUBSCRIBE_OPERATION || operation == CommandMessage.POLL_OPERATION) {
                        verifyFlexClientSupport(command);
                    }

                    ack = getMessageBroker().routeCommandToService(command, this);

                    // Look for client advertised features on initial connect.
                    if (operation == CommandMessage.CLIENT_PING_OPERATION || operation == CommandMessage.LOGIN_OPERATION) {
                        Number clientVersion = (Number) command.getHeader(CommandMessage.MESSAGING_VERSION);
                        handleClientMessagingVersion(clientVersion);

                        // Also respond by advertising the messaging version on the
                        // acknowledgement. Double.valueOf replaces the deprecated
                        // Double constructor.
                        ack.setHeader(CommandMessage.MESSAGING_VERSION, Double.valueOf(messagingVersion));
                    }
                }
            } else {
                // Block any AsyncMessages from a legacy client.
                if (message instanceof AsyncMessage) {
                    verifyFlexClientSupport(message);
                }

                // Apply channel endpoint level constraint.
                checkSecurityConstraint(message);

                ack = getMessageBroker().routeMessageToService(message, this);
            }

            return ack;
        } finally {
            // Always unbind the endpoint from the thread, even if servicing failed.
            FlexContext.setThreadLocalEndpoint(null);
        }
    }