in zuul-core/src/main/java/com/netflix/zuul/netty/server/push/PushMessageSender.java [81:159]
/**
 * Handles one fully aggregated HTTP request aimed at the push endpoint.
 *
 * <p>The request is validated step by step; the first failing check answers the request and
 * stops processing:
 * <ol>
 *   <li>decoder failure: 400 Bad Request</li>
 *   <li>{@code null} URI, or URI not ending in {@code /push}: 400 Bad Request</li>
 *   <li>method other than GET or POST: 405 Method Not Allowed</li>
 *   <li>unsuccessful {@link PushUserAuth}: 401 Unauthorized</li>
 *   <li>no registered {@link PushConnection} for the client identity: 404 Not Found</li>
 *   <li>secure token verification failed: 403 Forbidden</li>
 *   <li>GET request: 200 OK (connection-presence probe, nothing is pushed)</li>
 *   <li>connection is rate limited: 503 Service Unavailable</li>
 *   <li>empty body: 204 No Content</li>
 * </ol>
 * Otherwise the body is passed to {@link PushConnection#sendPushMessage} and the HTTP response
 * is written once the returned future completes: 200 OK on success, 500 Internal Server Error
 * on failure.
 *
 * @param ctx     channel context used to write the response
 * @param request the incoming HTTP request
 * @throws Exception if any of the invoked helpers throws
 */
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
    if (!request.decoderResult().isSuccess()) {
        sendHttpResponse(ctx, request, HttpResponseStatus.BAD_REQUEST, null);
        return;
    }

    String path = request.uri();
    if (path == null || !path.endsWith("/push")) {
        // Last handler in the chain: anything that is not a push request is rejected here.
        sendHttpResponse(ctx, request, HttpResponseStatus.BAD_REQUEST, null);
        return;
    }

    logPushAttempt();

    HttpMethod method = request.method();
    boolean isGet = Objects.equals(method, HttpMethod.GET);
    if (!isGet && !Objects.equals(method, HttpMethod.POST)) {
        sendHttpResponse(ctx, request, HttpResponseStatus.METHOD_NOT_ALLOWED, null);
        return;
    }

    PushUserAuth userAuth = getPushUserAuth(request);
    if (!userAuth.isSuccess()) {
        sendHttpResponse(ctx, request, HttpResponseStatus.UNAUTHORIZED, userAuth);
        logNoIdentity();
        return;
    }

    PushConnection pushConn = pushConnectionRegistry.get(userAuth.getClientIdentity());
    if (pushConn == null) {
        sendHttpResponse(ctx, request, HttpResponseStatus.NOT_FOUND, userAuth);
        logClientNotConnected();
        return;
    }

    if (!verifySecureToken(request, pushConn)) {
        sendHttpResponse(ctx, request, HttpResponseStatus.FORBIDDEN, userAuth);
        logSecurityTokenVerificationFail();
        return;
    }

    if (isGet) {
        // client only checking if particular CID + ESN is connected to this instance
        sendHttpResponse(ctx, request, HttpResponseStatus.OK, userAuth);
        return;
    }

    if (pushConn.isRateLimited()) {
        sendHttpResponse(ctx, request, HttpResponseStatus.SERVICE_UNAVAILABLE, userAuth);
        logRateLimited();
        return;
    }

    // Inspect the body WITHOUT retaining it: an empty body is never handed to pushConn, so
    // taking an extra reference here would only have to be undone again (and would leak if
    // sending the response threw in between).
    ByteBuf body = request.content();
    if (body.readableBytes() <= 0) {
        sendHttpResponse(ctx, request, HttpResponseStatus.NO_CONTENT, userAuth);
        return;
    }

    // Retain only at the hand-off. pushConn is expected to release the buffer once it is done
    // with it -- NOTE(review): confirm against PushConnection#sendPushMessage.
    ChannelFuture clientFuture = pushConn.sendPushMessage(body.retain());
    // NOTE(review): this listener may run after channelRead0 has returned; confirm that
    // sendHttpResponse does not touch the request content, which may be released by then.
    clientFuture.addListener(cf -> {
        HttpResponseStatus status;
        if (cf.isSuccess()) {
            logPushSuccess();
            status = HttpResponseStatus.OK;
        } else {
            logPushError(cf.cause());
            status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
        }
        sendHttpResponse(ctx, request, status, userAuth);
    });
}