in processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java [109:419]
/**
* Executes {@code request} on a channel taken from {@code pool} and feeds the response to {@code handler}.
*
* The calling thread blocks while the channel is taken from the pool and while the pooled
* {@link ChannelFuture} completes. After that the request is written and the response is processed by
* handlers that this method adds to the channel's pipeline.
*
* If the pooled channel future did not succeed, the resource is returned to the pool and an already-failed
* future wrapping a {@link ChannelException} is returned; no request is written in that case.
*
* Request construction: HTTP/1.1, the request's method, and the URL's file part as the request target
* ("/" when that is empty). A Host header and an Accept-Encoding header (from {@code compressionCodec}) are
* added only when the request's own headers do not contain those keys; every header of the request is then
* copied onto the outgoing message, and the request's content is attached when it has any.
*
* @param request            request to send; its method, URL, headers and optional content are used
* @param handler            receives the initial response, every non-last chunk, and the completion call; its
*                           {@code exceptionCaught} is invoked only if a response had already been handled
* @param requestReadTimeout passed to {@code getReadTimeout}; a {@link ReadTimeoutHandler} is installed only
*                           when the resulting value is positive (treated as milliseconds)
* @param <Intermediate>     type carried by the {@link ClientResponse} while the response is being processed
* @param <Final>            type of the value the returned future is completed with
* @return a future that is set from the first {@link ClientResponse} reporting {@code isFinished()} or from
*         {@code handler.done(...)}; that fails on read timeout, channel exception, disconnect, or write
*         failure; and that is set to {@code null} if processing a message throws before it was completed
*/
public <Intermediate, Final> ListenableFuture<Final> go(
final Request request,
final HttpResponseHandler<Intermediate, Final> handler,
final Duration requestReadTimeout
)
{
final HttpMethod method = request.getMethod();
final URL url = request.getUrl();
final Multimap<String, String> headers = request.getHeaders();
// Prefix used in every log line and error message produced for this request.
final String requestDesc = method + " " + url;
if (log.isDebugEnabled()) {
log.debug("[%s] starting", requestDesc);
}
// Block while acquiring a channel from the pool, then complete the request asynchronously.
final Channel channel;
final String hostKey = getPoolKey(url);
final ResourceContainer<ChannelFuture> channelResourceContainer = pool.take(hostKey);
// awaitUninterruptibly() blocks the calling thread until the pooled channel future has completed.
final ChannelFuture channelFuture = channelResourceContainer.get().awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
channelResourceContainer.returnResource(); // Some other poor sap will have to deal with it...
return Futures.immediateFailedFuture(
new ChannelException(
"Faulty channel in resource pool",
channelFuture.getCause()
)
);
} else {
channel = channelFuture.getChannel();
// In case we get a channel that never had its readability turned back on.
channel.setReadable(true);
}
// Request target is the URL's file part, or "/" when that is empty.
// (nullToEmptyNonDruidDataString presumably maps null to "" -- helper is not visible here.)
final String urlFile = StringUtils.nullToEmptyNonDruidDataString(url.getFile());
final HttpRequest httpRequest = new DefaultHttpRequest(
HttpVersion.HTTP_1_1,
method,
urlFile.isEmpty() ? "/" : urlFile
);
// Only supply a Host header when the caller did not provide one.
// NOTE(review): containsKey on the Multimap is presumably a case-sensitive key lookup, so a caller header
// spelled with different casing would not be detected here -- confirm against Request#getHeaders.
if (!headers.containsKey(HttpHeaders.Names.HOST)) {
httpRequest.headers().add(HttpHeaders.Names.HOST, getHost(url));
}
// If Accept-Encoding is set in the Request, use that. Otherwise use the default from "compressionCodec".
if (!headers.containsKey(HttpHeaders.Names.ACCEPT_ENCODING)) {
httpRequest.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, compressionCodec.getEncodingString());
}
// Copy every caller-supplied header; add() is called once per value, so multi-valued keys keep all values.
for (Map.Entry<String, Collection<String>> entry : headers.asMap().entrySet()) {
String key = entry.getKey();
for (String obj : entry.getValue()) {
httpRequest.headers().add(key, obj);
}
}
if (request.hasContent()) {
httpRequest.setContent(request.getContent());
}
final long readTimeout = getReadTimeout(requestReadTimeout);
// Future handed back to the caller; completed from the pipeline handler or the write listener below.
final SettableFuture<Final> retVal = SettableFuture.create();
// Pipeline can hand us chunks even after exceptionCaught is called. This has the potential to confuse
// HttpResponseHandler implementations, which expect exceptionCaught to be the final method called. So, we
// use this boolean to ensure that handlers do not see any chunks after exceptionCaught fires.
final AtomicBoolean didEncounterException = new AtomicBoolean();
// The timeout handler is installed only for a positive timeout; removeHandlers() mirrors this condition.
if (readTimeout > 0) {
channel.getPipeline().addLast(
READ_TIMEOUT_HANDLER_NAME,
new ReadTimeoutHandler(timer, readTimeout, TimeUnit.MILLISECONDS)
);
}
// Per-request handler that drives the HttpResponseHandler; it is removed again by removeHandlers() when
// the request finishes normally.
channel.getPipeline().addLast(
LAST_HANDLER_NAME,
new SimpleChannelUpstreamHandler()
{
// Most recent ClientResponse returned by the handler; stays null until an HttpResponse has been handled.
private volatile ClientResponse<Intermediate> response = null;
// Chunk number most recently assigned.
private long currentChunkNum = 0;
// Suspend and resume watermarks (respectively: last chunk number that triggered a suspend, and that was
// provided to the TrafficCop's resume method). Synchronized access since they are not always accessed
// from an I/O thread. (TrafficCops can be called from any thread.)
private final Object watermarkLock = new Object();
private long suspendWatermark = -1;
private long resumeWatermark = -1;
/**
* Dispatches an incoming message to the response handler.
*
* An {@link HttpResponse} is passed to {@code handler.handleResponse}; a non-last {@link HttpChunk} is
* passed to {@code handler.handleChunk} with the next chunk number; a last chunk, or a response that is
* not chunked, triggers {@link #finishRequest()}. Both message kinds are ignored once
* {@code didEncounterException} is set. Any other message type raises an ISE.
*
* If anything in this method throws an {@link Exception}, the future is completed with {@code null}
* (unless already done), the channel is closed, the resource is returned, and the exception is rethrown.
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
{
if (log.isDebugEnabled()) {
log.debug("[%s] messageReceived: %s", requestDesc, e.getMessage());
}
try {
Object msg = e.getMessage();
if (msg instanceof HttpResponse) {
if (didEncounterException.get()) {
// Don't process HttpResponse after encountering an exception.
return;
}
HttpResponse httpResponse = (HttpResponse) msg;
if (log.isDebugEnabled()) {
log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus());
}
// Callback given to the handler so it can resume reads. It raises the resume watermark and, if a
// suspend has been recorded and the resume watermark has caught up with it, clears the suspend
// watermark, makes the channel readable again, and returns the nanoseconds elapsed since
// backPressureStartTimeNs. Otherwise it returns 0.
// NOTE(review): backPressureStartTimeNs is declared outside this view; if it is state shared between
// requests, concurrent requests could overwrite each other's start time -- confirm.
HttpResponseHandler.TrafficCop trafficCop = resumeChunkNum -> {
synchronized (watermarkLock) {
resumeWatermark = Math.max(resumeWatermark, resumeChunkNum);
if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) {
suspendWatermark = -1;
channel.setReadable(true);
long backPressureDuration = System.nanoTime() - backPressureStartTimeNs;
log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum);
return backPressureDuration;
}
}
return 0; //If we didn't resume, don't know if backpressure was happening
};
response = handler.handleResponse(httpResponse, trafficCop);
// The handler may already have a final object after seeing only the initial response.
if (response.isFinished()) {
retVal.set((Final) response.getObj());
}
// No chunk has been numbered yet when the initial response arrives.
assert currentChunkNum == 0;
possiblySuspendReads(response);
// A response that is not chunked has no further messages, so complete the request right away.
if (!httpResponse.isChunked()) {
finishRequest();
}
} else if (msg instanceof HttpChunk) {
if (didEncounterException.get()) {
// Don't process HttpChunk after encountering an exception.
return;
}
HttpChunk httpChunk = (HttpChunk) msg;
if (log.isDebugEnabled()) {
log.debug(
"[%s] Got chunk: %sB, last=%s",
requestDesc,
httpChunk.getContent().readableBytes(),
httpChunk.isLast()
);
}
// The last chunk is not passed to handleChunk; it only completes the request.
if (httpChunk.isLast()) {
finishRequest();
} else {
// Chunk numbers passed to the handler start at 1 (pre-increment of a counter that starts at 0).
response = handler.handleChunk(response, httpChunk, ++currentChunkNum);
if (response.isFinished() && !retVal.isDone()) {
retVal.set((Final) response.getObj());
}
possiblySuspendReads(response);
}
} else {
throw new ISE("Unknown message type[%s]", msg.getClass());
}
}
catch (Exception ex) {
log.warn(ex, "[%s] Exception thrown while processing message, closing channel.", requestDesc);
// NOTE(review): on this path the caller's future completes with null rather than with the exception --
// confirm that callers expect a null result here.
if (!retVal.isDone()) {
retVal.set(null);
}
channel.close();
channelResourceContainer.returnResource();
// NOTE(review): if the rethrown exception is delivered back to exceptionCaught (presumably Netty does
// this -- confirm), handleExceptionAndCloseChannel would call returnResource() a second time.
throw ex;
}
}
/**
* Stops reading from the channel when {@code response} asks not to continue reading.
*
* Raises the suspend watermark to the current chunk number and, if that is ahead of the resume
* watermark, marks the channel unreadable and records the time at which back pressure started.
*/
private void possiblySuspendReads(ClientResponse<?> response)
{
if (!response.isContinueReading()) {
synchronized (watermarkLock) {
suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
// Skip the suspend if a resume for this chunk number (or a later one) has already been recorded.
if (suspendWatermark > resumeWatermark) {
channel.setReadable(false);
backPressureStartTimeNs = System.nanoTime();
log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum);
}
}
}
}
/**
* Completes the request after the full response has been received.
*
* Calls {@code handler.done} and throws an ISE unless the returned response is both finished and set to
* continue reading. Otherwise completes the future (if not already done), removes the handlers added
* for this request, makes the channel readable again, and returns the resource to the pool.
*/
private void finishRequest()
{
ClientResponse<Final> finalResponse = handler.done(response);
if (!finalResponse.isFinished() || !finalResponse.isContinueReading()) {
throw new ISE(
"[%s] Didn't get a completed ClientResponse Object from [%s] (finished = %s, continueReading = %s)",
requestDesc,
handler.getClass(),
finalResponse.isFinished(),
finalResponse.isContinueReading()
);
}
if (!retVal.isDone()) {
retVal.set(finalResponse.getObj());
}
// Strip the per-request handlers and re-enable reads before the channel goes back to the pool.
removeHandlers();
channel.setReadable(true);
channelResourceContainer.returnResource();
}
/**
* Forwards the event's cause to {@link #handleExceptionAndCloseChannel}; the channel is closed only if
* it is still open.
*/
@Override
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event)
{
handleExceptionAndCloseChannel(event.getCause(), false);
}
/**
* Treats a disconnect as a {@link ChannelException} and always calls {@link Channel#close()}.
*/
@Override
public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event)
{
handleExceptionAndCloseChannel(new ChannelException("Channel disconnected"), true);
}
/**
* Handle an exception by logging it, possibly calling {@link SettableFuture#setException} on {@code retVal},
* possibly calling {@link HttpResponseHandler#exceptionCaught}, and possibly closing the channel.
*
* No actions will be taken (other than logging) if an exception has already been handled for this request.
*
* @param t exception
* @param closeIfNotOpen Call {@link Channel#close()} even if {@link Channel#isOpen()} returns false.
* Provided to retain existing behavior of two different chunks of code that were
* merged into this single method.
*/
private void handleExceptionAndCloseChannel(final Throwable t, final boolean closeIfNotOpen)
{
if (log.isDebugEnabled()) {
log.debug(t, "[%s] Caught exception", requestDesc);
}
// Only process the first exception encountered.
if (!didEncounterException.compareAndSet(false, true)) {
return;
}
if (!retVal.isDone()) {
if (t instanceof ReadTimeoutException) {
// ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace.
// No point including it: instead, we replace it with a fresh exception.
retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
} else {
retVal.setException(t);
}
}
// response is non-null if we received initial chunk and then exception occurs
if (response != null) {
handler.exceptionCaught(response, t);
}
try {
if (closeIfNotOpen || channel.isOpen()) {
channel.close();
}
}
catch (Exception e) {
log.warn(e, "[%s] Error while closing channel", requestDesc);
}
finally {
// The resource is handed back even when closing the channel threw.
channelResourceContainer.returnResource();
}
}
/**
* Removes the handlers this request added to the pipeline: the read-timeout handler (only if one was
* installed, i.e. {@code readTimeout > 0}) and this handler itself.
*/
private void removeHandlers()
{
if (readTimeout > 0) {
channel.getPipeline().remove(READ_TIMEOUT_HANDLER_NAME);
}
channel.getPipeline().remove(LAST_HANDLER_NAME);
}
}
);
// Write the request. On a failed write: close the channel, return the resource, and fail the future
// (unless it has already been completed).
channel.write(httpRequest).addListener(
new ChannelFutureListener()
{
@Override
public void operationComplete(ChannelFuture future)
{
if (!future.isSuccess()) {
channel.close();
channelResourceContainer.returnResource();
if (!retVal.isDone()) {
retVal.setException(
new ChannelException(
StringUtils.format("[%s] Failed to write request to channel", requestDesc),
future.getCause()
)
);
}
}
}
}
);
return retVal;
}