in server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java [219:294]
/*package*/ void writeBoshResponse(Stanza responseStanza) {
if (responseStanza == null) throw new IllegalArgumentException();
final boolean isEmtpyResponse = responseStanza == BoshStanzaUtils.EMPTY_BOSH_RESPONSE;
final ArrayList<BoshRequest> boshRequestsForRID = new ArrayList<BoshRequest>(1);
BoshResponse boshResponse;
final Long rid;
synchronized (requestsWindow) {
BoshRequest req = requestsWindow.pollNext();
if (req == null) {
if (isEmtpyResponse) return; // do not delay empty responses, everything's good.
// delay sending until request comes available
final boolean accepted = delayedResponseQueue.offer(responseStanza);
if (!accepted) {
LOGGER.debug("SID = " + getSessionId() + " - stanza not queued. BOSH delayedResponseQueue is full: {}",
delayedResponseQueue.size());
// TODO do not silently drop this stanza
}
return;
}
rid = req.getRid();
// in rare cases, we have same RID in two separate requests
boshRequestsForRID.add(req);
// collect more requests for this RID
while (rid.equals(requestsWindow.firstRid())) {
final BoshRequest sameRidRequest = requestsWindow.pollNext();
boshRequestsForRID.add(sameRidRequest);
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - multi requests ({}) per RID.", rid, boshRequestsForRID.size());
}
long highestContinuousRid = requestsWindow.getHighestContinuousRid();
final Long ack = rid.equals(highestContinuousRid) ? null : highestContinuousRid;
boshResponse = getBoshResponse(responseStanza, ack);
if (LOGGER.isDebugEnabled()) {
String emptyHint = isEmtpyResponse ? "empty " : StringUtils.EMPTY;
LOGGER.debug("SID = " + getSessionId() + " - rid = " + rid + " - BOSH writing {}response: {}", emptyHint, new String(boshResponse.getContent()));
}
}
synchronized (sentResponses) {
if (isResponseSavable(boshRequestsForRID.get(0), responseStanza)) {
sentResponses.put(rid, boshResponse);
// The number of responses to non-pause requests kept in the buffer SHOULD be either the same as the maximum
// number of simultaneous requests allowed or, if Acknowledgements are being used, the number of responses
// that have not yet been acknowledged (this part is handled in insertRequest(BoshRequest)), or
// the hard limit maximumSentResponses (not in the specification) that prevents excessive memory consumption.
if (sentResponses.size() > maximumSentResponses || (!isClientAcknowledgements() && sentResponses.size() > parallelRequestsCount)) {
final Long key = sentResponses.firstKey();
sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
if (sentResponses.size() > maximumSentResponses + 10) {
synchronized (sentResponses) {
LOGGER.warn("stored sent responses ({}) exeeds maximum ({}). purging.", sentResponses.size(), maximumSentResponses);
while (sentResponses.size() > maximumSentResponses) {
final Long key = sentResponses.firstKey();
sentResponsesBacklog.add(key, sentResponses.remove(key));
}
}
}
for (BoshRequest boshRequest : boshRequestsForRID) {
try {
final AsyncContext asyncContext = saveResponse(boshRequest, boshResponse);
asyncContext.dispatch();
} catch (Exception e) {
LOGGER.warn("SID = " + getSessionId() + " - exception in async processing rid = {}", boshRequest.getRid(), e);
}
}
setLatestWriteTimestamp();
updateInactivityChecker();
}