in src/com/amazon/kinesis/streaming/agent/tailing/PublishingQueue.java [119:150]
/**
 * Waits, while holding {@code lock}, until either publish queue
 * ({@code neverPubQueue} or {@code retryQueue}) has an element, the queue is
 * no longer open, or the configured wait time elapses.
 *
 * <p>The wait time is read from {@code flow.getWaitOnEmptyPublishQueueMillis()}:
 * <ul>
 *   <li>{@code 0}: do not wait at all;</li>
 *   <li>positive: wait at most that many milliseconds;</li>
 *   <li>negative: wait with no time limit.</li>
 * </ul>
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is
 * restored and the method returns normally instead of throwing.
 *
 * @return {@code true} if {@code size() > 0} at the time the method returns,
 *         {@code false} otherwise
 */
public boolean waitNotEmpty() {
    lock.lock();
    try {
        // It's a good time to check if temp buffer needs to be queued, in case the queue is empty
        checkPendingRecords();
        long waitMillis = flow.getWaitOnEmptyPublishQueueMillis();
        if (isOpen && waitMillis != 0) {
            try {
                if (waitMillis > 0) {
                    // Wait for a limited time. awaitNanos returns the remaining time, so
                    // early/spurious wakeups resume waiting for what is left of the budget.
                    long nanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
                    while (isOpen && neverPubQueue.isEmpty() && retryQueue.isEmpty() && nanos > 0) {
                        nanos = notEmpty.awaitNanos(nanos);
                    }
                } else {
                    // Wait indefinitely. Condition.await() may return spuriously, so the
                    // predicate is re-checked in a loop, mirroring the timed branch above.
                    // NOTE(review): leaving this loop on shutdown assumes that whoever clears
                    // isOpen also signals notEmpty -- confirm against the close path.
                    while (isOpen && neverPubQueue.isEmpty() && retryQueue.isEmpty()) {
                        notEmpty.await();
                    }
                }
            } catch (InterruptedException e) {
                // No need to make the method interruptible: restore the interrupt flag
                // and fall through to report the current queue state.
                Thread.currentThread().interrupt();
                LOGGER.trace("{}: Thread interrupted.", name, e);
            }
        }
        return size() > 0;
    } finally {
        lock.unlock();
    }
}