in src/main/java/org/apache/sling/discovery/base/connectors/ping/TopologyConnectorServlet.java [195:323]
protected void doPut(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (!isWhitelisted(request)) {
// in theory it would be 403==forbidden, but that would reveal that
// a resource would exist there in the first place
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
final String[] pathInfo = request.getPathInfo().split("\\.");
final String extension = pathInfo.length == 3 ? pathInfo[2] : "";
if (!"json".equals(extension)) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
final String selector = pathInfo.length == 3 ? pathInfo[1] : "";
String topologyAnnouncementJSON = requestValidator.decodeMessage(request);
if (logger.isDebugEnabled()) {
// javasecurity:S5145: Replace pattern-breaking characters
logger.debug("doPost: incoming topology announcement is: " + topologyAnnouncementJSON.replaceAll("[\n\r\t]", "_"));
}
final Announcement incomingTopologyAnnouncement;
try {
incomingTopologyAnnouncement = Announcement.fromJSON(topologyAnnouncementJSON);
if (!incomingTopologyAnnouncement.getOwnerId().equals(selector)) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
String slingId = clusterViewService.getSlingId();
if (slingId == null) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
logger.info("doPut: no slingId available. Service not ready as expected at the moment.");
return;
}
incomingTopologyAnnouncement.removeInherited(slingId);
final Announcement replyAnnouncement = new Announcement(slingId);
long backoffInterval = -1;
ClusterView clusterView = clusterViewService.getLocalClusterView();
if (!incomingTopologyAnnouncement.isCorrectVersion()) {
logger.warn("doPost: rejecting an announcement from an incompatible connector protocol version: "
+ incomingTopologyAnnouncement);
response.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
} else if (ClusterViewHelper.contains(clusterView, incomingTopologyAnnouncement
.getOwnerId())) {
if (logger.isDebugEnabled()) {
logger.debug("doPost: rejecting an announcement from an instance that is part of my cluster: "
+ incomingTopologyAnnouncement);
}
// marking as 'loop'
replyAnnouncement.setLoop(true);
backoffInterval = config.getBackoffStandbyInterval();
} else if (ClusterViewHelper.containsAny(clusterView, incomingTopologyAnnouncement
.listInstances())) {
if (logger.isDebugEnabled()) {
logger.debug("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
+ incomingTopologyAnnouncement);
}
// marking as 'loop'
replyAnnouncement.setLoop(true);
backoffInterval = config.getBackoffStandbyInterval();
} else {
backoffInterval = announcementRegistry
.registerAnnouncement(incomingTopologyAnnouncement);
if (logger.isDebugEnabled()) {
logger.debug("doPost: backoffInterval after registration: " + backoffInterval);
}
if (backoffInterval == -1) {
if (logger.isDebugEnabled()) {
logger.debug("doPost: rejecting an announcement from an instance that I already see in my topology: "
+ incomingTopologyAnnouncement);
}
// marking as 'loop'
replyAnnouncement.setLoop(true);
backoffInterval = config.getBackoffStandbyInterval();
} else {
// normal, successful case: replying with the part of the topology which this instance sees
replyAnnouncement.setLocalCluster(clusterView);
announcementRegistry.addAllExcept(replyAnnouncement, clusterView,
(receivingSlingId, announcement) -> {
if (announcement.getPrimaryKey().equals(
incomingTopologyAnnouncement.getPrimaryKey())) {
return false;
}
return true;
});
}
}
if (backoffInterval > 0) {
replyAnnouncement.setBackoffInterval(backoffInterval);
if (logger.isDebugEnabled()) {
logger.debug("doPost: backoffInterval for client set to " + replyAnnouncement.getBackoffInterval());
}
}
final String p = requestValidator.encodeMessage(replyAnnouncement.asJSON());
requestValidator.trustMessage(response, request, p);
// gzip the response if the client accepts this
final String acceptEncodingHeader = request.getHeader("Accept-Encoding");
if (acceptEncodingHeader != null && acceptEncodingHeader.contains("gzip")) {
// tell the client that the content is gzipped:
response.setHeader("Content-Encoding", "gzip");
// then gzip the body
final GZIPOutputStream gzipOut = new GZIPOutputStream(response.getOutputStream());
gzipOut.write(p.getBytes("UTF-8"));
gzipOut.close();
} else {
// otherwise plaintext
final PrintWriter pw = response.getWriter();
pw.print(p);
pw.flush();
}
} catch (JsonException e) {
logger.error("doPost: Got a JSONException: " + e, e);
response.sendError(500);
} catch (UndefinedClusterViewException e) {
logger.warn("doPost: no clusterView available at the moment - cannot handle connectors now: " + e);
response.sendError(503); // "please retry, but atm I can't help since I'm isolated"
}
}