in bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java [83:150]
/**
 * Writes {@code response} to the channel of the current request and records the outcome of the
 * request in {@code statsLogger}, measured from {@code enqueueNanos}.
 *
 * <p>If the configured backpressure wait timeout is non-negative and the channel is not writable,
 * the calling thread polls the channel (sleeping 1 ms between checks) until it becomes writable or
 * the timeout elapses, unless the channel is already blacklisted. A channel that is still not
 * writable afterwards is blacklisted and handed to
 * {@code requestProcessor.handleNonWritableChannel}; the response is then dropped (released when it
 * is a {@link BookieProtocol.Response}), a failed event is registered on both the channel-write
 * stats and {@code statsLogger}, and the method returns without writing. If the channel turned
 * writable, its blacklist entry is invalidated and the write proceeds.
 *
 * <p>If the wait is interrupted, polling stops and the thread's interrupt status is restored so
 * callers further up the stack can still observe the interruption.
 *
 * @param rc          result code of the request; {@code BookieProtocol.EOK} is recorded as a
 *                    success, any other value as a failure
 * @param response    the response object passed to {@code writeAndFlush}; released instead of
 *                    written when it is a {@link BookieProtocol.Response} and the channel is
 *                    non-writable or inactive
 * @param statsLogger the logger on which the request latency is registered
 */
protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) {
    final long writeNanos = MathUtils.nowInNano();
    final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis();
    Channel channel = requestHandler.ctx().channel();
    if (timeOut >= 0 && !channel.isWritable()) {
        if (!requestProcessor.isBlacklisted(channel)) {
            // Serialize waiters on the same channel; the conditions are re-checked inside the lock
            // because another thread may have blacklisted the channel (or it may have become
            // writable) while this thread was waiting to enter.
            synchronized (channel) {
                if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) {
                    final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut);
                    // Compare via subtraction rather than "now < deadline": the result is the same
                    // when nothing wraps, and stays correct if the deadline computation overflows
                    // (e.g. a very large timeout saturating toNanos).
                    // NOTE(review): presumes MathUtils.nowInNano() is a System.nanoTime()-style
                    // monotonic reading - confirm.
                    while (!channel.isWritable() && MathUtils.nowInNano() - waitUntilNanos < 0) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            // Stop waiting, but keep the interrupt visible to the caller instead
                            // of silently clearing it.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    if (!channel.isWritable()) {
                        requestProcessor.blacklistChannel(channel);
                        requestProcessor.handleNonWritableChannel(channel);
                    }
                }
            }
        }
        if (!channel.isWritable()) {
            logger.warn("cannot write response to non-writable channel {} for request {}", channel,
                    StringUtils.requestToString(request));
            requestProcessor.getRequestStats().getChannelWriteStats()
                    .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
            statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
            // The response will never be written, so release it here.
            if (response instanceof BookieProtocol.Response) {
                ((BookieProtocol.Response) response).release();
            }
            return;
        } else {
            requestProcessor.invalidateBlacklist(channel);
        }
    }
    if (channel.isActive()) {
        final ChannelPromise promise;
        if (logger.isDebugEnabled()) {
            // Only attach a listener when its output would actually be logged.
            promise = channel.newPromise().addListener(future -> {
                if (!future.isSuccess()) {
                    logger.debug("Netty channel write exception. ", future.cause());
                }
            });
        } else {
            promise = channel.voidPromise();
        }
        channel.writeAndFlush(response, promise);
    } else {
        // Nothing will be written to an inactive channel, so release the response here.
        if (response instanceof BookieProtocol.Response) {
            ((BookieProtocol.Response) response).release();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Netty channel {} is inactive, "
                    + "hence bypassing netty channel writeAndFlush during sendResponse", channel);
        }
    }
    // The request outcome is recorded from rc whether or not the channel was active.
    if (BookieProtocol.EOK == rc) {
        statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
    } else {
        statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
    }
}