protected void handleFlexClientStreamingOpenRequest()

in core/src/main/java/flex/messaging/endpoints/BaseStreamingHTTPEndpoint.java [599:904]


    protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient) {
        FlexSession session = FlexContext.getFlexSession();
        if (canStream && session.canStream) {
            // If canStream/session.canStream is true it means we currently have
            // less than the max number of allowed streaming threads, per endpoint/session.

            // We need to protect writes/reads to the stream count with the endpoint's lock.
            // Also, we have to be careful to handle the case where two threads get to this point when only
            // one streaming spot remains; one thread will win and the other needs to fault.
            boolean thisThreadCanStream;
            synchronized (lock) {
                ++streamingClientsCount;
                if (streamingClientsCount == maxStreamingClients) {
                    thisThreadCanStream = true; // This thread got the last spot.
                    canStream = false;
                } else if (streamingClientsCount > maxStreamingClients) {
                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
                    --streamingClientsCount; // Decrement the count because we're not going to grant the streaming right to the client.
                } else {
                    // We haven't hit the limit yet, allow this thread to stream.
                    thisThreadCanStream = true;
                }
            }

            // If the thread cannot wait due to endpoint streaming connection
            // limit, inform the client and return.
            if (!thisThreadCanStream) {
                String errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                        + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
                        + maxStreamingClients + "' has been reached.";
                if (Log.isError())
                    log.error(errorMessage);
                try {
                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
                } catch (IOException ignore) {
                }
                return;
            }

            // Setup for specific user agents.
            byte[] kickStartBytesToStream = null;
            String userAgentValue = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
            UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
            if (agentSettings != null) {
                synchronized (session) {
                    session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
                }

                int kickStartBytes = agentSettings.getKickstartBytes();
                if (kickStartBytes > 0) {
                    // Determine the minimum number of actual bytes that need to be sent to
                    // kickstart, taking into account transfer-encoding overhead.
                    try {
                        int chunkLengthHeaderSize = Integer.toHexString(kickStartBytes).getBytes("ASCII").length;
                        int chunkOverhead = chunkLengthHeaderSize + 4; // 4 for the 2 wrapping CRLF tokens.
                        int minimumKickstartBytes = kickStartBytes - chunkOverhead;
                        kickStartBytesToStream = new byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes :
                                kickStartBytes];
                    } catch (UnsupportedEncodingException ignore) {
                        kickStartBytesToStream = new byte[kickStartBytes];
                    }
                    Arrays.fill(kickStartBytesToStream, NULL_BYTE);
                }
            }

            // Now, check with the session before granting the streaming connection.
            synchronized (session) {
                ++session.streamingConnectionsCount;
                if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED) {
                    thisThreadCanStream = true;
                } else if (session.streamingConnectionsCount == session.maxConnectionsPerSession) {
                    thisThreadCanStream = true; // This thread got the last spot in the session.
                    session.canStream = false;
                } else if (session.streamingConnectionsCount > session.maxConnectionsPerSession) {
                    thisThreadCanStream = false; // This thread was beaten out for the last spot.
                    --session.streamingConnectionsCount;
                    synchronized (lock) {
                        // Decrement the endpoint count because we're not going to grant the streaming right to the client.
                        --streamingClientsCount;
                    }
                } else {
                    // We haven't hit the limit yet, allow this thread to stream.
                    thisThreadCanStream = true;
                }
            }

            // If the thread cannot wait due to session streaming connection
            // limit, inform the client and return.
            if (!thisThreadCanStream) {
                if (Log.isError())
                    log.error("Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + session.maxConnectionsPerSession
                            + ((agentSettings != null) ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "") + " has been reached.");
                try {
                    // Return an HTTP status code 400.
                    String errorMessage = "The server cannot grant streaming connection to this client because limit has been reached.";
                    res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
                } catch (IOException ignore) {
                    // NOWARN
                }
                return;
            }

            Thread currentThread = Thread.currentThread();
            String threadName = currentThread.getName();
            EndpointPushNotifier notifier = null;
            boolean suppressIOExceptionLogging = false; // Used to suppress logging for IO exception.
            try {
                currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION);

                // Open and commit response headers and get output stream.
                if (addNoCacheHeaders)
                    addNoCacheHeaders(req, res);
                res.setContentType(getResponseContentType());
                res.setHeader("Transfer-Encoding", "chunked");
                res.setHeader("Connection", "close");
                ServletOutputStream os = res.getOutputStream();
                res.flushBuffer();

                // If kickstart-bytes are specified, stream them.
                if (kickStartBytesToStream != null) {
                    if (Log.isDebug())
                        log.debug("Endpoint with id '" + getId() + "' is streaming " + kickStartBytesToStream.length
                                + " bytes (not counting chunk encoding overhead) to kick-start the streaming connection for FlexClient with id '"
                                + flexClient.getId() + "'.");

                    streamChunk(kickStartBytesToStream, os, res);
                }

                // Setup serialization and type marshalling contexts
                setThreadLocals();

                // Activate streaming helper for this connection.
                // Watch out for duplicate stream issues.
                try {
                    notifier = new EndpointPushNotifier(this, flexClient);
                } catch (MessageException me) {
                    if (me.getNumber() == 10033) // It's a duplicate stream request from the same FlexClient. Leave the current stream in place and fault this.
                    {
                        if (Log.isWarn())
                            log.warn("Endpoint with id '" + getId() + "' received a duplicate streaming connection request from, FlexClient with id '"
                                    + flexClient.getId() + "'. Faulting request.");

                        // Rollback counters and send an error response.
                        synchronized (lock) {
                            --streamingClientsCount;
                            canStream = (streamingClientsCount < maxStreamingClients);
                            synchronized (session) {
                                --session.streamingConnectionsCount;
                                session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
                                        || session.streamingConnectionsCount < session.maxConnectionsPerSession);
                            }
                        }
                        try {
                            res.sendError(HttpServletResponse.SC_BAD_REQUEST);
                        } catch (IOException ignore) {
                            // NOWARN
                        }
                        return; // Exit early.
                    }
                }
                if (connectionIdleTimeoutMinutes > 0)
                    notifier.setIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
                notifier.setLogCategory(getLogCategory());
                monitorTimeout(notifier);
                currentStreamingRequests.put(notifier.getNotifierId(), notifier);

                // Push down an acknowledgement for the 'connect' request containing the unique id for this specific stream.
                AcknowledgeMessage connectAck = new AcknowledgeMessage();
                connectAck.setBody(notifier.getNotifierId());
                connectAck.setCorrelationId(BaseStreamingHTTPEndpoint.OPEN_COMMAND);
                ArrayList toPush = new ArrayList(1);
                toPush.add(connectAck);
                streamMessages(toPush, os, res);

                // Output session level streaming count.
                if (Log.isDebug())
                    Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '" + session.getId() + "' is " + session.streamingConnectionsCount + ".");

                // Output endpoint level streaming count.
                if (Log.isDebug())
                    log.debug("Number of streaming clients for endpoint with id '" + getId() + "' is " + streamingClientsCount + ".");

                // And cycle in a wait-notify loop with the aid of the helper until it
                // is closed, we're interrupted or the act of streaming data to the client fails.
                while (!notifier.isClosed()) {
                    try {
                        // Drain any messages that might have been accumulated
                        // while the previous drain was being processed.
                        List<AsyncMessage> messages = null;
                        synchronized (notifier.pushNeeded) {
                            messages = notifier.drainMessages();
                        }
                        streamMessages(messages, os, res);

                        synchronized (notifier.pushNeeded) {
                            notifier.pushNeeded.wait(serverToClientHeartbeatMillis);

                            messages = notifier.drainMessages();
                        }
                        // If there are no messages to send to the client, send an null
                        // byte as a heartbeat to make sure the client is still valid.
                        if (messages == null && serverToClientHeartbeatMillis > 0) {
                            try {
                                os.write(NULL_BYTE);
                                res.flushBuffer();
                            } catch (IOException e) {
                                if (Log.isWarn())
                                    log.warn("Endpoint with id '" + getId() + "' is closing the streaming connection to FlexClient with id '"
                                            + flexClient.getId() + "' because endpoint encountered a socket write error" +
                                            ", possibly due to an unresponsive FlexClient.", e);
                                break; // Exit the wait loop.
                            }
                        }
                        // Otherwise stream the messages to the client.
                        else {
                            // Update the last time notifier was used to drain messages.
                            // Important for idle timeout detection.
                            notifier.updateLastUse();

                            streamMessages(messages, os, res);
                        }
                    } catch (InterruptedException e) {
                        if (Log.isWarn())
                            log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' has been interrupted and the streaming connection will be closed.");
                        os.close();
                        break; // Exit the wait loop.
                    }

                    // Update the FlexClient last use time to prevent FlexClient from
                    // timing out when the client is still subscribed. It is important
                    // to do this outside synchronized(notifier.pushNeeded) to avoid
                    // thread deadlock!
                    flexClient.updateLastUse();
                }
                if (Log.isDebug())
                    log.debug("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is releasing connection and returning to the request handler pool.");
                suppressIOExceptionLogging = true;
                // Terminate the response.
                streamChunk(null, os, res);
            } catch (IOException e) {
                if (Log.isWarn() && !suppressIOExceptionLogging)
                    log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is closing connection due to an IO error.", e);
            } finally {
                currentThread.setName(threadName);

                // We're done so decrement the counts for streaming threads,
                // and update the canStream flag if necessary.
                synchronized (lock) {
                    --streamingClientsCount;
                    canStream = (streamingClientsCount < maxStreamingClients);
                    synchronized (session) {
                        --session.streamingConnectionsCount;
                        session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
                                || session.streamingConnectionsCount < session.maxConnectionsPerSession);
                    }
                }

                if (notifier != null && currentStreamingRequests != null) {
                    currentStreamingRequests.remove(notifier.getNotifierId());
                    notifier.close();
                }

                // Output session level streaming count.
                if (Log.isDebug())
                    Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '" + session.getId() + "' is " + session.streamingConnectionsCount + ".");

                // Output endpoint level streaming count.
                if (Log.isDebug())
                    log.debug("Number of streaming clients for endpoint with id '" + getId() + "' is " + streamingClientsCount + ".");
            }
        }
        // Otherwise, client's streaming connection open request could not be granted.
        else {
            if (Log.isError()) {
                String logString = null;
                if (!canStream) {
                    logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
                            + maxStreamingClients + "' has been reached.";
                } else if (!session.canStream) {
                    logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit of '"
                            + session.maxConnectionsPerSession + "' has been reached.";
                }
                if (logString != null)
                    log.error(logString);
            }

            try {
                // Return an HTTP status code 400 to indicate that client request can't be processed.
                String errorMessage = null;
                if (!canStream) {
                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
                } else if (!session.canStream) {
                    errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
                            + flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit has been reached.";
                }
                res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
            } catch (IOException ignore) {
            }
        }
    }