in server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/BoshBackedSessionContext.java [549:721]
public void insertRequest(final BoshRequest br) {
final Stanza boshOuterBody = br.getBody();
final Long rid = br.getRid();
LOGGER.debug("SID = " + getSessionId() + " - rid = {} - inserting new BOSH request", rid);
// reset the inactivity
currentInactivitySeconds = inactivitySeconds;
final HttpServletRequest request = br.getHttpServletRequest();
request.setAttribute(BOSH_REQUEST_ATTRIBUTE, br);
final AsyncContext context = request.startAsync();
addContinuationExpirationListener(context);
context.setTimeout(this.wait * 1000);
// allow two more parallel request, be generous in what you receive
final int maxToleratedParallelRequests = parallelRequestsCount + 2;
synchronized (requestsWindow) {
// only allow 'parallelRequestsCount' request to be queued
final long highestContinuousRid = requestsWindow.getHighestContinuousRid();
if (highestContinuousRid != -1 && rid > highestContinuousRid + maxToleratedParallelRequests) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - received RID >= the permitted window of concurrent requests ({})",
rid, highestContinuousRid);
// don't queue // queueRequest(br);
sendError(br, "item-not-found");
return;
}
// resend missed responses
final boolean resend = rid <= requestsWindow.getCurrentProcessingRequest();
if (resend) {
// OLD: if (highestContinuousRid != null && rid <= highestContinuousRid) {
synchronized (sentResponses) {
if (LOGGER.isInfoEnabled()) {
final String pendingRids = requestsWindow.logRequestWindow();
final String sentRids = logSentResponsesBuffer();
LOGGER.info("SID = " + getSessionId() + " - rid = {} - resend request. sent buffer: {} - req.win.: " + pendingRids, rid, sentRids);
}
if (sentResponses.containsKey(rid)) {
LOGGER.info("SID = " + getSessionId() + " - rid = {} (re-sending)", rid);
// Resending the old response
resendResponse(br);
} else {
// not in sent responses, try alternatives: backlog and requestWindow
final BoshResponse response = sentResponsesBacklog.lookup(rid);
if (response != null) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response retrieved from sentResponsesBacklog", rid);
resendResponse(br, rid, response);
return; // no error
}
// rid not in sent responses, nor backlog. check to see if rid is still in requests window
boolean inRequestsWindow = requestsWindow.containsRid(rid);
if (!inRequestsWindow) {
if (LOGGER.isWarnEnabled()) {
final String sentRids = logSentResponsesBuffer();
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response not in buffer error - " + sentRids, rid);
}
} else {
if (LOGGER.isWarnEnabled()) {
final String sentRids = logSentResponsesBuffer();
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH response still in requests window - " + sentRids, rid);
}
}
sendError(br, "item-not-found");
}
}
return;
}
// check for too many parallel requests
final boolean terminate = "terminate".equals(boshOuterBody.getAttributeValue("type"));
final boolean pause = boshOuterBody.getAttributeValue("pause") != null;
final boolean bodyIsEmpty = boshOuterBody.getInnerElements().isEmpty();
final int distinctRIDs = requestsWindow.getDistinctRIDs();
if (distinctRIDs >= maxToleratedParallelRequests && !terminate && !pause) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too many simultaneous requests, max = {} " + logRIDSequence(), rid, maxToleratedParallelRequests);
sendError(br, "policy-violation");
return;
}
// check for new request comes early
if (distinctRIDs + 1 == maxToleratedParallelRequests && !terminate && !pause && bodyIsEmpty) {
final long millisSinceLastCalls = Math.abs(br.getTimestamp() - requestsWindow.getLatestAddionTimestamp());
if (millisSinceLastCalls < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisSinceLastCalls);
sendError(br, "policy-violation");
return;
}
}
// check
if ((wait == 0 || hold == 0) && bodyIsEmpty) {
final long millisBetweenEmptyReqs = Math.abs(br.getTimestamp() - latestEmptyPollingRequestTimestamp);
if (millisBetweenEmptyReqs < pollingSeconds * 1000 && !rid.equals(requestsWindow.getLatestRID())) {
LOGGER.warn("SID = " + getSessionId() + " - rid = {} - BOSH Overactivity for polling: Too frequent requests, millis since requests = {}, " + logRIDSequence(), rid, millisBetweenEmptyReqs);
sendError(br, "policy-violation");
return;
}
latestEmptyPollingRequestTimestamp = br.getTimestamp();
}
queueRequest(br);
}
if (isClientAcknowledgements()) {
synchronized (sentResponses) {
if (boshOuterBody.getAttribute("ack") == null) {
// if there is no ack attribute present then the client confirmed it received all the responses to all the previous requests
// and we clear the cache
sentResponsesBacklog.addAll(sentResponses);
sentResponses.clear();
} else if (!sentResponses.isEmpty()) {
// After receiving a request with an 'ack' value less than the 'rid' of the last request that it has already responded to,
// the connection manager MAY inform the client of the situation. In this case it SHOULD include a 'report' attribute set
// to one greater than the 'ack' attribute it received from the client, and a 'time' attribute set to the number of milliseconds
// since it sent the response associated with the 'report' attribute.
long ack = Long.parseLong(boshOuterBody.getAttributeValue("ack"));
if (ack < sentResponses.lastKey() && sentResponses.containsKey(ack + 1)) {
long delta = System.currentTimeMillis() - sentResponses.get(ack + 1).getTimestamp();
if (delta >= brokenConnectionReportTimeoutMillis) {
sendBrokenConnectionReport(ack + 1, delta);
return;
}
}
}
}
}
// we cannot pause if there are missing requests, this is tested with
// br.getRid().equals(requestsWindow.lastKey()) && highestContinuousRid.equals(br.getRid())
synchronized (requestsWindow) {
final String pauseAttribute = boshOuterBody.getAttributeValue("pause");
if (pauseAttribute != null &&
rid.equals(requestsWindow.getLatestRID()) &&
rid.equals(requestsWindow.getHighestContinuousRid())) {
int pause;
try {
pause = Integer.parseInt(pauseAttribute);
} catch (NumberFormatException e) {
queueRequest(br);
sendError("bad-request");
return;
}
pause = Math.max(0, pause);
pause = Math.min(pause, maxpauseSeconds);
respondToPause(pause);
return;
}
}
// If there are delayed responses waiting to be sent to the BOSH client, then we wrap them all in
// a <body/> element and send them as a HTTP response to the current HTTP request.
Stanza delayedResponse;
ArrayList<Stanza> mergeCandidates = null; // do not create until there is a delayed response
while ((delayedResponse = delayedResponseQueue.poll()) != null) {
if (mergeCandidates == null) mergeCandidates = new ArrayList<Stanza>();
mergeCandidates.add(delayedResponse);
}
Stanza mergedResponse = BoshStanzaUtils.mergeResponses(mergeCandidates);
if (mergedResponse != null) {
LOGGER.debug("SID = " + getSessionId() + " - writing merged response. stanzas merged = " + mergeCandidates.size());
writeBoshResponse(mergedResponse);
return;
}
// If there are more suspended enqueued requests than it is allowed by the BOSH 'hold' parameter,
// than we release the oldest one by sending an empty response.
if (requestsWindow.size() > hold) {
writeBoshResponse(BoshStanzaUtils.EMPTY_BOSH_RESPONSE);
}
}