in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java [349:376]
/**
 * Removes and returns the first element of the queue, waiting up to the given timeout for
 * an element to become available.
 *
 * <p>A timeout of {@code 0} delegates to {@link #getElementBlocking()} (the "wait forever"
 * case). For a positive timeout the method waits on the {@code nonEmpty} condition until an
 * element is present, the queue is no longer open, or the timeout has elapsed. The timeout
 * is measured from the moment the method is entered, so time spent acquiring the lock counts
 * towards it.
 *
 * @param timeoutMillis the maximum time to wait, in milliseconds; {@code 0} means no timeout
 * @return the first element of the queue, or {@code null} if the timeout elapsed while the
 *     queue was still open and empty
 * @throws IllegalArgumentException if {@code timeoutMillis} is negative
 * @throws IllegalStateException if the queue is closed when the wait ends
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public E getElementBlocking(long timeoutMillis) throws InterruptedException {
    if (timeoutMillis == 0L) {
        // wait forever case
        return getElementBlocking();
    } else if (timeoutMillis < 0L) {
        throw new IllegalArgumentException("invalid timeout");
    }

    // TimeUnit.toNanos saturates at Long.MAX_VALUE instead of silently overflowing the way
    // 'timeoutMillis * 1_000_000L' does for very large timeouts.
    final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

    // The sum may wrap around, which is fine: the deadline is only ever used in the
    // difference 'deadline - System.nanoTime()', which is correct under wrap-around.
    final long deadline = System.nanoTime() + timeoutNanos;

    lock.lock();
    try {
        long remainingNanos = deadline - System.nanoTime();

        // Loop to guard against spurious wake-ups and against wake-ups where the queue is
        // still empty; the remaining time is kept in nanoseconds so that the wait is not
        // cut short by truncation to whole milliseconds.
        while (open && elements.isEmpty() && remainingNanos > 0L) {
            nonEmpty.awaitNanos(remainingNanos);
            remainingNanos = deadline - System.nanoTime();
        }

        if (!open) {
            throw new IllegalStateException("queue is closed");
        } else if (elements.isEmpty()) {
            // timed out without an element becoming available
            return null;
        } else {
            return elements.removeFirst();
        }
    } finally {
        lock.unlock();
    }
}