public ListenableFuture go()

in processing/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java [109:419]


  /**
   * Sends {@code request} over a channel taken from the pool and feeds the response to {@code handler}.
   *
   * <p>The calling thread blocks while a channel is taken from the pool and the pooled {@code ChannelFuture} is
   * awaited. Everything after that (writing the request, receiving the response) happens asynchronously and is
   * reported through the returned future.
   *
   * <p>Request construction: the request is sent as HTTP/1.1 using the file part of the URL ("/" when that is empty).
   * A Host header derived from the URL is added unless the request already carries one, and Accept-Encoding defaults
   * to the encoding string of {@code compressionCodec} unless the request already carries one. All request headers
   * are then copied over, followed by the content if the request has any.
   *
   * <p>Response handling: two handlers are appended to the channel pipeline for the duration of the request, an
   * optional read-timeout handler and a terminal handler that dispatches {@code HttpResponse} and {@code HttpChunk}
   * messages to {@code handler}. On normal completion both are removed again and the channel is returned to the
   * pool. On errors the channel is closed before being returned.
   *
   * @param request            supplies the method, URL, headers and optional content to send
   * @param handler            receives the response head, each non-final chunk, the completion callback and, once a
   *                           response has been started, the first exception encountered
   * @param requestReadTimeout passed to {@code getReadTimeout}; a read-timeout handler is installed only when the
   *                           resulting value (used as milliseconds) is greater than zero
   * @param <Intermediate>     type carried by the {@code ClientResponse} passed between handler callbacks
   * @param <Final>            type of the object the returned future resolves to
   *
   * @return a future resolving to the object produced by {@code handler}. It is failed with a
   * {@code ChannelException} if the pooled channel is faulty or the request cannot be written, and with the caught
   * exception (a fresh {@code ReadTimeoutException} for read timeouts) if the channel reports an error or disconnects
   * before the future completes. NOTE(review): if processing a message throws before the future is done, the future
   * is completed with {@code null} rather than failed.
   */
  public <Intermediate, Final> ListenableFuture<Final> go(
      final Request request,
      final HttpResponseHandler<Intermediate, Final> handler,
      final Duration requestReadTimeout
  )
  {
    final HttpMethod method = request.getMethod();
    final URL url = request.getUrl();
    final Multimap<String, String> headers = request.getHeaders();

    // Human-readable tag ("METHOD url") used to prefix every log line and error message for this request.
    final String requestDesc = method + " " + url;
    if (log.isDebugEnabled()) {
      log.debug("[%s] starting", requestDesc);
    }

    // Block while acquiring a channel from the pool, then complete the request asynchronously.
    final Channel channel;
    final String hostKey = getPoolKey(url);
    final ResourceContainer<ChannelFuture> channelResourceContainer = pool.take(hostKey);
    // awaitUninterruptibly() blocks the caller until the pooled ChannelFuture has completed, one way or the other.
    final ChannelFuture channelFuture = channelResourceContainer.get().awaitUninterruptibly();
    if (!channelFuture.isSuccess()) {
      channelResourceContainer.returnResource(); // Some other poor sap will have to deal with it...
      return Futures.immediateFailedFuture(
          new ChannelException(
              "Faulty channel in resource pool",
              channelFuture.getCause()
          )
      );
    } else {
      channel = channelFuture.getChannel();

      // In case we get a channel that never had its readability turned back on.
      channel.setReadable(true);
    }
    // Request line: use the file part of the URL, falling back to "/" when it is empty.
    final String urlFile = StringUtils.nullToEmptyNonDruidDataString(url.getFile());
    final HttpRequest httpRequest = new DefaultHttpRequest(
        HttpVersion.HTTP_1_1,
        method,
        urlFile.isEmpty() ? "/" : urlFile
    );

    // Only synthesize a Host header when the caller did not provide one; a caller-provided value is copied below.
    if (!headers.containsKey(HttpHeaders.Names.HOST)) {
      httpRequest.headers().add(HttpHeaders.Names.HOST, getHost(url));
    }

    // If Accept-Encoding is set in the Request, use that. Otherwise use the default from "compressionCodec".
    if (!headers.containsKey(HttpHeaders.Names.ACCEPT_ENCODING)) {
      httpRequest.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, compressionCodec.getEncodingString());
    }

    // Copy every caller-supplied header. add() is called once per value, so multi-valued headers keep all values.
    for (Map.Entry<String, Collection<String>> entry : headers.asMap().entrySet()) {
      String key = entry.getKey();

      for (String obj : entry.getValue()) {
        httpRequest.headers().add(key, obj);
      }
    }

    if (request.hasContent()) {
      httpRequest.setContent(request.getContent());
    }

    final long readTimeout = getReadTimeout(requestReadTimeout);
    // The future handed back to the caller; completed from the pipeline handler or the write listener below.
    final SettableFuture<Final> retVal = SettableFuture.create();

    // Pipeline can hand us chunks even after exceptionCaught is called. This has the potential to confuse
    // HttpResponseHandler implementations, which expect exceptionCaught to be the final method called. So, we
    // use this boolean to ensure that handlers do not see any chunks after exceptionCaught fires.
    final AtomicBoolean didEncounterException = new AtomicBoolean();

    // The read-timeout handler is optional: it is installed only for a positive timeout, and removeHandlers()
    // applies the same condition when taking it out again.
    if (readTimeout > 0) {
      channel.getPipeline().addLast(
          READ_TIMEOUT_HANDLER_NAME,
          new ReadTimeoutHandler(timer, readTimeout, TimeUnit.MILLISECONDS)
      );
    }

    // Terminal handler for this request: turns upstream HTTP messages into HttpResponseHandler callbacks and
    // completes retVal.
    channel.getPipeline().addLast(
        LAST_HANDLER_NAME,
        new SimpleChannelUpstreamHandler()
        {
          // Most recent ClientResponse returned by the handler; stays null until the HttpResponse has been handled.
          private volatile ClientResponse<Intermediate> response = null;

          // Chunk number most recently assigned.
          private long currentChunkNum = 0;

          // Suspend and resume watermarks (respectively: last chunk number that triggered a suspend, and that was
          // provided to the TrafficCop's resume method). Synchronized access since they are not always accessed
          // from an I/O thread. (TrafficCops can be called from any thread.)
          private final Object watermarkLock = new Object();
          private long suspendWatermark = -1;
          private long resumeWatermark = -1;

          /**
           * Dispatches an upstream message: an {@code HttpResponse} starts the response, an {@code HttpChunk}
           * continues or ends it, and anything else is rejected with an ISE. Messages arriving after an exception
           * has been recorded are dropped without being shown to the handler.
           */
          @Override
          public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
          {
            if (log.isDebugEnabled()) {
              log.debug("[%s] messageReceived: %s", requestDesc, e.getMessage());
            }
            try {
              Object msg = e.getMessage();

              if (msg instanceof HttpResponse) {
                if (didEncounterException.get()) {
                  // Don't process HttpResponse after encountering an exception.
                  return;
                }

                HttpResponse httpResponse = (HttpResponse) msg;
                if (log.isDebugEnabled()) {
                  log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus());
                }

                // Callback given to the handler so it can ask for reads to resume once it has caught up to a given
                // chunk number. Returns the time spent suspended in nanoseconds when this call actually resumed
                // reads, otherwise 0.
                // NOTE(review): backPressureStartTimeNs is not declared in this method or in this anonymous class;
                // presumably it is a field of the enclosing class. If so it is shared by concurrent requests and
                // is written under a per-request lock only -- confirm.
                HttpResponseHandler.TrafficCop trafficCop = resumeChunkNum -> {
                  synchronized (watermarkLock) {
                    // Watermarks only ever move forward, so a stale resume call cannot undo a newer one.
                    resumeWatermark = Math.max(resumeWatermark, resumeChunkNum);

                    // Resume only if a suspend is outstanding and the consumer has caught up to it.
                    if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) {
                      suspendWatermark = -1;
                      channel.setReadable(true);
                      long backPressureDuration = System.nanoTime() - backPressureStartTimeNs;
                      log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum);
                      return backPressureDuration;
                    }
                  }

                  return 0; //If we didn't resume, don't know if backpressure was happening
                };
                response = handler.handleResponse(httpResponse, trafficCop);
                if (response.isFinished()) {
                  // Unchecked cast: a response reporting itself finished is trusted to already hold a Final object.
                  retVal.set((Final) response.getObj());
                }

                // No chunk has been numbered yet when the response head arrives.
                assert currentChunkNum == 0;
                possiblySuspendReads(response);

                // A non-chunked response will not be followed by HttpChunk messages, so finish right away.
                if (!httpResponse.isChunked()) {
                  finishRequest();
                }
              } else if (msg instanceof HttpChunk) {
                if (didEncounterException.get()) {
                  // Don't process HttpChunk after encountering an exception.
                  return;
                }

                HttpChunk httpChunk = (HttpChunk) msg;
                if (log.isDebugEnabled()) {
                  log.debug(
                      "[%s] Got chunk: %sB, last=%s",
                      requestDesc,
                      httpChunk.getContent().readableBytes(),
                      httpChunk.isLast()
                  );
                }

                // The last chunk only triggers completion; it is not passed to handleChunk and gets no number.
                if (httpChunk.isLast()) {
                  finishRequest();
                } else {
                  // Chunk numbers start at 1 (pre-increment of a counter that starts at 0).
                  // NOTE(review): response is still null here if a chunk arrives before any HttpResponse; the
                  // handler would then receive a null ClientResponse.
                  response = handler.handleChunk(response, httpChunk, ++currentChunkNum);
                  if (response.isFinished() && !retVal.isDone()) {
                    retVal.set((Final) response.getObj());
                  }
                  possiblySuspendReads(response);
                }
              } else {
                throw new ISE("Unknown message type[%s]", msg.getClass());
              }
            }
            catch (Exception ex) {
              log.warn(ex, "[%s] Exception thrown while processing message, closing channel.", requestDesc);

              // NOTE(review): the future is completed with null here, not failed with ex. Because
              // handleExceptionAndCloseChannel only sets an exception while the future is not done, a caller
              // waiting on the future sees a null result for this failure path -- confirm that is intended.
              if (!retVal.isDone()) {
                retVal.set(null);
              }
              channel.close();
              channelResourceContainer.returnResource();

              // NOTE(review): if the rethrown exception is routed back to exceptionCaught on this handler,
              // returnResource() runs a second time for the same container -- confirm the container tolerates it.
              throw ex;
            }
          }

          /**
           * Stops reading from the channel when the handler has asked not to continue reading, unless a resume
           * covering the current chunk number has already been recorded.
           */
          private void possiblySuspendReads(ClientResponse<?> response)
          {
            if (!response.isContinueReading()) {
              synchronized (watermarkLock) {
                suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
                // Skip the suspend if resume was already called for this chunk number or a later one.
                if (suspendWatermark > resumeWatermark) {
                  channel.setReadable(false);
                  backPressureStartTimeNs = System.nanoTime();
                  log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum);
                }
              }
            }
          }

          /**
           * Completes the request: asks the handler for its final response, completes {@code retVal} if it is not
           * already done, removes the per-request handlers, re-enables reads and returns the channel to the pool.
           *
           * Throws ISE if the final response is not finished or does not have continueReading set.
           */
          private void finishRequest()
          {
            ClientResponse<Final> finalResponse = handler.done(response);

            if (!finalResponse.isFinished() || !finalResponse.isContinueReading()) {
              throw new ISE(
                  "[%s] Didn't get a completed ClientResponse Object from [%s] (finished = %s, continueReading = %s)",
                  requestDesc,
                  handler.getClass(),
                  finalResponse.isFinished(),
                  finalResponse.isContinueReading()
              );
            }
            if (!retVal.isDone()) {
              retVal.set(finalResponse.getObj());
            }
            // Strip the per-request handlers and make the channel readable again before handing it back.
            removeHandlers();
            channel.setReadable(true);
            channelResourceContainer.returnResource();
          }

          @Override
          public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event)
          {
            // Close the channel only if it is still open.
            handleExceptionAndCloseChannel(event.getCause(), false);
          }

          @Override
          public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event)
          {
            // A disconnect is reported as a ChannelException; close() is called regardless of isOpen().
            handleExceptionAndCloseChannel(new ChannelException("Channel disconnected"), true);
          }

          /**
           * Handle an exception by logging it, possibly calling {@link SettableFuture#setException} on {@code retVal},
           * possibly calling {@link HttpResponseHandler#exceptionCaught}, and possibly closing the channel.
           *
           * No actions will be taken (other than logging) if an exception has already been handled for this request.
           *
           * @param t exception
           * @param closeIfNotOpen Call {@link Channel#close()} even if {@link Channel#isOpen()} returns false.
           *                       Provided to retain existing behavior of two different chunks of code that were
           *                       merged into this single method.
           */
          private void handleExceptionAndCloseChannel(final Throwable t, final boolean closeIfNotOpen)
          {
            if (log.isDebugEnabled()) {
              log.debug(t, "[%s] Caught exception", requestDesc);
            }

            // Only process the first exception encountered.
            if (!didEncounterException.compareAndSet(false, true)) {
              return;
            }

            if (!retVal.isDone()) {
              if (t instanceof ReadTimeoutException) {
                // ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace.
                // No point including it: instead, we replace it with a fresh exception.
                retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
              } else {
                retVal.setException(t);
              }
            }

            // response is non-null if we received initial chunk and then exception occurs
            // NOTE(review): this callback runs outside the try/finally below, so if it throws, the channel is not
            // closed and the resource is not returned by this method.
            if (response != null) {
              handler.exceptionCaught(response, t);
            }
            try {
              if (closeIfNotOpen || channel.isOpen()) {
                channel.close();
              }
            }
            catch (Exception e) {
              log.warn(e, "[%s] Error while closing channel", requestDesc);
            }
            finally {
              // Always hand the container back, even if closing the channel failed.
              channelResourceContainer.returnResource();
            }
          }

          /**
           * Removes the handlers this request added to the pipeline: the read-timeout handler (only if one was
           * installed, i.e. readTimeout is positive) and this handler itself.
           */
          private void removeHandlers()
          {
            if (readTimeout > 0) {
              channel.getPipeline().remove(READ_TIMEOUT_HANDLER_NAME);
            }
            channel.getPipeline().remove(LAST_HANDLER_NAME);
          }
        }
    );

    // Send the request. Only a failed write is handled here: close the channel, return it to the pool, and fail
    // the future unless it has already been completed. A successful write needs no action; the response is
    // handled by the pipeline handler installed above.
    channel.write(httpRequest).addListener(
        new ChannelFutureListener()
        {
          @Override
          public void operationComplete(ChannelFuture future)
          {
            if (!future.isSuccess()) {
              channel.close();
              channelResourceContainer.returnResource();
              if (!retVal.isDone()) {
                retVal.setException(
                    new ChannelException(
                        StringUtils.format("[%s] Failed to write request to channel", requestDesc),
                        future.getCause()
                    )
                );
              }
            }
          }
        }
    );

    return retVal;
  }