protected void channelRead0()

in zuul-core/src/main/java/com/netflix/zuul/netty/server/push/PushMessageSender.java [81:159]


    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (!request.decoderResult().isSuccess()) {
            sendHttpResponse(ctx, request, HttpResponseStatus.BAD_REQUEST, null);
            return;
        }

        String path = request.uri();
        if (path == null) {
            sendHttpResponse(ctx, request, HttpResponseStatus.BAD_REQUEST, null);
            return;
        }

        if (path.endsWith("/push")) {
            logPushAttempt();

            HttpMethod method = request.method();
            if (!Objects.equals(method, HttpMethod.POST) && !Objects.equals(method, HttpMethod.GET)) {
                sendHttpResponse(ctx, request, HttpResponseStatus.METHOD_NOT_ALLOWED, null);
                return;
            }

            PushUserAuth userAuth = getPushUserAuth(request);
            if (!userAuth.isSuccess()) {
                sendHttpResponse(ctx, request, HttpResponseStatus.UNAUTHORIZED, userAuth);
                logNoIdentity();
                return;
            }

            PushConnection pushConn = pushConnectionRegistry.get(userAuth.getClientIdentity());
            if (pushConn == null) {
                sendHttpResponse(ctx, request, HttpResponseStatus.NOT_FOUND, userAuth);
                logClientNotConnected();
                return;
            }

            if (!verifySecureToken(request, pushConn)) {
                sendHttpResponse(ctx, request, HttpResponseStatus.FORBIDDEN, userAuth);
                logSecurityTokenVerificationFail();
                return;
            }

            if (Objects.equals(method, HttpMethod.GET)) {
                // client only checking if particular CID + ESN is connected to this instance
                sendHttpResponse(ctx, request, HttpResponseStatus.OK, userAuth);
                return;
            }

            if (pushConn.isRateLimited()) {
                sendHttpResponse(ctx, request, HttpResponseStatus.SERVICE_UNAVAILABLE, userAuth);
                logRateLimited();
                return;
            }

            ByteBuf body = request.content().retain();
            if (body.readableBytes() <= 0) {
                sendHttpResponse(ctx, request, HttpResponseStatus.NO_CONTENT, userAuth);
                // Because we are not passing the body to the pushConn (who would normally handle destroying),
                // we need to release it here.
                ReferenceCountUtil.release(body);
                return;
            }

            ChannelFuture clientFuture = pushConn.sendPushMessage(body);
            clientFuture.addListener(cf -> {
                HttpResponseStatus status;
                if (cf.isSuccess()) {
                    logPushSuccess();
                    status = HttpResponseStatus.OK;
                } else {
                    logPushError(cf.cause());
                    status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
                }
                sendHttpResponse(ctx, request, status, userAuth);
            });
        } else {
            // Last handler in the chain
            sendHttpResponse(ctx, request, HttpResponseStatus.BAD_REQUEST, null);
        }
    }