private void handleIterator()

in gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java [413:551]


    private void handleIterator(final Context context, final Iterator itty, final MessageSerializer<?> serializer, final boolean bulking) throws InterruptedException {
        final ChannelHandlerContext nettyContext = context.getChannelHandlerContext();
        final RequestMessage msg = context.getRequestMessage();
        final Settings settings = context.getSettings();

        // used to limit warnings for when netty fills the buffer and hits the high watermark - prevents
        // over-logging of the same message.
        long lastWarningTime = 0;
        int warnCounter = 0;

        // we have an empty iterator - happens on stuff like: g.V().iterate()
        if (!itty.hasNext()) {
            ByteBuf chunk = null;
            try {
                chunk = makeChunk(context, serializer, new ArrayList<>(), false, bulking);
                nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
            } catch (Exception ex) {
                // Bytebuf is a countable release - if it does not get written downstream
                // it needs to be released here
                if (chunk != null) chunk.release();
            }
            sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
            return;
        }

        // the batch size can be overridden by the request
        final int resultIterationBatchSize = (Integer) msg.optionalField(Tokens.ARGS_BATCH_SIZE)
                .orElse(settings.resultIterationBatchSize);
        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);

        // use an external control to manage the loop as opposed to just checking hasNext() in the while.  this
        // prevent situations where auto transactions create a new transaction after calls to commit() withing
        // the loop on calls to hasNext().
        boolean hasMore = itty.hasNext();

        while (hasMore) {
            if (Thread.interrupted()) throw new InterruptedException();

            // have to check the aggregate size because it is possible that the channel is not writeable (below)
            // so iterating next() if the message is not written and flushed would bump the aggregate size beyond
            // the expected resultIterationBatchSize.  Total serialization time for the response remains in
            // effect so if the client is "slow" it may simply timeout.
            //
            // there is a need to check hasNext() on the iterator because if the channel is not writeable the
            // previous pass through the while loop will have next()'d the iterator and if it is "done" then a
            // NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
            // require a forced flush which can be forced by sub-classes.
            //
            // this could be placed inside the isWriteable() portion of the if-then below but it seems better to
            // allow iteration to continue into a batch if that is possible rather than just doing nothing at all
            // while waiting for the client to catch up
            if (aggregate.size() < resultIterationBatchSize && itty.hasNext()) {
                if (bulking) {
                    Traverser traverser = (Traverser) itty.next();
                    aggregate.add(traverser.get());
                    aggregate.add(traverser.bulk());
                } else {
                    aggregate.add(itty.next());
                }
            }

            // Don't keep executor busy if client has already given up; there is no way to catch up if the channel is
            // not active, and hence we should break the loop.
            if (!nettyContext.channel().isActive()) {
                break;
            }

            // send back a page of results if batch size is met or if it's the end of the results being iterated.
            // also check writeability of the channel to prevent OOME for slow clients.
            //
            // clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
            // like CorruptedFrameException. On the server, although the channel gets closed, there might be some
            // executor threads waiting for watermark to clear which will not clear in these cases since client has
            // already given up on these requests. This leads to these executors waiting for the client to consume
            // results till the timeout. checking for isActive() should help prevent that.
            if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
                if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
                    ByteBuf chunk = null;
                    try {
                        chunk = makeChunk(context, serializer, aggregate, itty.hasNext(), bulking);
                    } catch (Exception ex) {
                        // Bytebuf is a countable release - if it does not get written downstream
                        // it needs to be released here
                        if (chunk != null) chunk.release();

                        // exception is handled in makeFrame() - serialization error gets written back to driver
                        // at that point
                        break;
                    }

                    // track whether there is anything left in the iterator because it needs to be accessed after
                    // the transaction could be closed - in that case a call to hasNext() could open a new transaction
                    // unintentionally
                    hasMore = itty.hasNext();

                    try {
                        // only need to reset the aggregation list if there's more stuff to write
                        if (hasMore) {
                            aggregate = new ArrayList<>(resultIterationBatchSize);
                        }
                    } catch (Exception ex) {
                        // Bytebuf is a countable release - if it does not get written downstream
                        // it needs to be released here
                        if (chunk != null) chunk.release();
                        throw ex;
                    }

                    nettyContext.writeAndFlush(new DefaultHttpContent(chunk));

                    if (!hasMore) {
                        sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
                    }
                }
            } else {
                final long currentTime = System.currentTimeMillis();

                // exponential delay between warnings. don't keep triggering this warning over and over again for the
                // same request. totalPendingWriteBytes is volatile so it is possible that by the time this warning
                // hits the log the low watermark may have been hit
                long interval = (long) Math.pow(2, warnCounter) * 1000;
                if (currentTime - lastWarningTime >= interval) {
                    final Channel ch = context.getChannelHandlerContext().channel();
                    logger.warn("Warning {}: Outbound buffer size={}, pausing response writing as writeBufferHighWaterMark exceeded on request {} for channel {} - writing will continue once client has caught up",
                            warnCounter,
                            ch.unsafe().outboundBuffer().totalPendingWriteBytes(),
                            ch.attr(StateKey.REQUEST_ID),
                            ch.id());

                    lastWarningTime = currentTime;
                    warnCounter++;
                }

                // since the client is lagging we can hold here for a period of time for the client to catch up.
                // this isn't blocking the IO thread - just a worker.
                TimeUnit.MILLISECONDS.sleep(WRITE_PAUSE_TIME_MS);
                writePausesMeter.mark();
            }
        }
    }