in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java [427:456]
public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException {
if (timeoutMillis == 0L) {
// wait forever case
return getBatchBlocking();
} else if (timeoutMillis < 0L) {
throw new IllegalArgumentException("invalid timeout");
}
final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
lock.lock();
try {
while (open && elements.isEmpty() && timeoutMillis > 0) {
nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
}
if (!open) {
throw new IllegalStateException("queue is closed");
} else if (elements.isEmpty()) {
return Collections.emptyList();
} else {
ArrayList<E> result = new ArrayList<>(elements);
elements.clear();
return result;
}
} finally {
lock.unlock();
}
}