public void insertRequest()

in server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java [549:721]


    /**
     * Accepts a newly received BOSH request for this session and decides how it is answered.
     * <p>
     * The steps, in order:
     * <ol>
     * <li>The inactivity countdown is reset and the servlet request is switched to asynchronous
     *     mode, with a timeout derived from {@code wait}.</li>
     * <li>While holding the {@code requestsWindow} lock: a RID too far ahead of the highest
     *     continuous RID is rejected with "item-not-found"; a RID at or below the one currently
     *     being processed is treated as a resend request (answered from {@code sentResponses} or
     *     {@code sentResponsesBacklog}, otherwise "item-not-found"); overactivity is rejected with
     *     "policy-violation"; anything else is queued.</li>
     * <li>If client acknowledgements are enabled, the {@code ack} attribute is evaluated. A
     *     missing attribute moves all sent responses to the backlog; a lagging one may trigger a
     *     broken-connection report; a malformed one is answered with "bad-request".</li>
     * <li>A {@code pause} attribute on the latest, gap-free request is honoured (clamped to
     *     {@code [0, maxpauseSeconds]}); a malformed value is answered with "bad-request".</li>
     * <li>Pending delayed responses are merged and written as one response.</li>
     * <li>Otherwise, if more requests are queued than {@code hold} allows, an empty response is
     *     written.</li>
     * </ol>
     * Every rejection and every written response ends the method early.
     *
     * @param br the incoming BOSH request; its body, RID, timestamp and servlet request are read
     */
    public void insertRequest(final BoshRequest br) {

        final Stanza boshOuterBody = br.getBody();
        final Long rid = br.getRid();
        LOGGER.debug("SID = " + getSessionId() + " - rid = {} - inserting new BOSH request", rid);
        
        // any incoming request counts as activity: restart the inactivity countdown
        currentInactivitySeconds = inactivitySeconds;

        // park the HTTP request: it stays open until a response is written or the timeout fires
        final HttpServletRequest request = br.getHttpServletRequest();
        request.setAttribute(BOSH_REQUEST_ATTRIBUTE, br);
        final AsyncContext context = request.startAsync();
        addContinuationExpirationListener(context);
        // AsyncContext timeouts are given in milliseconds
        context.setTimeout(this.wait * 1000);

        // allow two more parallel request, be generous in what you receive
        final int maxToleratedParallelRequests = parallelRequestsCount + 2;
        synchronized (requestsWindow) {

            // reject RIDs that lie beyond the tolerated window above the highest continuous RID
            // (-1 is treated as "no continuous RID yet", in which case this check is skipped)
            final long highestContinuousRid = requestsWindow.getHighestContinuousRid();
            if (highestContinuousRid != -1 && rid > highestContinuousRid + maxToleratedParallelRequests) {
                LOGGER.warn("SID = " + getSessionId() + " - rid = {} - received RID >= the permitted window of concurrent requests ({})",
                        rid, highestContinuousRid);
                // the request is deliberately not queued; it is rejected right away
                sendError(br, "item-not-found");
                return;
            }
            
            // a RID that is not ahead of the request currently being processed is a request
            // to resend a response the client missed
            final boolean resend = rid <= requestsWindow.getCurrentProcessingRequest();
            if (resend) {
                synchronized (sentResponses) {
                    if (LOGGER.isInfoEnabled()) {
                        final String pendingRids = requestsWindow.logRequestWindow();
                        final String sentRids = logSentResponsesBuffer();
                        LOGGER.info("SID = " + getSessionId() + " - rid = {} - resend request. sent buffer: {} - req.win.: " + pendingRids, rid, sentRids);
                    }
                    if (sentResponses.containsKey(rid)) {
                        LOGGER.info("SID = " + getSessionId() + " - rid = {} (re-sending)", rid);
                        // Resending the old response
                        resendResponse(br);
                    } else {
                        // not in sent responses, try alternatives: backlog and requestWindow
                        
                        final BoshResponse response = sentResponsesBacklog.lookup(rid);
                        if (response != null) {
                            LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response retrieved from sentResponsesBacklog", rid);
                            resendResponse(br, rid, response);
                            return; // no error
                        }

                        // rid not in sent responses, nor backlog. check to see if rid is still in requests window;
                        // the outcome only changes the log message, both cases are answered with an error
                        boolean inRequestsWindow = requestsWindow.containsRid(rid);
                        if (!inRequestsWindow) {
                            if (LOGGER.isWarnEnabled()) {
                                final String sentRids = logSentResponsesBuffer();
                                LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response not in buffer error - " + sentRids, rid);
                            }
                        } else {
                            if (LOGGER.isWarnEnabled()) {
                                final String sentRids = logSentResponsesBuffer();
                                LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response still in requests window - " + sentRids, rid);
                            }
                        }
                        sendError(br, "item-not-found");
                    }
                }
                return;
            }
            // check for too many parallel requests; terminate and pause requests are exempt
            final boolean terminate = "terminate".equals(boshOuterBody.getAttributeValue("type"));
            final boolean pause = boshOuterBody.getAttributeValue("pause") != null;
            final boolean bodyIsEmpty = boshOuterBody.getInnerElements().isEmpty();
            final int distinctRIDs = requestsWindow.getDistinctRIDs();
            
            if (distinctRIDs >= maxToleratedParallelRequests && !terminate && !pause) {
                LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too many simultaneous requests, max = {} " + logRIDSequence(), rid, maxToleratedParallelRequests);
                sendError(br, "policy-violation");
                return;
            }
            // check whether an empty request that would fill the last tolerated slot arrives too early,
            // i.e. sooner than 'pollingSeconds' after the latest addition and with a different RID
            if (distinctRIDs + 1 == maxToleratedParallelRequests && !terminate && !pause && bodyIsEmpty) {
                final long millisSinceLastCalls = Math.abs(br.getTimestamp() - requestsWindow.getLatestAddionTimestamp());
                if (millisSinceLastCalls < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
                    LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisSinceLastCalls);
                    sendError(br, "policy-violation");
                    return;
                }
            }
            // with wait == 0 or hold == 0, empty requests must be spaced at least 'pollingSeconds' apart
            if ((wait == 0 || hold == 0) && bodyIsEmpty) {
                final long millisBetweenEmptyReqs = Math.abs(br.getTimestamp() - latestEmptyPollingRequestTimestamp);
                if (millisBetweenEmptyReqs < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
                    LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity for polling: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisBetweenEmptyReqs);
                    sendError(br, "policy-violation");
                    return;
                }
                latestEmptyPollingRequestTimestamp = br.getTimestamp();
            }

            // all checks passed: the request joins the window
            queueRequest(br);
        }


        if (isClientAcknowledgements()) {
            synchronized (sentResponses) {
                if (boshOuterBody.getAttribute("ack") == null) {
                    // if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
                    // and we clear the cache
                    sentResponsesBacklog.addAll(sentResponses);
                    sentResponses.clear();
                } else if (!sentResponses.isEmpty()) {
                    // After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
                    // the connection manager MAY inform the client of the situation. In this case it SHOULD include a 'report' attribute set
                    // to one greater than the 'ack' attribute it received from the client, and a 'time' attribute set to the number of milliseconds
                    // since it sent the response associated with the 'report' attribute.
                    long ack;
                    try {
                        ack = Long.parseLong(boshOuterBody.getAttributeValue("ack"));
                    } catch (NumberFormatException e) {
                        // 'ack' is client-supplied text; a non-numeric value must not escape as an
                        // unchecked exception. The request was already queued above, so it is
                        // answered the same way a malformed 'pause' attribute is answered below.
                        LOGGER.warn("SID = " + getSessionId() + " - rid = {} - malformed 'ack' attribute, rejecting request", rid);
                        sendError("bad-request");
                        return;
                    }
                    if (ack < sentResponses.lastKey() && sentResponses.containsKey(ack + 1)) {
                        long delta = System.currentTimeMillis() - sentResponses.get(ack + 1).getTimestamp();
                        // report only once the unacknowledged response is older than the configured threshold
                        if (delta >= brokenConnectionReportTimeoutMillis) {
                            sendBrokenConnectionReport(ack + 1, delta);
                            return;
                        }
                    }
                }
            }
        }
        
        // we cannot pause if there are missing requests: the pausing request must be both the
        // latest RID in the window and the highest continuous RID
        synchronized (requestsWindow) {
            final String pauseAttribute = boshOuterBody.getAttributeValue("pause");
            if (pauseAttribute != null && 
                    rid.equals(requestsWindow.getLatestRID()) && 
                    rid.equals(requestsWindow.getHighestContinuousRid())) {
                int pause;
                try {
                    pause = Integer.parseInt(pauseAttribute);
                } catch (NumberFormatException e) {
                    // NOTE(review): br was already queued further up in this method; confirm that
                    // queueing it a second time here is intended
                    queueRequest(br);
                    sendError("bad-request");
                    return;
                }
                // clamp the requested pause to [0, maxpauseSeconds]
                pause = Math.max(0, pause);
                pause = Math.min(pause, maxpauseSeconds);
                respondToPause(pause);
                return;
            }
        }

        // If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
        // a <body/> element and send them as a HTTP response to the current HTTP request.
        Stanza delayedResponse;
        ArrayList<Stanza> mergeCandidates = null; // do not create until there is a delayed response
        while ((delayedResponse = delayedResponseQueue.poll()) != null) {
            if (mergeCandidates == null) mergeCandidates = new ArrayList<Stanza>();
            mergeCandidates.add(delayedResponse);
        }
        // mergeCandidates is still null here when the queue was empty
        // NOTE(review): presumably mergeResponses returns null for a null argument, since the size()
        // call below is only reached for a non-null result - confirm
        Stanza mergedResponse = BoshStanzaUtils.mergeResponses(mergeCandidates);
        if (mergedResponse != null) {
            LOGGER.debug("SID = " + getSessionId() + " - writing merged response. stanzas merged = " + mergeCandidates.size());
            writeBoshResponse(mergedResponse);
            return;
        }

        // If there are more suspended enqueued requests than allowed by the BOSH 'hold' parameter,
        // then we release one of them by sending an empty response.
        // NOTE(review): requestsWindow is read here without holding its lock, unlike above - confirm this is safe
        if (requestsWindow.size() > hold) {
            writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
        }
    }