private boolean queueCurrentBuffer(boolean block)

in src/com/amazon/kinesis/streaming/agent/tailing/PublishingQueue.java [227:263]


    /**
     * Attempts to queue the current buffer, optionally waiting for space while
     * {@code neverPubQueue} is at {@code neverPubCapacity}.
     *
     * <p>The wait behavior is driven by {@code flow.getWaitOnFullPublishQueueMillis()}:
     * <ul>
     *   <li>{@code 0}: never wait;</li>
     *   <li>positive: wait up to that many milliseconds;</li>
     *   <li>negative: wait until space is available or the queue is no longer open.</li>
     * </ul>
     * Waiting only happens when {@code block} is {@code true}.
     *
     * <p>If the thread is interrupted while waiting, the interrupt status is
     * restored and the method stops waiting and attempts to queue the buffer
     * right away instead of propagating the exception.
     *
     * @param block whether this call is allowed to wait on {@code notFull}
     *              when the queue is at capacity
     * @return {@code false} if the queue is not open; {@code true} if the
     *         current buffer is empty (nothing to do); otherwise the result of
     *         {@code tryQueueCurrentBuffer}, which receives the time spent
     *         waiting in milliseconds
     */
    private boolean queueCurrentBuffer(boolean block) {
        lock.lock();
        try {
            if (!isOpen) {
                return false;
            } else if (currentBuffer.isEmpty()) {
                return true;    // practically a no-op
            }
            long waitMillis = flow.getWaitOnFullPublishQueueMillis();
            if (block && waitMillis != 0) {
                Stopwatch timer = Stopwatch.createStarted();
                try {
                    if (waitMillis > 0) {
                        // Wait for a limited time; awaitNanos returns the remaining
                        // budget, so the loop ends once the budget is exhausted.
                        long nanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
                        while (isOpen && neverPubQueue.size() == neverPubCapacity && nanos > 0) {
                            nanos = notFull.awaitNanos(nanos);
                        }
                    } else {
                        // Wait indefinitely. The condition must be re-checked in a loop:
                        // await() may return spuriously, and a signal does not guarantee
                        // that space is still available once the lock is re-acquired.
                        // The isOpen guard mirrors the timed branch so that the wait
                        // ends when the queue is no longer open.
                        while (isOpen && neverPubQueue.size() == neverPubCapacity) {
                            notFull.await();
                        }
                    }
                    return tryQueueCurrentBuffer(timer.elapsed(TimeUnit.MILLISECONDS));
                } catch (InterruptedException e) {
                    // Restore the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    // Doesn't make sense to make this method interruptable; just return immediately
                    LOGGER.trace("{}: Thread interrupted.", name, e);
                    return tryQueueCurrentBuffer(timer.elapsed(TimeUnit.MILLISECONDS));
                }
            } else {
                return tryQueueCurrentBuffer(0);
            }
        } finally {
            lock.unlock();
        }
    }