in src/com/amazon/kinesis/streaming/agent/tailing/PublishingQueue.java [227:263]
/**
 * Tries to hand the current buffer over via {@link #tryQueueCurrentBuffer}, optionally
 * waiting first for the never-published queue to have room.
 *
 * <p>The wait behavior is driven by {@code flow.getWaitOnFullPublishQueueMillis()}:
 * <ul>
 *   <li>{@code 0}: never wait, even if {@code block} is {@code true};</li>
 *   <li>positive: wait at most that many milliseconds while the queue is full;</li>
 *   <li>negative: wait with no time limit while the queue is full.</li>
 * </ul>
 * In both waiting modes the wait also ends as soon as the queue is no longer open.
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the
 * method still makes one attempt to queue the buffer before returning; it never throws
 * {@link InterruptedException}.
 *
 * @param block whether the caller allows this method to wait when the queue is full
 * @return {@code false} if the queue is not open; {@code true} if the current buffer is
 *         empty (nothing to do); otherwise whatever {@code tryQueueCurrentBuffer} returns
 */
private boolean queueCurrentBuffer(boolean block) {
    lock.lock();
    try {
        if (!isOpen) {
            return false;
        }
        if (currentBuffer.isEmpty()) {
            return true; // practically a no-op
        }
        long waitMillis = flow.getWaitOnFullPublishQueueMillis();
        if (!block || waitMillis == 0) {
            // Caller does not want to wait, or waiting is disabled by configuration.
            return tryQueueCurrentBuffer(0);
        }
        // Measures how long we actually waited; the elapsed time is passed on below.
        Stopwatch timer = Stopwatch.createStarted();
        try {
            // NOTE(review): notFull is presumably a Condition created from lock -- confirm.
            if (waitMillis > 0) {
                // Wait for a limited time. awaitNanos returns the remaining time, so the
                // total wait stays bounded across repeated wake-ups.
                long nanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
                while (isOpen && neverPubQueue.size() == neverPubCapacity && nanos > 0) {
                    nanos = notFull.awaitNanos(nanos);
                }
            } else {
                // Wait indefinitely. The condition is re-checked in a loop because a
                // wake-up does not guarantee the queue has room (spurious wake-ups, or
                // another thread may have refilled it). Checking isOpen lets a waiter
                // leave once the queue is closed instead of blocking forever.
                while (isOpen && neverPubQueue.size() == neverPubCapacity) {
                    notFull.await();
                }
            }
            return tryQueueCurrentBuffer(timer.elapsed(TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Doesn't make sense to make this method interruptible; just stop waiting
            // and make a final attempt right away.
            LOGGER.trace("{}: Thread interrupted.", name, e);
            return tryQueueCurrentBuffer(timer.elapsed(TimeUnit.MILLISECONDS));
        }
    } finally {
        lock.unlock();
    }
}