void writeBoshResponse()

in server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java [219:294]


    /*package*/ void writeBoshResponse(Stanza responseStanza) {
        if (responseStanza == null) throw new IllegalArgumentException();
        final boolean isEmtpyResponse = responseStanza == BoshStanzaUtils.EMPTY_BOSH_RESPONSE;
        
        final ArrayList<BoshRequest> boshRequestsForRID = new ArrayList<BoshRequest>(1);
        BoshResponse boshResponse;
        final Long rid;
        synchronized (requestsWindow) {
            BoshRequest req = requestsWindow.pollNext();
            if (req == null) {
                if (isEmtpyResponse) return; // do not delay empty responses, everything's good.
                // delay sending until request comes available
                final boolean accepted = delayedResponseQueue.offer(responseStanza);
                if (!accepted) {
                    LOGGER.debug("SID = " + getSessionId() + " - stanza not queued. BOSH delayedResponseQueue is full: {}", 
                            delayedResponseQueue.size());
                    // TODO do not silently drop this stanza
                }
                return;            
            }

            rid = req.getRid();
            // in rare cases, we have same RID in two separate requests
            boshRequestsForRID.add(req);
            
            // collect more requests for this RID
            while (rid.equals(requestsWindow.firstRid())) {
                final BoshRequest sameRidRequest = requestsWindow.pollNext();
                boshRequestsForRID.add(sameRidRequest);
                LOGGER.warn("SID = " + getSessionId() + " - rid = {} - multi requests ({}) per RID.", rid, boshRequestsForRID.size());
            }
            
            long highestContinuousRid = requestsWindow.getHighestContinuousRid();
            final Long ack = rid.equals(highestContinuousRid) ? null : highestContinuousRid;
            boshResponse = getBoshResponse(responseStanza, ack);
            if (LOGGER.isDebugEnabled()) {
                String emptyHint = isEmtpyResponse ? "empty " : StringUtils.EMPTY;
                LOGGER.debug("SID = " + getSessionId() + " - rid = " + rid + " - BOSH writing {}response: {}", emptyHint, new String(boshResponse.getContent()));
            }
        }

        synchronized (sentResponses) {
            if (isResponseSavable(boshRequestsForRID.get(0), responseStanza)) {
                sentResponses.put(rid, boshResponse);
                // The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
                // number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
                // that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or 
                // the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
                if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > parallelRequestsCount)) {
                    final Long key = sentResponses.firstKey();
                    sentResponsesBacklog.add(key, sentResponses.remove(key));
                }
            }
        }
        
        if (sentResponses.size() > maximumSentResponses + 10) {
            synchronized (sentResponses) {
                LOGGER.warn("stored sent responses ({}) exeeds maximum ({}). purging.", sentResponses.size(), maximumSentResponses);
                while (sentResponses.size() > maximumSentResponses) {
                    final Long key = sentResponses.firstKey();
                    sentResponsesBacklog.add(key, sentResponses.remove(key));
                }
            }
        }

        for (BoshRequest boshRequest : boshRequestsForRID) {
            try {
                final AsyncContext asyncContext = saveResponse(boshRequest, boshResponse);
                asyncContext.dispatch();
            } catch (Exception e) {
                LOGGER.warn("SID = " + getSessionId() + " - exception in async processing rid = {}", boshRequest.getRid(), e);
            }
        }
        setLatestWriteTimestamp();
        updateInactivityChecker();
    }