protected void sendResponse()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java [83:150]


    /**
     * Writes {@code response} to the request's Netty channel and records the outcome in
     * {@code statsLogger}.
     *
     * <p>If the back-pressure wait timeout reported by the request processor is non-negative and the
     * channel is not writable, the calling thread polls the channel (sleeping 1 ms between checks) until
     * it becomes writable or the timeout elapses, unless the channel is already blacklisted. A channel
     * that is still not writable after the wait is blacklisted and passed to
     * {@code handleNonWritableChannel}. When the channel remains non-writable the response is dropped:
     * a failed event is registered on both the channel-write stats and {@code statsLogger}, the response
     * is released if it is a {@link BookieProtocol.Response}, and the method returns without writing.
     *
     * <p>If the wait is interrupted, polling stops early and the thread's interrupt status is restored
     * so that callers can observe the interruption.
     *
     * @param rc          result code of the request; {@code BookieProtocol.EOK} is recorded as a
     *                    successful event, any other value as a failed event
     * @param response    the response object to write and flush to the channel
     * @param statsLogger the stats logger on which the request latency (measured from
     *                    {@code enqueueNanos}) is registered
     */
    protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
        final long writeNanos = MathUtils.nowInNano();
        final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();

        Channel channel = requestHandler.ctx().channel();

        // A negative timeout skips the back-pressure handling entirely.
        if (timeOut >= 0 && !channel.isWritable()) {
            if (!requestProcessor.isBlacklisted(channel)) {
                synchronized (channel) {
                    // Re-check under the lock: another thread may have already waited on this channel
                    // and either seen it become writable or blacklisted it.
                    if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
                        final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
                        while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) {
                            try {
                                TimeUnit.MILLISECONDS.sleep(1);
                            } catch (InterruptedException e) {
                                // Stop waiting, but keep the interrupt visible to the owner of this thread
                                // instead of silently swallowing it.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                        if (!channel.isWritable()) {
                            requestProcessor.blacklistChannel(channel);
                            requestProcessor.handleNonWritableChannel(channel);
                        }
                    }
                }
            }

            if (!channel.isWritable()) {
                logger.warn("cannot write response to non-writable channel {} for request {}", channel,
                    StringUtils.requestToString(request));
                requestProcessor.getRequestStats().getChannelWriteStats()
                    .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
                statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
                // The response will never be written, so release it here.
                if (response instanceof BookieProtocol.Response) {
                    ((BookieProtocol.Response) response).release();
                }
                return;
            } else {
                // The channel is writable again; clear any blacklist entry for it.
                requestProcessor.invalidateBlacklist(channel);
            }
        }

        if (channel.isActive()) {
            final ChannelPromise promise;
            if (logger.isDebugEnabled()) {
                // Only attach a listener when the failure would actually be logged.
                promise = channel.newPromise().addListener(future -> {
                    if (!future.isSuccess()) {
                        logger.debug("Netty channel write exception. ", future.cause());
                    }
                });
            } else {
                promise = channel.voidPromise();
            }
            channel.writeAndFlush(response, promise);
        } else {
            // Nothing will be written to an inactive channel, so release the response here.
            if (response instanceof BookieProtocol.Response) {
                ((BookieProtocol.Response) response).release();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Netty channel {} is inactive, "
                        + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
            }
        }
        if (BookieProtocol.EOK == rc) {
            statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
        } else {
            statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
        }
    }