in server/src/main/java/org/apache/druid/client/DirectDruidClient.java [154:545]
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext context)
{
  // Sends the query as an HTTP POST to "<scheme>://<host>/druid/v2/" and returns the results as a Sequence.
  // Response bytes are received by the HttpResponseHandler defined below, which queues them and exposes the
  // queue as one InputStream; "future" (returned by httpClient.go) is what the JsonParserIterator at the bottom
  // of this method is built from.
  final Query<T> query = queryPlus.getQuery();
  QueryToolChest<T, Query<T>> toolChest = conglomerate.getToolChest(query);
  boolean isBySegment = query.context().isBySegment();
  // bySegment queries use a different result type than regular queries.
  final JavaType queryResultType = isBySegment ? toolChest.getBySegmentResultType() : toolChest.getBaseResultType();

  final ListenableFuture<InputStream> future;

  final String url = scheme + "://" + host + "/druid/v2/";
  // Passed to cancelQuery(...) when the future ends up cancelled (see the callback further down).
  final String cancelUrl = url + query.getId();

  try {
    log.debug("Querying queryId [%s] url [%s]", query.getId(), url);

    final long requestStartTimeNs = System.nanoTime();
    final QueryContext queryContext = query.context();
    // The fail time is compared against System.currentTimeMillis() below, i.e. it is a wall-clock deadline in
    // millis. A missing value is still reported as a NullPointerException (as before), but now with a message
    // that names the query and the missing key instead of a bare auto-unboxing NPE.
    final long timeoutAt = Preconditions.checkNotNull(
        queryContext.getLong(QUERY_FAIL_TIME),
        "Query[%s] is missing the [%s] context value",
        query.getId(),
        QUERY_FAIL_TIME
    );
    final long maxScatterGatherBytes = queryContext.getMaxScatterGatherBytes();
    // Counter taken from the ResponseContext, so it is not private to this request.
    // NOTE(review): presumably shared by all server requests of the same query -- confirm against ResponseContext.
    final AtomicLong totalBytesGathered = context.getTotalBytes();
    final long maxQueuedBytes = queryContext.getMaxQueuedBytes(0);
    // Backpressure is only active when a positive queue limit is configured.
    final boolean usingBackpressure = maxQueuedBytes > 0;

    final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<>()
    {
      // Total bytes received for this request; reported in done().
      private final AtomicLong totalByteCount = new AtomicLong(0);
      // Bytes currently sitting in "queue" (kept at least as high as the real value, see enqueue()).
      private final AtomicLong queuedByteCount = new AtomicLong(0);
      // Sum of the values returned by TrafficCop.resume(); reported as back pressure time in done().
      private final AtomicLong channelSuspendedTime = new AtomicLong(0);
      // Hand-off between the handler callbacks (producer side) and the SequenceInputStream (consumer side).
      private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
      // False until done() has run; also used as the monitor guarding the "done + queue" check.
      private final AtomicBoolean done = new AtomicBoolean(false);
      // Non-null once a read failure has been set up; holds the failure message.
      private final AtomicReference<String> fail = new AtomicReference<>();
      private final AtomicReference<TrafficCop> trafficCopRef = new AtomicReference<>();

      // Lazily created by acquireResponseMetrics().
      private QueryMetrics<? super Query<T>> queryMetrics;
      private long responseStartTimeNs;

      /**
       * Returns the metrics object for this response, creating it (and tagging it with the server host) on first
       * use.
       */
      private QueryMetrics<? super Query<T>> acquireResponseMetrics()
      {
        if (queryMetrics == null) {
          queryMetrics = toolChest.makeMetrics(query);
          queryMetrics.server(host);
        }
        return queryMetrics;
      }

      /**
       * Queue a buffer. Returns true if we should keep reading, false otherwise.
       *
       * @param buffer   content to queue
       * @param chunkNum chunk number recorded in the holder; later passed to TrafficCop.resume() by dequeue()
       *
       * @return false only when backpressure is enabled and the queued byte count has reached maxQueuedBytes
       *
       * @throws InterruptedException if interrupted while putting into the queue
       */
      private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws InterruptedException
      {
        // Increment queuedByteCount before queueing the object, so queuedByteCount is at least as high as
        // the actual number of queued bytes at any particular time.
        final InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(buffer, chunkNum);
        final long currentQueuedByteCount = queuedByteCount.addAndGet(holder.getLength());
        queue.put(holder);

        // True if we should keep reading.
        return !usingBackpressure || currentQueuedByteCount < maxQueuedBytes;
      }

      /**
       * Takes the next stream off the queue, waiting at most until the query deadline.
       *
       * @return the next queued stream
       *
       * @throws QueryTimeoutException if the deadline has already passed, or nothing arrives before it
       * @throws InterruptedException  if interrupted while waiting on the queue
       */
      private InputStream dequeue() throws InterruptedException
      {
        // checkQueryTimeout() returns the remaining millis, or throws if the deadline has already passed.
        final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
        if (holder == null) {
          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
        }

        final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
        // Once the queue has drained below the limit, tell the traffic cop to resume reading.
        if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
          long backPressureTime = Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
                                               .resume(holder.getChunkNum());
          channelSuspendedTime.addAndGet(backPressureTime);
        }

        return holder.getStream();
      }

      /**
       * Handles the initial response: enforces the timeout and byte limits, emits the time-to-first-byte metric,
       * merges the response-context header (if any) into {@code context}, and queues the initial content.
       *
       * @return a response wrapping a SequenceInputStream over the queued streams; its continue-reading flag is
       * the value returned by {@link #enqueue}. If the response-context header cannot be deserialized, the
       * returned stream instead throws that IOException on read.
       */
      @Override
      public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop)
      {
        // Must be stored before anything is queued: dequeue() needs it to resume reading.
        trafficCopRef.set(trafficCop);
        checkQueryTimeout();
        checkTotalBytesLimit(response.getContent().readableBytes());

        log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
        responseStartTimeNs = System.nanoTime();
        acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);

        final boolean continueReading;
        try {
          log.trace(
              "Got a response from [%s] for query ID[%s], subquery ID[%s]",
              url,
              query.getId(),
              query.getSubQueryId()
          );
          final String responseContext = response.headers().get(QueryResource.HEADER_RESPONSE_CONTEXT);
          context.addRemainingResponse(query.getMostSpecificId(), VAL_TO_REDUCE_REMAINING_RESPONSES);
          // The response-context header may be null in case of error or query timeout
          if (responseContext != null) {
            context.merge(ResponseContext.deserialize(responseContext, objectMapper));
          }
          continueReading = enqueue(response.getContent(), 0L);
        }
        catch (final IOException e) {
          log.error(e, "Error parsing response context from url [%s]", url);
          // Surface the parse failure to whoever reads the result stream.
          return ClientResponse.finished(
              new InputStream()
              {
                @Override
                public int read() throws IOException
                {
                  throw e;
                }
              }
          );
        }
        catch (InterruptedException e) {
          log.error(e, "Queue appending interrupted");
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
        totalByteCount.addAndGet(response.getContent().readableBytes());
        return ClientResponse.finished(
            new SequenceInputStream(
                new Enumeration<>()
                {
                  @Override
                  public boolean hasMoreElements()
                  {
                    if (fail.get() != null) {
                      throw new RE(fail.get());
                    }
                    checkQueryTimeout();

                    // "done" is always false until the last stream has been put in the queue by done().
                    // After that, there are more elements only while the queue still holds streams.
                    synchronized (done) {
                      return !done.get() || !queue.isEmpty();
                    }
                  }

                  @Override
                  public InputStream nextElement()
                  {
                    if (fail.get() != null) {
                      throw new RE(fail.get());
                    }

                    try {
                      return dequeue();
                    }
                    catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw new RuntimeException(e);
                    }
                  }
                }
            ),
            continueReading
        );
      }

      /**
       * Handles one subsequent chunk: enforces the timeout and byte limits and queues non-empty content.
       *
       * @return the same response object, with the continue-reading flag from {@link #enqueue} (true for an
       * empty chunk)
       */
      @Override
      public ClientResponse<InputStream> handleChunk(
          ClientResponse<InputStream> clientResponse,
          HttpChunk chunk,
          long chunkNum
      )
      {
        checkQueryTimeout();

        final ChannelBuffer channelBuffer = chunk.getContent();
        // Captured before queueing, so the count does not depend on how the buffer is consumed afterwards.
        final int bytes = channelBuffer.readableBytes();

        checkTotalBytesLimit(bytes);

        boolean continueReading = true;
        if (bytes > 0) {
          try {
            continueReading = enqueue(channelBuffer, chunkNum);
          }
          catch (InterruptedException e) {
            log.error(e, "Unable to put chunk input stream into Sequence queue for url [%s]", url);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          totalByteCount.addAndGet(bytes);
        }

        return ClientResponse.finished(clientResponse.getObj(), continueReading);
      }

      /**
       * Called when the response is complete: logs and emits the node time / byte metrics, then queues a final
       * empty stream and marks the handler as done.
       */
      @Override
      public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
      {
        long stopTimeNs = System.nanoTime();
        long nodeTimeNs = stopTimeNs - requestStartTimeNs;
        final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs);
        log.debug(
            "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
            query.getId(),
            url,
            totalByteCount.get(),
            nodeTimeMs,
            // Floating math; division by zero will yield Inf, not exception
            totalByteCount.get() / (0.001 * nodeTimeMs)
        );
        QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
        responseMetrics.reportNodeTime(nodeTimeNs);
        responseMetrics.reportNodeBytes(totalByteCount.get());

        if (usingBackpressure) {
          responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
        }

        responseMetrics.emit(emitter);

        // Same monitor as hasMoreElements(), so the reader never observes done == true before the final stream
        // has been queued.
        synchronized (done) {
          try {
            // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
            // after done is set to true, regardless of the rest of the stream's state.
            queue.put(InputStreamHolder.fromChannelBuffer(ChannelBuffers.EMPTY_BUFFER, Long.MAX_VALUE));
          }
          catch (InterruptedException e) {
            log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          finally {
            // Set even if the put was interrupted, so hasMoreElements() can eventually return false.
            done.set(true);
          }
        }
        return ClientResponse.finished(clientResponse.getObj());
      }

      /**
       * Called when reading the response fails; arranges for readers of the result stream to see the failure.
       */
      @Override
      public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
      {
        String msg = StringUtils.format(
            "Query[%s] url[%s] failed with exception msg [%s]",
            query.getId(),
            url,
            e.getMessage()
        );
        setupResponseReadFailure(msg, e);
      }

      /**
       * Records the failure message in {@code fail}, discards everything queued so far, and queues a single
       * stream whose read() throws an IOException carrying {@code msg} (and {@code th} as the cause, if given).
       *
       * @param msg failure message
       * @param th  cause, or null if there is none
       */
      private void setupResponseReadFailure(String msg, Throwable th)
      {
        fail.set(msg);
        queue.clear();
        // NOTE(review): the trailing -1 and 0 are presumably the chunk number and length of the holder --
        // confirm against InputStreamHolder.fromStream.
        queue.offer(
            InputStreamHolder.fromStream(
                new InputStream()
                {
                  @Override
                  public int read() throws IOException
                  {
                    if (th != null) {
                      throw new IOException(msg, th);
                    } else {
                      throw new IOException(msg);
                    }
                  }
                },
                -1,
                0
            )
        );
      }

      // Returns remaining timeout or throws exception if timeout already elapsed.
      // On timeout, the failure is also set up for readers of the result stream before throwing.
      private long checkQueryTimeout()
      {
        long timeLeft = timeoutAt - System.currentTimeMillis();
        if (timeLeft <= 0) {
          String msg = StringUtils.format("Query[%s] url[%s] timed out.", query.getId(), url);
          setupResponseReadFailure(msg, null);
          throw new QueryTimeoutException(msg);
        } else {
          return timeLeft;
        }
      }

      /**
       * Adds {@code bytes} to the gathered-bytes counter and fails the read if the max scatter-gather limit is
       * exceeded. When the limit is Long.MAX_VALUE the check is skipped entirely and the counter is not updated.
       *
       * @throws ResourceLimitExceededException if the limit is exceeded
       */
      private void checkTotalBytesLimit(long bytes)
      {
        if (maxScatterGatherBytes < Long.MAX_VALUE && totalBytesGathered.addAndGet(bytes) > maxScatterGatherBytes) {
          String msg = StringUtils.format(
              "Query[%s] url[%s] max scatter-gather bytes limit reached.",
              query.getId(),
              url
          );
          setupResponseReadFailure(msg, null);
          throw new ResourceLimitExceededException(msg);
        }
      }
    };

    // Do not bother sending the request if the deadline has already passed.
    long timeLeft = timeoutAt - System.currentTimeMillis();

    if (timeLeft <= 0) {
      throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
    }

    // increment is moved up so that if future initialization is queued by some other process,
    // we can increment the count earlier so that we can route the request to a different server
    openConnections.getAndIncrement();
    try {
      // The remaining time is used both as the timeout written into the serialized query and as the HTTP
      // request timeout.
      future = httpClient.go(
          new Request(
              HttpMethod.POST,
              new URL(url)
          ).setContent(objectMapper.writeValueAsBytes(Queries.withTimeout(query, timeLeft)))
           .setHeader(
               HttpHeaders.Names.CONTENT_TYPE,
               isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
           ),
          responseHandler,
          Duration.millis(timeLeft)
      );
    }
    catch (Exception e) {
      // The request was never issued, so undo the increment above before propagating.
      openConnections.getAndDecrement();
      throw e;
    }

    queryWatcher.registerQueryFuture(query, future);

    Futures.addCallback(
        future,
        new FutureCallback<>()
        {
          @Override
          public void onSuccess(InputStream result)
          {
            openConnections.getAndDecrement();
          }

          @Override
          public void onFailure(Throwable t)
          {
            openConnections.getAndDecrement();
            if (future.isCancelled()) {
              cancelQuery(query, cancelUrl);
            }
          }
        },
        // The callback is non-blocking and quick, so it's OK to schedule it using directExecutor()
        Execs.directExecutor()
    );
  }
  catch (IOException e) {
    throw new RuntimeException(e);
  }

  Sequence<T> retVal = new BaseSequence<>(
      new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
      {
        @Override
        public JsonParserIterator<T> make()
        {
          return new JsonParserIterator<>(
              queryResultType,
              future,
              url,
              query,
              host,
              toolChest.decorateObjectMapper(objectMapper, query)
          );
        }

        @Override
        public void cleanup(JsonParserIterator<T> iterFromMake)
        {
          CloseableUtils.closeAndWrapExceptions(iterFromMake);
        }
      }
  );

  // bySegment queries are de-serialized after caching results in order to
  // avoid the cost of de-serializing and then re-serializing again when adding to cache
  if (!isBySegment) {
    retVal = Sequences.map(
        retVal,
        toolChest.makePreComputeManipulatorFn(
            query,
            MetricManipulatorFns.deserializing()
        )
    );
  }

  return retVal;
}